package org.jetlinks.supports.rpc;

import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.defaults.DirectCodec;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.rpc.Invoker;
import org.jetlinks.core.rpc.RpcDefinition;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.supports.rpc.RpcRequest;
import org.jetlinks.supports.rpc.RpcResult;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/jetlinks/supports/rpc/EventBusRpcService.class */
public class EventBusRpcService implements RpcService {
    private static final Logger log = LoggerFactory.getLogger(EventBusRpcService.class);
    private final EventBus eventBus;
    private final long requesterId = ((Long) IDGenerator.SNOW_FLAKE.generate()).longValue();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jetlinks/supports/rpc/EventBusRpcService$PendingRequest.class */
    public class PendingRequest<REQ, RES> {
        long requestId;
        long requesterId;
        String reqTopicRes;
        String reqTopic;
        RpcDefinition<REQ, RES> definition;
        BiFunction<String, Publisher<REQ>, Publisher<RES>> invoker;
        EmitterProcessor<REQ> processor = EmitterProcessor.create(Integer.MAX_VALUE);
        FluxSink<REQ> sink = this.processor.sink();
        boolean started = false;

        public PendingRequest(long j, long j2, RpcDefinition<REQ, RES> rpcDefinition, BiFunction<String, Publisher<REQ>, Publisher<RES>> biFunction, Disposable disposable) {
            this.requestId = j2;
            this.requesterId = j;
            this.reqTopic = EventBusRpcService.this.getTopic(rpcDefinition);
            this.reqTopicRes = this.reqTopic + "/" + j + "/_reply";
            this.definition = rpcDefinition;
            this.invoker = biFunction;
            doStart();
            this.sink.onDispose(disposable);
        }

        void doStart() {
            if (this.started) {
                return;
            }
            EventBusRpcService.log.trace("handle rpc request {},requestId:{}", this.definition, Long.valueOf(this.requestId));
            this.started = true;
            Flux.from(this.invoker.apply(this.reqTopic, this.processor)).flatMap(obj -> {
                return EventBusRpcService.this.reply(this.reqTopicRes, RpcResult.result(this.requestId, this.definition.responseCodec().encode(obj)));
            }).doOnComplete(() -> {
                EventBusRpcService.this.reply(this.reqTopicRes, RpcResult.complete(this.requestId)).subscribe();
            }).doOnError(th -> {
                EventBusRpcService.log.error(th.getMessage(), th);
                EventBusRpcService.this.reply(this.reqTopicRes, RpcResult.error(this.requestId, this.definition.errorCodec().encode(th))).subscribe();
            }).subscribe();
        }

        void release() {
            this.processor.onComplete();
        }

        void next(RpcRequest rpcRequest) {
            try {
                if (rpcRequest.getType() == RpcRequest.Type.COMPLETE) {
                    this.sink.complete();
                    return;
                }
                Object decode = rpcRequest.decode(this.definition.requestCodec(), false);
                if (decode != null) {
                    this.sink.next(decode);
                }
                if (!(decode instanceof ReferenceCounted)) {
                    ReferenceCountUtil.safeRelease(rpcRequest);
                }
                if (rpcRequest.getType() == RpcRequest.Type.NEXT_AND_END) {
                    this.sink.complete();
                }
            } catch (Throwable th) {
                EventBusRpcService.log.error(th.getMessage(), th);
                this.sink.error(th);
            } finally {
                ReferenceCountUtil.safeRelease(rpcRequest);
            }
        }
    }

    public <REQ, RES> Disposable listen(RpcDefinition<REQ, RES> rpcDefinition, BiFunction<String, REQ, Publisher<RES>> biFunction) {
        return doListen(rpcDefinition, (str, publisher) -> {
            return Flux.from(publisher).flatMap(obj -> {
                return (Publisher) biFunction.apply(str, obj);
            });
        });
    }

