/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.fragmentation;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.fragmentation.FrameReassembler;
import io.rsocket.frame.FrameLengthFlyweight;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

/**
 * A {@link DuplexConnection} wrapper that forwards outbound traffic to a delegate
 * unchanged and routes every inbound buffer through a {@link FrameReassembler}
 * before it reaches the subscriber.
 *
 * <p>When constructed with {@code decodeLength == true}, each inbound buffer is first
 * passed through {@link FrameLengthFlyweight#frame(ByteBuf)} and the result is retained
 * (presumably to drop a length prefix added by the transport -- confirm against
 * {@code FrameLengthFlyweight}).
 */
public class ReassemblyDuplexConnection implements DuplexConnection {
    /** The wrapped connection that performs the actual I/O. */
    private final DuplexConnection delegate;
    /** Receives every inbound buffer together with the downstream sink. */
    private final FrameReassembler frameReassembler;
    /** Whether inbound buffers go through {@code FrameLengthFlyweight.frame} first. */
    private final boolean decodeLength;

    /**
     * Creates a wrapper around {@code delegate}.
     *
     * <p>The reassembler is created with the delegate's allocator and is disposed once the
     * delegate's {@code onClose()} signal terminates, whatever the terminal signal is.
     *
     * @param delegate the connection to wrap; must not be {@code null}
     * @param decodeLength whether inbound buffers are run through
     *     {@code FrameLengthFlyweight.frame} before reassembly
     * @throws NullPointerException if {@code delegate} is {@code null}
     */
    public ReassemblyDuplexConnection(DuplexConnection delegate, boolean decodeLength) {
        this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
        this.decodeLength = decodeLength;
        this.frameReassembler = new FrameReassembler(delegate.alloc());

        // Tie the reassembler's lifetime to the delegate's close signal.
        Mono<Void> closeSignal = delegate.onClose();
        closeSignal.doFinally(signalType -> this.frameReassembler.dispose()).subscribe();
    }

    /**
     * Sends a stream of frames by handing it straight to the delegate.
     *
     * @param frames the frames to send
     * @return whatever the delegate's {@code send} returns
     */
    @Override
    public Mono<Void> send(Publisher<ByteBuf> frames) {
        return this.delegate.send(frames);
    }

    /**
     * Sends a single frame by handing it straight to the delegate.
     *
     * @param frame the frame to send
     * @return whatever the delegate's {@code sendOne} returns
     */
    @Override
    public Mono<Void> sendOne(ByteBuf frame) {
        return this.delegate.sendOne(frame);
    }

    /**
     * Prepares an inbound buffer for reassembly.
     *
     * @param frame the buffer as received from the delegate
     * @return {@code frame} itself when length decoding is off; otherwise the retained
     *     result of {@code FrameLengthFlyweight.frame(frame)}
     */
    private ByteBuf decode(ByteBuf frame) {
        if (!this.decodeLength) {
            return frame;
        }
        ByteBuf payload = FrameLengthFlyweight.frame(frame);
        return payload.retain();
    }

    /**
     * Handles one inbound buffer: decodes it and passes it, with the sink, to the reassembler.
     *
     * @param inbound the buffer received from the delegate
     * @param sink the downstream sink the reassembler may emit into
     */
    private void reassemble(ByteBuf inbound, SynchronousSink<ByteBuf> sink) {
        ByteBuf decoded = this.decode(inbound);
        this.frameReassembler.reassembleFrame(decoded, sink);
    }

    /**
     * Returns the delegate's inbound stream with every buffer routed through the reassembler.
     *
     * @return the inbound frames as emitted by the reassembler
     */
    @Override
    public Flux<ByteBuf> receive() {
        Flux<ByteBuf> inboundFrames = this.delegate.receive();
        return inboundFrames.handle(this::reassemble);
    }

    /**
     * @return the delegate's allocator
     */
    @Override
    public ByteBufAllocator alloc() {
        return this.delegate.alloc();
    }

    /**
     * @return the delegate's close signal
     */
    @Override
    public Mono<Void> onClose() {
        return this.delegate.onClose();
    }

    /**
     * Disposes the delegate connection.
     *
     * <p>NOTE(review): this method carries no {@code @Override} in this source; it presumably
     * implements a method inherited through {@code DuplexConnection} -- confirm and annotate.
     */
    public void dispose() {
        this.delegate.dispose();
    }
}

