/*
 * Decompiled with CFR 0.152.
 */
package io.questdb.cutlass.line.tcp;

import io.questdb.cutlass.line.tcp.LineTcpMeasurementScheduler;
import io.questdb.cutlass.line.tcp.LineTcpReceiverConfiguration;
import io.questdb.cutlass.line.tcp.NewLineProtoParser;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.IOContext;
import io.questdb.network.IODispatcher;
import io.questdb.network.NetworkFacade;
import io.questdb.std.Mutable;
import io.questdb.std.Unsafe;
import io.questdb.std.Vect;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.DirectByteCharSequence;
import io.questdb.std.str.FloatingDirectCharSink;

/**
 * State of a single line protocol TCP connection.
 *
 * <p>The context owns a native receive buffer that is allocated in the constructor and
 * released in {@link #close()}. Bytes are received into the buffer through the configured
 * {@link NetworkFacade}, parsed with a {@link NewLineProtoParser}, and every completely
 * parsed measurement is offered to the {@link LineTcpMeasurementScheduler}.
 *
 * <p>Buffer layout (all values are native addresses):
 * <pre>
 * recvBufStart &lt;= recvBufStartOfMeasurement &lt;= recvBufPos &lt;= recvBufEnd
 * </pre>
 * {@code recvBufPos} is where the next received byte is written and
 * {@code recvBufStartOfMeasurement} is the first byte of the measurement being parsed.
 */
class LineTcpConnectionContext implements IOContext, Mutable {
    private static final Log LOG = LogFactory.getLog(LineTcpConnectionContext.class);
    /** Minimum interval between two "queue full" log messages of the same connection. */
    private static final long QUEUE_FULL_LOG_HYSTERESIS_IN_MS = 10000L;
    protected final NetworkFacade nf;
    private final LineTcpMeasurementScheduler scheduler;
    private final MillisecondClock milliClock;
    /** Reusable flyweight used only to log the raw bytes of an unparsable line. */
    private final DirectByteCharSequence byteCharSequence = new DirectByteCharSequence();
    protected long fd;
    protected IODispatcher<LineTcpConnectionContext> dispatcher;
    protected long recvBufStart;
    protected long recvBufEnd;
    protected long recvBufPos;
    protected boolean peerDisconnected;
    private long lastQueueFullLogMillis = 0L;
    private final NewLineProtoParser protoParser = new NewLineProtoParser();
    /** {@code false} while the remainder of a measurement that failed to parse is being skipped. */
    private boolean goodMeasurement;
    protected long recvBufStartOfMeasurement;
    private final FloatingDirectCharSink charSink = new FloatingDirectCharSink();

    /**
     * Allocates the receive buffer and puts the context into its initial state.
     *
     * @param configuration supplies the network facade, the millisecond clock and the
     *                      receive buffer size
     * @param scheduler     receives every successfully parsed measurement
     */
    LineTcpConnectionContext(LineTcpReceiverConfiguration configuration, LineTcpMeasurementScheduler scheduler) {
        this.nf = configuration.getNetworkFacade();
        this.scheduler = scheduler;
        this.milliClock = configuration.getMillisecondClock();
        // Read the size once so the allocation and the end pointer can never disagree.
        final int bufferSize = configuration.getNetMsgBufferSize();
        this.recvBufStart = Unsafe.malloc(bufferSize);
        this.recvBufEnd = this.recvBufStart + bufferSize;
        this.clear();
    }

    /**
     * Discards any buffered bytes, forgets a previous peer disconnect and restarts the parser
     * at the beginning of the receive buffer.
     */
    @Override
    public void clear() {
        this.recvBufPos = this.recvBufStart;
        this.peerDisconnected = false;
        this.resetParser();
    }

    /** Points the parser at the start of the buffer and marks the current measurement as good. */
    private void resetParser() {
        this.protoParser.of(this.recvBufStart);
        this.goodMeasurement = true;
        this.recvBufStartOfMeasurement = this.recvBufStart;
    }

    /**
     * Invalidates the file descriptor, releases the receive buffer and closes the parser and
     * the character sink. The buffer is released at most once: a repeated call does not free
     * it again.
     */
    @Override
    public void close() {
        this.fd = -1L;
        if (this.recvBufStart != 0L) {
            Unsafe.free(this.recvBufStart, this.recvBufEnd - this.recvBufStart);
            this.recvBufPos = 0L;
            this.recvBufEnd = 0L;
            this.recvBufStart = 0L;
        }
        this.protoParser.close();
        this.charSink.close();
    }

