package org.jetlinks.supports.cluster.redis;

import java.time.Duration;
import java.util.Map;
import org.jetlinks.core.cache.Caches;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.cluster.ClusterCounter;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ClusterNotifier;
import org.jetlinks.core.cluster.ClusterQueue;
import org.jetlinks.core.cluster.ClusterSet;
import org.jetlinks.core.cluster.ClusterTopic;
import org.jetlinks.core.cluster.HaManager;
import org.jetlinks.core.cluster.ServerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/jetlinks/supports/cluster/redis/RedisClusterManager.class */
/**
 * {@link ClusterManager} implementation that builds its cluster primitives
 * (queues, topics, caches, sets, counters, notifier and HA manager) on top of
 * reactive Redis templates.
 *
 * <p>Queues, topics, caches and sets are created lazily on first request and
 * then reused for the same name; counters are created on every call.
 *
 * <p>{@link #startup()} and {@link #shutdown()} are lifecycle methods and are
 * expected to be driven by a single owner; they are not synchronized.
 */
public class RedisClusterManager implements ClusterManager {
    private static final Logger log = LoggerFactory.getLogger(RedisClusterManager.class);

    /** Interval of the fallback poll that runs over every known queue. */
    private static final Duration QUEUE_POLL_INTERVAL = Duration.ofSeconds(5L);

    /** Channel pattern on which "data produced" notifications for queues are received. */
    private static final String QUEUE_PRODUCED_CHANNEL = "queue:data:produced";

    private final String clusterName;
    private final String serverId;
    private final Map<String, RedisClusterQueue> queues = Caches.newCache();
    private final Map<String, ClusterTopic> topics = Caches.newCache();
    private final Map<String, ClusterCache> caches = Caches.newCache();
    private final Map<String, ClusterSet> sets = Caches.newCache();
    private final ReactiveRedisTemplate<?, ?> commonOperations;
    private final RedisHaManager haManager;
    private final RedisClusterNotifier notifier;
    private final ReactiveRedisOperations<String, String> stringOperations;
    private final ReactiveRedisTemplate<String, ?> queueRedisTemplate;

    /*
     * Cancellation handles of the two subscriptions opened by startup(). Each one is
     * the dispose() method of the Reactor subscription, held as a Runnable; null
     * while the corresponding subscription is not active.
     */
    private volatile Runnable stopQueuePolling;
    private volatile Runnable stopQueueListener;

    /**
     * Creates a manager for the given cluster and local node.
     *
     * @param clusterName   name of the cluster; also used as prefix for counter keys
     * @param serverNode    description of the local node; its id becomes the current server id
     * @param redisTemplate template used for topics, caches and sets, and as the source of the
     *                      connection factory and value serializers for the derived templates
     */
    public RedisClusterManager(String clusterName, ServerNode serverNode, ReactiveRedisTemplate<?, ?> redisTemplate) {
        this.clusterName = clusterName;
        this.serverId = serverNode.getId();
        this.commonOperations = redisTemplate;
        // Build the derived templates before "this" is handed to collaborators, so that
        // anything they request from the manager during construction is fully wired.
        this.stringOperations = new ReactiveRedisTemplate<>(redisTemplate.getConnectionFactory(), RedisSerializationContext.string());
        this.queueRedisTemplate = createQueueTemplate(redisTemplate);
        this.notifier = new RedisClusterNotifier(clusterName, serverNode.getId(), this);
        this.haManager = new RedisHaManager(clusterName, serverNode, this, redisTemplate);
    }

    /**
     * Creates a manager whose local node is described only by its id.
     *
     * @param clusterName   name of the cluster
     * @param serverId      id of the local node
     * @param redisTemplate template to build on, see the primary constructor
     */
    public RedisClusterManager(String clusterName, String serverId, ReactiveRedisTemplate<?, ?> redisTemplate) {
        this(clusterName, ServerNode.builder().id(serverId).build(), redisTemplate);
    }

    /**
     * Derives the template used by queues: string keys and hash keys, while value and
     * hash-value serialization are taken over from {@code source}.
     *
     * <p>The type parameter captures the wildcard value type of {@code source}, so the
     * serialization context can be assembled without raw types or unchecked casts.
     */
    private static <V> ReactiveRedisTemplate<String, V> createQueueTemplate(ReactiveRedisTemplate<?, V> source) {
        RedisSerializationContext<?, V> sourceContext = source.getSerializationContext();
        RedisSerializationContext<String, V> queueContext = RedisSerializationContext
                .<String, V>newSerializationContext()
                .key(RedisSerializer.string())
                .value(sourceContext.getValueSerializationPair())
                .hashKey(RedisSerializer.string())
                .hashValue(sourceContext.getHashValueSerializationPair())
                .build();
        return new ReactiveRedisTemplate<>(source.getConnectionFactory(), queueContext);
    }