    public <RES> Disposable listen(RpcDefinition<Void, RES> rpcDefinition, Function<String, Publisher<RES>> function) {
        return doListen(rpcDefinition, (str, publisher) -> {
            return Flux.from(publisher).thenMany((Publisher) function.apply(str));
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getTopic(RpcDefinition<?, ?> rpcDefinition) {
        String address = rpcDefinition.getAddress();
        if (!address.startsWith("/")) {
            address = "/" + address;
        }
        if (address.endsWith("/")) {
            address = address.substring(0, address.length() - 1);
        }
        return address;
    }

    public <REQ, RES> Invoker<REQ, RES> createInvoker(final RpcDefinition<REQ, RES> rpcDefinition) {
        final String topic = getTopic(rpcDefinition);
        String str = topic + "/" + this.requesterId + "/_reply";
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        final Disposable subscribe = this.eventBus.subscribe(Subscription.of(rpcDefinition.getId(), str, new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker})).doOnNext(topicPayload -> {
            try {
                RpcResult parse = RpcResult.parse(topicPayload);
                log.trace("handle rpc[{}] reply {} {}", new Object[]{rpcDefinition, parse.getType(), Long.valueOf(parse.getRequestId())});
                FluxSink fluxSink = (FluxSink) concurrentHashMap.get(Long.valueOf(parse.getRequestId()));
                if (null == fluxSink || fluxSink.isCancelled()) {
                    log.info("discard rpc[{}] reply {} {}", new Object[]{rpcDefinition, parse.getType(), Long.valueOf(parse.getRequestId())});
                } else {
                    fluxSink.next(parse);
                }
            } finally {
                ReferenceCountUtil.safeRelease(topicPayload);
            }
        }).onErrorContinue((th, obj) -> {
            log.error(th.getMessage(), th);
        }).subscribe();
        return new Invoker<REQ, RES>() { // from class: org.jetlinks.supports.rpc.EventBusRpcService.1
            public Flux<RES> invoke() {
                return invoke((Publisher) null);
            }

            private Mono<Long> doSend(long j, Publisher<? extends REQ> publisher) {
                if (publisher instanceof Mono) {
                    Mono from = Mono.from(publisher);
                    String str2 = topic;
                    RpcDefinition rpcDefinition2 = rpcDefinition;
                    return from.flatMap(obj2 -> {
                        return EventBusRpcService.this.eventBus.publish(str2, DirectCodec.INSTANCE, RpcRequest.nextAndComplete(EventBusRpcService.this.requesterId, j, rpcDefinition2.requestCodec().encode(obj2)), Schedulers.immediate());
                    });
                }
                if (!(publisher instanceof Flux)) {
                    return EventBusRpcService.this.eventBus.publish(topic, DirectCodec.INSTANCE, RpcRequest.nextAndComplete(EventBusRpcService.this.requesterId, j, Payload.voidPayload), Schedulers.immediate());
                }
                Flux from2 = Flux.from(publisher);
                RpcDefinition rpcDefinition3 = rpcDefinition;
                Flux map = from2.map(obj3 -> {
                    return RpcRequest.next(EventBusRpcService.this.requesterId, j, rpcDefinition3.requestCodec().encode(obj3));
                });
                String str3 = topic;
                Mono mono = (Mono) map.as(flux -> {
                    return EventBusRpcService.this.eventBus.publish(str3, DirectCodec.INSTANCE, flux, Schedulers.immediate());
                });
                String str4 = topic;
                return mono.doOnSuccess(l -> {
                    EventBusRpcService.this.eventBus.publish(str4, RpcRequest.complete(EventBusRpcService.this.requesterId, j), Schedulers.immediate()).subscribe();
                });
            }

            public Flux<RES> invoke(Publisher<? extends REQ> publisher) {
                Map map = concurrentHashMap;
                RpcDefinition rpcDefinition2 = rpcDefinition;
                Flux create = Flux.create(fluxSink -> {
                    long longValue = ((Long) IDGenerator.SNOW_FLAKE.generate()).longValue();
                    map.put(Long.valueOf(longValue), fluxSink);
                    fluxSink.onDispose(() -> {
                    });
                    EventBusRpcService.log.trace("do invoke rpc:{},requestId:{}", rpcDefinition2.getAddress(), Long.valueOf(longValue));
                    Mono doOnNext = doSend(longValue, publisher).doOnNext(l -> {
                        if (l.longValue() == 0) {
                            fluxSink.error(new UnsupportedOperationException("no rpc service for:" + rpcDefinition2.getAddress()));
                        }
                    });
                    fluxSink.getClass();
                    doOnNext.doOnError(fluxSink::error).subscribe();
                });
                RpcDefinition rpcDefinition3 = rpcDefinition;
                Flux handle = create.handle((rpcResult, synchronousSink) -> {
                    try {
                        if (rpcResult.getType() == RpcResult.Type.RESULT_AND_COMPLETE) {
                            Object decode = rpcDefinition3.responseCodec().decode(rpcResult);
                            if (decode != null) {
                                synchronousSink.next(decode);
                            }
                            synchronousSink.complete();
                        } else if (rpcResult.getType() == RpcResult.Type.RESULT) {
                            Object decode2 = rpcDefinition3.responseCodec().decode(rpcResult);
                            if (decode2 != null) {
                                synchronousSink.next(decode2);
                            }
                        } else if (rpcResult.getType() == RpcResult.Type.COMPLETE) {
                            synchronousSink.complete();
                        } else if (rpcResult.getType() == RpcResult.Type.ERROR) {
                            Throwable th2 = (Throwable) rpcDefinition3.errorCodec().decode(rpcResult);
                            if (th2 != null) {
                                synchronousSink.error(th2);
                            } else {
                                synchronousSink.complete();
                            }
                        }
                    } finally {
                        ReferenceCountUtil.safeRelease(rpcResult);
                    }
                });
                Duration ofSeconds = Duration.ofSeconds(10L);
                RpcDefinition rpcDefinition4 = rpcDefinition;
                return handle.timeout(ofSeconds, Mono.error(() -> {
                    return new TimeoutException("invoke " + rpcDefinition4 + "timeout");
                }));
            }

            public void dispose() {
                subscribe.dispose();
            }

            public boolean isDisposed() {
                return subscribe.isDisposed();
            }
        };
    }

    protected Mono<Void> reply(String str, RpcResult rpcResult) {
        return this.eventBus.publish(str, rpcResult, Schedulers.immediate()).doOnNext(l -> {
            if (l.longValue() == 0) {
                log.warn("reply rpc request {} requestId:{} failed: no listener[{}]", new Object[]{rpcResult.getType(), Long.valueOf(rpcResult.getRequestId()), str});
            } else {
                log.trace("reply rpc request {} requestId:{}", rpcResult.getType(), Long.valueOf(rpcResult.getRequestId()));
            }
        }).then();
    }

    private <REQ, RES> Disposable doListen(RpcDefinition<REQ, RES> rpcDefinition, BiFunction<String, Publisher<REQ>, Publisher<RES>> biFunction) {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Flux map = this.eventBus.subscribe(Subscription.of(rpcDefinition.getId(), rpcDefinition.getAddress(), new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker})).map((v0) -> {
            return RpcRequest.parse(v0);
        });
        concurrentHashMap.getClass();
        return map.doOnCancel(concurrentHashMap::clear).subscribe(rpcRequest -> {
            ((PendingRequest) concurrentHashMap.computeIfAbsent(Long.valueOf(rpcRequest.getRequestId()), l -> {
                return new PendingRequest(rpcRequest.getRequesterId(), l.longValue(), rpcDefinition, biFunction, () -> {
                });
            })).next(rpcRequest);
        });
    }

    public EventBusRpcService(EventBus eventBus) {
        this.eventBus = eventBus;
    }
}
