package org.apache.flink.runtime.io.network.netty;

import java.net.ProtocolException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/ZeroCopyNettyMessageDecoder.class */
public class ZeroCopyNettyMessageDecoder extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ZeroCopyNettyMessageDecoder.class);
    private static final int INITIAL_MESSAGE_HEADER_BUFFER_LENGTH = 128;
    private final NetworkBufferAllocator networkBufferAllocator;
    private NettyMessage currentNettyMessage;
    private int remainingMessageHeaderToCopy = -1;
    private byte msgId = -1;
    private int remainingBufferSize = -1;
    private final ByteBuf frameHeaderBuffer = Unpooled.directBuffer(9, 9);
    private final ByteBuf messageHeaderBuffer = Unpooled.directBuffer(128);

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZeroCopyNettyMessageDecoder(NetworkBufferAllocator networkBufferAllocator) {
        this.networkBufferAllocator = networkBufferAllocator;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof ByteBuf)) {
            channelHandlerContext.fireChannelRead(obj);
            return;
        }
        ByteBuf byteBuf = (ByteBuf) obj;
        while (byteBuf.readableBytes() > 0) {
            try {
                if (this.frameHeaderBuffer.isWritable()) {
                    copyToTargetBuffer(byteBuf, this.frameHeaderBuffer, this.frameHeaderBuffer.writableBytes());
                    if (this.frameHeaderBuffer.isWritable()) {
                        break;
                    } else {
                        decodeFrameHeader();
                    }
                }
                if (this.remainingMessageHeaderToCopy > 0 || (this.remainingMessageHeaderToCopy == 0 && this.messageHeaderBuffer.writerIndex() == 0)) {
                    this.remainingMessageHeaderToCopy -= copyToTargetBuffer(byteBuf, this.messageHeaderBuffer, this.remainingMessageHeaderToCopy);
                    if (this.remainingMessageHeaderToCopy != 0) {
                        break;
                    }
                    this.currentNettyMessage = decodeNettyMessage();
                    if (this.msgId != 0) {
                        channelHandlerContext.fireChannelRead(this.currentNettyMessage);
                        clearState();
                    }
                }
                if (readOrDiscardBufferResponse(byteBuf)) {
                    channelHandlerContext.fireChannelRead(this.currentNettyMessage);
                    clearState();
                }
            } catch (Throwable th) {
                byteBuf.release();
                throw th;
            }
        }
        Preconditions.checkState(!byteBuf.isReadable(), "Not all data of the received buffer consumed.");
        byteBuf.release();
    }

    private void decodeFrameHeader() {
        int readInt = this.frameHeaderBuffer.readInt();
        Preconditions.checkState(readInt >= 0, "The length field of current message must be non-negative");
        Preconditions.checkState(this.frameHeaderBuffer.readInt() == -1159983106, "Network stream corrupted: received incorrect magic number.");
        this.msgId = this.frameHeaderBuffer.readByte();
        if (this.msgId != 0) {
            this.remainingMessageHeaderToCopy = readInt - 9;
        } else {
            this.remainingMessageHeaderToCopy = 29;
        }
        if (this.messageHeaderBuffer.capacity() < this.remainingMessageHeaderToCopy) {
            this.messageHeaderBuffer.capacity(this.remainingMessageHeaderToCopy);
        }
    }

    private NettyMessage decodeNettyMessage() throws Exception {
        NettyMessage readFrom;
        switch (this.msgId) {
            case 0:
                Preconditions.checkState(this.networkBufferAllocator != null, "buffer allocator is required to decode BufferResponse");
                readFrom = NettyMessage.BufferResponse.readFrom(this.messageHeaderBuffer, this.networkBufferAllocator);
                break;
            case 1:
                readFrom = NettyMessage.ErrorResponse.readFrom(this.messageHeaderBuffer);
                break;
            case 2:
                readFrom = NettyMessage.PartitionRequest.readFrom(this.messageHeaderBuffer);
                break;
            case 3:
                readFrom = NettyMessage.TaskEventRequest.readFrom(this.messageHeaderBuffer, getClass().getClassLoader());
                break;
            case 4:
                readFrom = NettyMessage.CancelPartitionRequest.readFrom(this.messageHeaderBuffer);
                break;
            case 5:
                readFrom = NettyMessage.CloseRequest.readFrom(this.messageHeaderBuffer);
                break;
            case 6:
                readFrom = NettyMessage.AddCredit.readFrom(this.messageHeaderBuffer);
                break;
            default:
                throw new ProtocolException("Received unknown message from producer: " + this.messageHeaderBuffer);
        }
        return readFrom;
    }

    private boolean readOrDiscardBufferResponse(ByteBuf byteBuf) {
        Preconditions.checkState(this.currentNettyMessage != null && (this.currentNettyMessage instanceof NettyMessage.BufferResponse));
        NettyMessage.BufferResponse bufferResponse = (NettyMessage.BufferResponse) this.currentNettyMessage;
        if (bufferResponse.dataBufferSize == 0) {
            return true;
        }
        ByteBuf buffer = bufferResponse.getBuffer();
        if (this.remainingBufferSize < 0) {
            this.remainingBufferSize = bufferResponse.dataBufferSize;
        }
        if (buffer != null) {
            this.remainingBufferSize -= copyToTargetBuffer(byteBuf, buffer, this.remainingBufferSize);
        } else {
            int min = Math.min(byteBuf.readableBytes(), this.remainingBufferSize);
            byteBuf.readerIndex(byteBuf.readerIndex() + min);
            this.remainingBufferSize -= min;
        }
        return this.remainingBufferSize == 0;
    }

    private void clearState() {
        this.frameHeaderBuffer.clear();
        this.messageHeaderBuffer.clear();
        this.remainingMessageHeaderToCopy = -1;
        this.msgId = (byte) -1;
        this.currentNettyMessage = null;
        this.remainingBufferSize = -1;
    }

    private int copyToTargetBuffer(ByteBuf byteBuf, ByteBuf byteBuf2, int i) {
        int min = Math.min(byteBuf.readableBytes(), i);
        Preconditions.checkState(byteBuf2.writableBytes() >= min, "There is not enough space to copy " + min + " bytes, writable = " + byteBuf2.writableBytes());
        byteBuf2.writeBytes(byteBuf, min);
        return min;
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        LOG.info("Channel get inactive, currentNettyMessage = {}", this.currentNettyMessage);
        super.channelInactive(channelHandlerContext);
        if (this.currentNettyMessage != null && (this.currentNettyMessage instanceof NettyMessage.BufferResponse) && ((NettyMessage.BufferResponse) this.currentNettyMessage).getBuffer() != null) {
            ((NettyMessage.BufferResponse) this.currentNettyMessage).getBuffer().release();
        }
        this.currentNettyMessage = null;
        this.frameHeaderBuffer.release();
        this.messageHeaderBuffer.release();
    }

    @VisibleForTesting
    public NettyMessage getCurrentNettyMessage() {
        return this.currentNettyMessage;
    }
}