    /** @return id of the local server node this manager was created for */
    public String getCurrentServerId() {
        return this.serverId;
    }

    /**
     * Starts the notifier and the HA manager, then opens two subscriptions that trigger
     * {@code tryPoll()} on queues: a periodic one over all known queues, and one driven by
     * messages on the {@value #QUEUE_PRODUCED_CHANNEL} channel, whose payload is used as
     * the lookup key of the queue to poll.
     *
     * <p>Subscriptions left over from a previous call are cancelled first, so calling this
     * method again never results in duplicated polling.
     */
    public void startup() {
        this.notifier.startup();
        this.haManager.startup();
        cancelSubscriptions();

        this.stopQueuePolling = Flux.interval(QUEUE_POLL_INTERVAL)
                // A tick that cannot be delivered is simply skipped; the next one polls again.
                .onBackpressureDrop()
                .flatMap(tick -> Flux.fromIterable(this.queues.values()))
                .subscribe(
                        this::pollQuietly,
                        err -> log.error("Periodic cluster queue polling terminated", err))
                ::dispose;

        this.stopQueueListener = this.queueRedisTemplate
                .listenToPattern(new String[]{QUEUE_PRODUCED_CHANNEL})
                .subscribe(
                        message -> {
                            RedisClusterQueue queue = this.queues.get(message.getMessage());
                            if (queue != null) {
                                pollQuietly(queue);
                            }
                        },
                        err -> log.error(err.getMessage(), err))
                ::dispose;
    }

    /**
     * Cancels the queue polling subscriptions opened by {@link #startup()} and shuts down
     * the HA manager.
     */
    public void shutdown() {
        cancelSubscriptions();
        this.haManager.shutdown();
    }

    /** Polls one queue; a failure is logged so it cannot terminate the calling subscription. */
    private void pollQuietly(RedisClusterQueue queue) {
        try {
            queue.tryPoll();
        } catch (Exception e) {
            log.error("Failed to poll cluster queue", e);
        }
    }

    /** Cancels whichever startup() subscriptions are currently active and clears their handles. */
    private void cancelSubscriptions() {
        Runnable polling = this.stopQueuePolling;
        this.stopQueuePolling = null;
        if (polling != null) {
            polling.run();
        }
        Runnable listener = this.stopQueueListener;
        this.stopQueueListener = null;
        if (listener != null) {
            listener.run();
        }
    }

    /** @return the HA manager created for this cluster manager */
    public HaManager getHaManager() {
        return this.haManager;
    }

    /**
     * Exposes the common template under caller-chosen type parameters.
     *
     * @return the template passed to the constructor, viewed as {@code ReactiveRedisTemplate<K, V>}
     */
    @SuppressWarnings("unchecked") // the template is stored with wildcard types; the caller picks K and V
    protected <K, V> ReactiveRedisTemplate<K, V> getRedis() {
        return (ReactiveRedisTemplate<K, V>) this.commonOperations;
    }

    /** @return the cluster name given at construction */
    public String getClusterName() {
        return this.clusterName;
    }

    /** @return the notifier created for this cluster manager */
    public ClusterNotifier getNotifier() {
        return this.notifier;
    }

    /**
     * Returns the queue registered under {@code str}, creating it on first use.
     *
     * @param str queue name, also the key under which the queue is looked up for polling
     */
    public <T> ClusterQueue<T> getQueue(String str) {
        return this.queues.computeIfAbsent(str, name -> new RedisClusterQueue(name, this.queueRedisTemplate));
    }

    /**
     * Returns the topic registered under {@code str}, creating it on first use.
     *
     * @param str topic name
     */
    public <T> ClusterTopic<T> getTopic(String str) {
        return this.topics.computeIfAbsent(str, name -> new RedisClusterTopic(name, getRedis()));
    }

    /**
     * Returns the cache registered under {@code str}, creating it on first use.
     *
     * @param str cache name
     */
    public <K, V> ClusterCache<K, V> getCache(String str) {
        return this.caches.computeIfAbsent(str,
                name -> new RedisClusterCache(name, (ReactiveRedisOperations<Object, Object>) getRedis()));
    }

    /**
     * Returns the set registered under {@code str}, creating it on first use.
     *
     * @param str set name
     */
    public <V> ClusterSet<V> getSet(String str) {
        return this.sets.computeIfAbsent(str,
                name -> new RedisClusterSet(name, (ReactiveRedisOperations<Object, Object>) getRedis()));
    }

    /**
     * Creates a counter keyed by {@code <clusterName>:counter:<str>}. Unlike the other
     * accessors, a new counter object is created on every call.
     *
     * @param str counter name
     */
    public ClusterCounter getCounter(String str) {
        return new RedisClusterCounter(this.stringOperations, this.clusterName + ":counter:" + str);
    }
}
