/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.AbstractRSocket;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.DefaultConnectionSetupPayload;
import io.rsocket.core.Invalidatable;
import io.rsocket.core.RSocketRequester;
import io.rsocket.core.RSocketResponder;
import io.rsocket.core.ReconnectMono;
import io.rsocket.core.Resume;
import io.rsocket.core.StreamIdSupplier;
import io.rsocket.frame.SetupFrameFlyweight;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.lease.LeaseStats;
import io.rsocket.lease.Leases;
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.lease.ResponderLeaseHandler;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import io.rsocket.plugins.InterceptorRegistry;
import io.rsocket.resume.ClientRSocketSession;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.EmptyPayload;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RSocketConnector {
    private static final String CLIENT_TAG = "client";
    private static final int MIN_MTU_SIZE = 64;
    private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION = (r, i) -> r.onClose().subscribe(null, __ -> i.invalidate(), i::invalidate);
    private Payload setupPayload = EmptyPayload.INSTANCE;
    private String metadataMimeType = "application/binary";
    private String dataMimeType = "application/binary";
    private SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just((Object)new AbstractRSocket(){});
    private InitializingInterceptorRegistry interceptors = new InitializingInterceptorRegistry();
    private Duration keepAliveInterval = Duration.ofSeconds(20L);
    private Duration keepAliveMaxLifeTime = Duration.ofSeconds(90L);
    private Retry retrySpec;
    private Resume resume;
    private Supplier<Leases<?>> leasesSupplier;
    private int mtu = 0;
    private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
    private Consumer<Throwable> errorConsumer = ex -> {};

    private RSocketConnector() {
    }

    public static RSocketConnector create() {
        return new RSocketConnector();
    }

    public static Mono<RSocket> connectWith(ClientTransport transport) {
        return RSocketConnector.create().connect(() -> transport);
    }

    public RSocketConnector setupPayload(Payload payload) {
        this.setupPayload = payload;
        return this;
    }

    public RSocketConnector dataMimeType(String dataMimeType) {
        this.dataMimeType = dataMimeType;
        return this;
    }

    public RSocketConnector metadataMimeType(String metadataMimeType) {
        this.metadataMimeType = metadataMimeType;
        return this;
    }

    public RSocketConnector interceptors(Consumer<InterceptorRegistry> consumer) {
        consumer.accept(this.interceptors);
        return this;
    }

    public RSocketConnector acceptor(SocketAcceptor acceptor) {
        this.acceptor = acceptor;
        return this;
    }

    public RSocketConnector keepAlive(Duration interval, Duration maxLifeTime) {
        if (!interval.negated().isNegative()) {
            throw new IllegalArgumentException("`interval` for keepAlive must be > 0");
        }
        if (!maxLifeTime.negated().isNegative()) {
            throw new IllegalArgumentException("`maxLifeTime` for keepAlive must be > 0");
        }
        this.keepAliveInterval = interval;
        this.keepAliveMaxLifeTime = maxLifeTime;
        return this;
    }

    public RSocketConnector reconnect(Retry retrySpec) {
        this.retrySpec = Objects.requireNonNull(retrySpec);
        return this;
    }

    public RSocketConnector resume(Resume resume) {
        this.resume = resume;
        return this;
    }

    public RSocketConnector lease(Supplier<Leases<? extends LeaseStats>> supplier) {
        this.leasesSupplier = supplier;
        return this;
    }

    public RSocketConnector fragment(int mtu) {
        if (mtu > 0 && mtu < 64 || mtu < 0) {
            String msg = String.format("smallest allowed mtu size is %d bytes, provided: %d", 64, mtu);
            throw new IllegalArgumentException(msg);
        }
        this.mtu = mtu;
        return this;
    }

    public RSocketConnector payloadDecoder(PayloadDecoder payloadDecoder) {
        Objects.requireNonNull(payloadDecoder);
        this.payloadDecoder = payloadDecoder;
        return this;
    }

    @Deprecated
    public RSocketConnector errorConsumer(Consumer<Throwable> errorConsumer) {
        Objects.requireNonNull(errorConsumer);
        this.errorConsumer = errorConsumer;
        return this;
    }

    public Mono<RSocket> connect(ClientTransport transport) {
        return this.connect(() -> transport);
    }

    public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
        Mono connectionMono = Mono.fromSupplier(transportSupplier).flatMap(t -> t.connect(this.mtu));
        return (Mono)connectionMono.flatMap(connection -> {
            DuplexConnection wrappedConnection;
            KeepAliveHandler keepAliveHandler;
            ByteBuf resumeToken;
            if (this.resume != null) {
                resumeToken = this.resume.getTokenSupplier().get();
                ClientRSocketSession session = new ClientRSocketSession((DuplexConnection)connection, this.resume.getSessionDuration(), this.resume.getRetry(), this.resume.getStoreFactory(CLIENT_TAG).apply((ByteBuf)resumeToken), this.resume.getStreamTimeout(), this.resume.isCleanupStoreOnKeepAlive()).continueWith((Mono<DuplexConnection>)connectionMono).resumeToken(resumeToken);
                keepAliveHandler = new KeepAliveHandler.ResumableKeepAliveHandler(session.resumableConnection());
                wrappedConnection = session.resumableConnection();
            } else {
                resumeToken = Unpooled.EMPTY_BUFFER;
                keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler((Closeable)connection);
                wrappedConnection = connection;
            }
            ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(wrappedConnection, this.interceptors, true);
            boolean leaseEnabled = this.leasesSupplier != null;
            Leases<?> leases = leaseEnabled ? this.leasesSupplier.get() : null;
            RequesterLeaseHandler requesterLeaseHandler = leaseEnabled ? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver()) : RequesterLeaseHandler.None;
            RSocketRequester rSocketRequester = new RSocketRequester(multiplexer.asClientConnection(), this.payloadDecoder, this.errorConsumer, StreamIdSupplier.clientSupplier(), this.mtu, (int)this.keepAliveInterval.toMillis(), (int)this.keepAliveMaxLifeTime.toMillis(), keepAliveHandler, requesterLeaseHandler);
            RSocket wrappedRSocketRequester = this.interceptors.initRequester(rSocketRequester);
            ByteBuf setupFrame = SetupFrameFlyweight.encode(wrappedConnection.alloc(), leaseEnabled, (int)this.keepAliveInterval.toMillis(), (int)this.keepAliveMaxLifeTime.toMillis(), resumeToken, this.metadataMimeType, this.dataMimeType, this.setupPayload);
            DefaultConnectionSetupPayload setup = new DefaultConnectionSetupPayload(setupFrame);
            return this.interceptors.initSocketAcceptor(this.acceptor).accept(setup, wrappedRSocketRequester).flatMap(rSocketHandler -> {
                RSocket wrappedRSocketHandler = this.interceptors.initResponder((RSocket)rSocketHandler);
                ResponderLeaseHandler responderLeaseHandler = leaseEnabled ? new ResponderLeaseHandler.Impl<LeaseStats>(CLIENT_TAG, wrappedConnection.alloc(), leases.sender(), this.errorConsumer, leases.stats()) : ResponderLeaseHandler.None;
                RSocketResponder rSocketResponder = new RSocketResponder(multiplexer.asServerConnection(), wrappedRSocketHandler, this.payloadDecoder, this.errorConsumer, responderLeaseHandler, this.mtu);
                return wrappedConnection.sendOne(setupFrame).thenReturn((Object)wrappedRSocketRequester);
            });
        }).as(source -> {
            if (this.retrySpec != null) {
                return new ReconnectMono<RSocket>(source.retryWhen(this.retrySpec), Disposable::dispose, INVALIDATE_FUNCTION);
            }
            return source;
        });
    }
}

