/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.ResumeFrameFlyweight;
import io.rsocket.frame.ResumeOkFrameFlyweight;
import io.rsocket.resume.RSocketSession;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ResumeStateException;
import java.time.Duration;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;

public class ServerRSocketSession
implements RSocketSession<DuplexConnection> {
    private static final Logger logger = LoggerFactory.getLogger(ServerRSocketSession.class);
    private final ResumableDuplexConnection resumableConnection;
    private final FluxProcessor<DuplexConnection, DuplexConnection> newConnections = ReplayProcessor.create((int)0);
    private final ByteBufAllocator allocator;
    private final ByteBuf resumeToken;

    public ServerRSocketSession(DuplexConnection duplexConnection, Duration resumeSessionDuration, Duration resumeStreamTimeout, Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory, ByteBuf resumeToken, boolean cleanupStoreOnKeepAlive) {
        this.allocator = duplexConnection.alloc();
        this.resumeToken = resumeToken;
        this.resumableConnection = new ResumableDuplexConnection("server", duplexConnection, resumeStoreFactory.apply((ByteBuf)resumeToken), resumeStreamTimeout, cleanupStoreOnKeepAlive);
        Mono timeout = this.resumableConnection.connectionErrors().flatMap(err -> {
            logger.debug("Starting session timeout due to error", err);
            return this.newConnections.next().doOnNext(c -> logger.debug("Connection after error: {}", c)).timeout(resumeSessionDuration);
        }).then().cast(DuplexConnection.class);
        this.newConnections.mergeWith((Publisher)timeout).subscribe(connection -> {
            this.reconnect((DuplexConnection)connection);
            logger.debug("Server ResumableConnection reconnected: {}", connection);
        }, err -> {
            logger.debug("Server ResumableConnection reconnect timeout");
            this.resumableConnection.dispose();
        });
    }

    public ServerRSocketSession continueWith(DuplexConnection connectionFactory) {
        logger.debug("Server continued with connection: {}", (Object)connectionFactory);
        this.newConnections.onNext((Object)connectionFactory);
        return this;
    }

    @Override
    public ServerRSocketSession resumeWith(ByteBuf resumeFrame) {
        logger.debug("Resume FRAME received");
        long remotePos = ServerRSocketSession.remotePos(resumeFrame);
        long remoteImpliedPos = ServerRSocketSession.remoteImpliedPos(resumeFrame);
        resumeFrame.release();
        this.resumableConnection.resume(remotePos, remoteImpliedPos, pos -> pos.flatMap(impliedPos -> this.sendFrame(ResumeOkFrameFlyweight.encode(this.allocator, impliedPos))).onErrorResume(err -> this.sendFrame(ErrorFrameFlyweight.encode(this.allocator, 0, ServerRSocketSession.errorFrameThrowable(err))).then(Mono.fromRunnable(this.resumableConnection::dispose)).then(Mono.never())));
        return this;
    }

    @Override
    public void reconnect(DuplexConnection connection) {
        this.resumableConnection.reconnect(connection);
    }

    @Override
    public ResumableDuplexConnection resumableConnection() {
        return this.resumableConnection;
    }

    @Override
    public ByteBuf token() {
        return this.resumeToken;
    }

    private Mono<Void> sendFrame(ByteBuf frame) {
        logger.debug("Sending Resume frame: {}", (Object)frame);
        return this.resumableConnection.sendOne(frame).onErrorResume(e -> Mono.empty());
    }

    private static long remotePos(ByteBuf resumeFrame) {
        return ResumeFrameFlyweight.firstAvailableClientPos(resumeFrame);
    }

    private static long remoteImpliedPos(ByteBuf resumeFrame) {
        return ResumeFrameFlyweight.lastReceivedServerPos(resumeFrame);
    }

    private static RejectedResumeException errorFrameThrowable(Throwable err) {
        String msg;
        if (err instanceof ResumeStateException) {
            ResumeStateException resumeException = (ResumeStateException)err;
            msg = String.format("resumption_pos=[ remote: { pos: %d, impliedPos: %d }, local: { pos: %d, impliedPos: %d }]", resumeException.getRemotePos(), resumeException.getRemoteImpliedPos(), resumeException.getLocalPos(), resumeException.getLocalImpliedPos());
        } else {
            msg = String.format("resume_internal_error: %s", err.getMessage());
        }
        return new RejectedResumeException(msg);
    }
}