    /** @return the file descriptor of the connection, {@code -1} after {@link #close()} */
    @Override
    public long getFd() {
        return this.fd;
    }

    /** @return {@code true} when the file descriptor is {@code -1} */
    @Override
    public boolean invalid() {
        return this.fd == -1L;
    }

    /** @return the dispatcher passed to the last {@link #of(long, IODispatcher)} call */
    public IODispatcher<LineTcpConnectionContext> getDispatcher() {
        return this.dispatcher;
    }

    /**
     * Rate limits the "queue full" log message.
     *
     * @return {@code true} when at least {@link #QUEUE_FULL_LOG_HYSTERESIS_IN_MS} milliseconds
     * have passed since this method last returned {@code true}
     */
    private boolean checkQueueFullLogHysteresis() {
        final long millis = this.milliClock.getTicks();
        if (millis - this.lastQueueFullLogMillis >= QUEUE_FULL_LOG_HYSTERESIS_IN_MS) {
            this.lastQueueFullLogMillis = millis;
            return true;
        }
        return false;
    }

    /**
     * Moves the bytes in {@code [recvBufNewStart, recvBufPos)} to the start of the receive
     * buffer and adjusts {@code recvBufPos} accordingly.
     *
     * @param recvBufNewStart address of the first byte to keep; must not exceed {@code recvBufPos}
     * @return {@code true} when space was reclaimed, {@code false} when {@code recvBufNewStart}
     * is already the start of the buffer and nothing can be reclaimed
     */
    protected final boolean compactBuffer(long recvBufNewStart) {
        assert (recvBufNewStart <= this.recvBufPos);
        if (recvBufNewStart > this.recvBufStart) {
            final int len = (int)(this.recvBufPos - recvBufNewStart);
            if (len > 0) {
                // NOTE(review): source and destination overlap whenever len exceeds
                // recvBufNewStart - recvBufStart; confirm Vect.memcpy tolerates overlapping
                // ranges (memmove semantics) for this argument order.
                Vect.memcpy(recvBufNewStart, this.recvBufStart, len);
            }
            this.recvBufPos = this.recvBufStart + (long)len;
            return true;
        }
        return false;
    }

    /**
     * Logs why the connection is about to be dropped: a buffer overflow when the parser has
     * reached the end of the buffer, otherwise a peer disconnect (if one was detected).
     */
    private void doHandleDisconnectEvent() {
        if (this.protoParser.getBufferAddress() == this.recvBufEnd) {
            LOG.error().$('[').$(this.fd).$("] buffer overflow [msgBufferSize=").$(this.recvBufEnd - this.recvBufStart).$(']').$();
            return;
        }
        if (this.peerDisconnected) {
            if (this.recvBufPos != this.recvBufStart) {
                LOG.info().$('[').$(this.fd).$("] peer disconnected with partial measurement, ").$(this.recvBufPos - this.recvBufStart).$(" unprocessed bytes").$();
            } else {
                LOG.info().$('[').$(this.fd).$("] peer disconnected").$();
            }
        }
    }

    /**
     * Receives whatever is available on the socket and parses it.
     *
     * @param netIoJob the IO job on whose behalf measurements are committed
     * @return the result of {@link #parseMeasurements(LineTcpMeasurementScheduler.NetworkIOJob)}
     */
    IOContextResult handleIO(LineTcpMeasurementScheduler.NetworkIOJob netIoJob) {
        this.read();
        return this.parseMeasurements(netIoJob);
    }

