package org.jetlinks.rule.engine.cluster;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.cluster.scheduler.RemoteScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/cluster/ClusterSchedulerRegistry.class */
public class ClusterSchedulerRegistry implements SchedulerRegistry {
    private static final Logger log = LoggerFactory.getLogger(ClusterSchedulerRegistry.class);
    private final EventBus eventBus;
    private final RpcServiceFactory serviceFactory;
    private final Set<Scheduler> localSchedulers = new ConcurrentSkipListSet(Comparator.comparing((v0) -> {
        return v0.getId();
    }));
    private final Map<String, RemoteScheduler> remoteSchedulers = new ConcurrentHashMap();
    private final EmitterProcessor<Scheduler> joinProcessor = EmitterProcessor.create(Integer.MAX_VALUE, false);
    private final EmitterProcessor<Scheduler> leaveProcessor = EmitterProcessor.create(Integer.MAX_VALUE, false);
    private final FluxSink<Scheduler> joinSink = this.joinProcessor.sink();
    private final FluxSink<Scheduler> leaveSink = this.leaveProcessor.sink();
    private final List<Disposable> disposables = new CopyOnWriteArrayList();
    private Duration keepaliveInterval = Duration.ofSeconds(10);

    public ClusterSchedulerRegistry(EventBus eventBus, RpcServiceFactory rpcServiceFactory) {
        this.eventBus = eventBus;
        this.serviceFactory = rpcServiceFactory;
    }

    public void setup() {
        if (this.disposables.isEmpty()) {
            this.joinProcessor.subscribe(scheduler -> {
                RemoteScheduler put = this.remoteSchedulers.put(scheduler.getId(), (RemoteScheduler) scheduler);
                if (put != null) {
                    put.dispose();
                }
                log.debug("remote scheduler join:{}", scheduler.getId());
            });
            this.leaveProcessor.subscribe(scheduler2 -> {
                log.debug("remote scheduler leave:{}", scheduler2.getId());
                RemoteScheduler remove = this.remoteSchedulers.remove(scheduler2.getId());
                if (remove != null && remove != scheduler2) {
                    remove.dispose();
                }
                scheduler2.dispose();
            });
            this.disposables.add(this.eventBus.subscribe(Subscription.of("rule-engine.register", "/rule-engine/cluster-scheduler/keepalive", new Subscription.Feature[]{Subscription.Feature.broker}), String.class).filter(str -> {
                return !this.remoteSchedulers.containsKey(str);
            }).doOnNext(str2 -> {
                RemoteScheduler remoteScheduler = new RemoteScheduler(str2, this.serviceFactory);
                remoteScheduler.init();
                this.joinSink.next(remoteScheduler);
                publishLocal().subscribe();
            }).subscribe());
            List<Disposable> list = this.disposables;
            Flux subscribe = this.eventBus.subscribe(Subscription.of("rule-engine.register", "/rule-engine/cluster-scheduler/leave", new Subscription.Feature[]{Subscription.Feature.broker}), String.class);
            Map<String, RemoteScheduler> map = this.remoteSchedulers;
            map.getClass();
            Flux flatMap = subscribe.filter((v1) -> {
                return r2.containsKey(v1);
            }).flatMap(str3 -> {
                return Mono.justOrEmpty(this.remoteSchedulers.remove(str3));
            });
            FluxSink<Scheduler> fluxSink = this.leaveSink;
            fluxSink.getClass();
            list.add(flatMap.doOnNext((v1) -> {
                r2.next(v1);
            }).subscribe());
            this.disposables.add(Flux.interval(this.keepaliveInterval).subscribe(l -> {
                Mono<Void> publishLocal = publishLocal();
                Flux filterWhen = Flux.fromIterable(this.remoteSchedulers.values()).filterWhen(remoteScheduler -> {
                    return remoteScheduler.isNoAlive().onErrorResume(th -> {
                        return Mono.just(true);
                    });
                });
                FluxSink<Scheduler> fluxSink2 = this.leaveSink;
                fluxSink2.getClass();
                publishLocal.then(filterWhen.doOnNext((v1) -> {
                    r2.next(v1);
                }).then()).subscribe();
            }));
            publishLocal().block();
        }
    }

    private Mono<Void> publishLocal() {
        return this.eventBus.publish("/rule-engine/cluster-scheduler/keepalive", Flux.fromIterable(this.localSchedulers).map((v0) -> {
            return v0.getId();
        })).then();
    }

    public void cleanup() {
        this.eventBus.publish("/rule-engine/cluster-scheduler/leave", Flux.fromIterable(this.localSchedulers).map((v0) -> {
            return v0.getId();
        })).subscribe();
        this.remoteSchedulers.values().forEach((v0) -> {
            v0.dispose();
        });
        this.disposables.forEach((v0) -> {
            v0.dispose();
        });
        this.disposables.clear();
        this.remoteSchedulers.clear();
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> getSchedulers() {
        return Flux.just(new Collection[]{this.localSchedulers, this.remoteSchedulers.values()}).flatMapIterable(Function.identity());
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> handleSchedulerJoin() {
        return this.joinProcessor;
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> handleSchedulerLeave() {
        return this.leaveProcessor;
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public void register(Scheduler scheduler) {
        this.localSchedulers.add(scheduler);
        if (this.disposables.isEmpty()) {
            return;
        }
        publishLocal().subscribe();
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public List<Scheduler> getLocalSchedulers() {
        return new ArrayList(this.localSchedulers);
    }

    public void setKeepaliveInterval(Duration duration) {
        this.keepaliveInterval = duration;
    }
}