    /**
     * Parses buffered bytes measurement by measurement, receiving more data whenever the
     * parser runs out of input.
     *
     * @param netIoJob the IO job passed through to the scheduler
     * @return <ul>
     * <li>{@link IOContextResult#QUEUE_FULL} when the scheduler could not commit a complete
     * measurement; the parser is not advanced past that measurement,</li>
     * <li>{@link IOContextResult#NEEDS_READ} when no more bytes could be received and the
     * peer is still connected,</li>
     * <li>{@link IOContextResult#NEEDS_DISCONNECT} when the peer disconnected, when a single
     * measurement fills the whole buffer, or when a {@link RuntimeException} was thrown
     * while processing.</li>
     * </ul>
     */
    protected final IOContextResult parseMeasurements(LineTcpMeasurementScheduler.NetworkIOJob netIoJob) {
        try {
            while (true) {
                // After a parse error the rest of the broken line is skipped rather than parsed.
                final NewLineProtoParser.ParseResult rc = this.goodMeasurement
                        ? this.protoParser.parseMeasurement(this.recvBufPos)
                        : this.protoParser.skipMeasurement(this.recvBufPos);
                switch (rc) {
                    case MEASUREMENT_COMPLETE: {
                        if (this.goodMeasurement) {
                            if (this.scheduler.tryButCouldNotCommit(netIoJob, this.protoParser, this.charSink)) {
                                if (this.checkQueueFullLogHysteresis()) {
                                    LOG.debug().$('[').$(this.fd).$("] queue full").$();
                                }
                                return IOContextResult.QUEUE_FULL;
                            }
                        } else {
                            final int position = (int)(this.protoParser.getBufferAddress() - this.recvBufStartOfMeasurement);
                            LOG.error().$('[').$(this.fd).$("] could not parse measurement, code ").$((Object)this.protoParser.getErrorCode()).$(" at ").$(position).$(" line (may be mangled due to partial parsing) is ").$(this.byteCharSequence.of(this.recvBufStartOfMeasurement, this.protoParser.getBufferAddress())).$();
                            this.goodMeasurement = true;
                        }
                        this.protoParser.startNextMeasurement();
                        this.recvBufStartOfMeasurement = this.protoParser.getBufferAddress();
                        if (this.recvBufStartOfMeasurement != this.recvBufPos) break;
                        // Everything received so far has been consumed: rewind to the start of
                        // the buffer instead of compacting later.
                        this.recvBufPos = this.recvBufStart;
                        this.protoParser.of(this.recvBufStart);
                        break;
                    }
                    case ERROR: {
                        this.goodMeasurement = false;
                        break;
                    }
                    case BUFFER_UNDERFLOW: {
                        if (this.recvBufPos == this.recvBufEnd) {
                            // Buffer is full: reclaim the space used by already processed
                            // measurements. If the current measurement starts at the very
                            // beginning, nothing can be reclaimed and the connection is dropped.
                            if (!this.compactBuffer(this.recvBufStartOfMeasurement)) {
                                this.doHandleDisconnectEvent();
                                return IOContextResult.NEEDS_DISCONNECT;
                            }
                            // The current measurement now starts at recvBufStart and is
                            // parsed again from its first byte.
                            this.resetParser();
                        }
                        if (this.read()) break;
                        if (this.peerDisconnected) {
                            // NOTE(review): doHandleDisconnectEvent() is not called on this
                            // path, so its "peer disconnected" messages are not logged here;
                            // confirm whether that is intended.
                            return IOContextResult.NEEDS_DISCONNECT;
                        }
                        return IOContextResult.NEEDS_READ;
                    }
                }
            }
        }
        catch (RuntimeException ex) {
            LOG.error().$('[').$(this.fd).$("] could not process line data").$(ex).$();
            return IOContextResult.NEEDS_DISCONNECT;
        }
    }

    /**
     * Binds the context to a newly accepted connection and resets its state.
     *
     * @param clientFd   file descriptor of the connection
     * @param dispatcher dispatcher that owns the connection
     * @return this context
     */
    LineTcpConnectionContext of(long clientFd, IODispatcher<LineTcpConnectionContext> dispatcher) {
        this.fd = clientFd;
        this.dispatcher = dispatcher;
        this.clear();
        return this;
    }

    /**
     * Receives bytes into the free part of the buffer, unless the buffer is full or the peer
     * has already been seen to disconnect.
     *
     * <p>A negative result from {@code recv} is recorded as a peer disconnect.
     *
     * @return when a receive was attempted: {@code true} if at least one byte was received;
     * otherwise {@code true} if the peer has not been seen to disconnect
     */
    protected final boolean read() {
        final int bufferRemaining = (int)(this.recvBufEnd - this.recvBufPos);
        if (bufferRemaining > 0 && !this.peerDisconnected) {
            final int nRead = this.nf.recv(this.fd, this.recvBufPos, bufferRemaining);
            if (nRead > 0) {
                this.recvBufPos += (long)nRead;
                return true;
            }
            this.peerDisconnected = nRead < 0;
            return false;
        }
        return !this.peerDisconnected;
    }

    /** Outcome of an IO round, telling the caller what the connection needs next. */
    enum IOContextResult {
        NEEDS_READ,
        NEEDS_WRITE,
        QUEUE_FULL,
        NEEDS_DISCONNECT;

    }
}

