/*
 * Decompiled with CFR 0.152.
 */
package io.questdb.cutlass.line.tcp;

import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.CairoSecurityContext;
import io.questdb.cairo.EntryUnavailableException;
import io.questdb.cairo.TableReader;
import io.questdb.cairo.TableReaderMetadata;
import io.questdb.cairo.TableStructure;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.vm.AppendOnlyVirtualMemory;
import io.questdb.cutlass.line.LineProtoTimestampAdapter;
import io.questdb.cutlass.line.tcp.LineTcpConnectionContext;
import io.questdb.cutlass.line.tcp.LineTcpReceiverConfiguration;
import io.questdb.cutlass.line.tcp.NewLineProtoParser;
import io.questdb.cutlass.line.tcp.SimpleReadWriteLock;
import io.questdb.cutlass.line.tcp.SymbolCache;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.Barrier;
import io.questdb.mp.FanOut;
import io.questdb.mp.Job;
import io.questdb.mp.MPSequence;
import io.questdb.mp.RingQueue;
import io.questdb.mp.SCSequence;
import io.questdb.mp.Sequence;
import io.questdb.mp.WorkerPool;
import io.questdb.network.IODispatcher;
import io.questdb.network.IORequestProcessor;
import io.questdb.std.CharSequenceObjHashMap;
import io.questdb.std.Chars;
import io.questdb.std.Misc;
import io.questdb.std.ObjIntHashMap;
import io.questdb.std.ObjList;
import io.questdb.std.Unsafe;
import io.questdb.std.Vect;
import io.questdb.std.datetime.microtime.MicrosecondClock;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.DirectCharSink;
import io.questdb.std.str.FloatingDirectCharSink;
import io.questdb.std.str.Path;
import io.questdb.tasks.TelemetryTask;
import java.io.Closeable;
import java.util.Arrays;
import java.util.concurrent.locks.ReadWriteLock;
import org.jetbrains.annotations.NotNull;

class LineTcpMeasurementScheduler
implements Closeable {
    // Logger shared by the scheduler and its nested job classes.
    private static final Log LOG = LogFactory.getLog(LineTcpMeasurementScheduler.class);
    // Marker values carried in LineTcpMeasurementEvent.threadId instead of a writer thread id.
    // The writer job's queue drain dispatches threadId -1 to processRebalance and -3 to
    // processReleaseWriter; publishers stamp a freshly claimed event with -2 before populating it,
    // and a writer job skips any event whose threadId matches neither its own id nor -1/-3.
    private static final int REBALANCE_EVENT_ID = -1;
    private static final int INCOMPLETE_EVENT_ID = -2;
    private static final int RELEASE_WRITER_EVENT_ID = -3;
    // Column type to use when auto-creating a column, indexed by the parser's entity type
    // (see TableStructureAdapter.getColumnType). Filled in by the static initializer.
    private static final int[] DEFAULT_COLUMN_TYPES = new int[7];
    private final CairoEngine engine;
    private final CairoSecurityContext securityContext;
    private final CairoConfiguration cairoConfiguration;
    private final MillisecondClock milliClock;
    // Ring buffer of events handed from the network IO jobs to the writer jobs.
    private final RingQueue<LineTcpMeasurementEvent> queue;
    // Taken around reads and writes of the two table maps below and around load rebalancing.
    private final ReadWriteLock tableUpdateDetailsLock = new SimpleReadWriteLock();
    // Tables currently assigned to a writer thread, keyed by table name.
    private final CharSequenceObjHashMap<TableUpdateDetails> tableUpdateDetailsByTableName;
    // Tables parked by network IO maintenance after their writer idle timeout, keyed by table name.
    private final CharSequenceObjHashMap<TableUpdateDetails> idleTableUpdateDetailsByTableName;
    // Scratch array, one slot per writer thread, recomputed by calcThreadLoad().
    private final int[] loadByThread;
    // A load check is attempted once a table's nUpdates counter exceeds this value.
    private final int nUpdatesPerLoadRebalance;
    // Highest/lowest thread load ratio at or above which a table move is considered.
    private final double maxLoadRatio;
    // Minimum milliClock delta between maintenance passes of the IO and writer jobs.
    private final long maintenanceInterval;
    // milliClock delta since a table's last measurement after which IO maintenance releases it.
    private final long writerIdleTimeout;
    // Partitioning reported by TableStructureAdapter for auto-created tables.
    private final int defaultPartitionBy;
    // One network IO job per IO worker, indexed by worker id.
    private final NetworkIOJob[] netIoJobs;
    // Reusable adapter, path and memory passed to engine.getStatus/createTable; only touched
    // while the write lock is held in startNewMeasurementEvent0.
    private final TableStructureAdapter tableStructureAdapter = new TableStructureAdapter();
    private final Path path = new Path();
    private final AppendOnlyVirtualMemory mem = new AppendOnlyVirtualMemory();
    // Publisher side of the queue; set to null by close(), which is what isOpen() tests.
    private Sequence pubSeq;
    // Counters exposed through package-private getters.
    private int nLoadCheckCycles = 0;
    private int nRebalances = 0;

    /**
     * Builds the scheduler and registers its jobs with the two worker pools.
     * <p>
     * One network IO job is created (via {@link #createNetworkIOJob}) and assigned per IO worker,
     * and one {@code WriterJob} per writer worker. Each job's {@code close} is also registered with
     * its pool. With more than one writer thread the writer sequences are attached to the publisher
     * sequence through a {@link FanOut}; with a single writer the subscriber sequence is chained
     * directly.
     *
     * @param lineConfiguration receiver settings (queue capacity, timeouts, rebalance thresholds)
     * @param engine            engine used to check for and create tables
     * @param ioWorkerPool      pool running the network IO jobs
     * @param dispatcher        IO dispatcher handed to every network IO job
     * @param writerWorkerPool  pool running the writer jobs
     */
    LineTcpMeasurementScheduler(LineTcpReceiverConfiguration lineConfiguration, CairoEngine engine, WorkerPool ioWorkerPool, IODispatcher<LineTcpConnectionContext> dispatcher, WorkerPool writerWorkerPool) {
        this.engine = engine;
        securityContext = lineConfiguration.getCairoSecurityContext();
        cairoConfiguration = engine.getConfiguration();
        milliClock = cairoConfiguration.getMillisecondClock();

        // Network side: one job per IO worker, registered both as a job and as a cleaner.
        final int ioWorkerCount = ioWorkerPool.getWorkerCount();
        netIoJobs = new NetworkIOJob[ioWorkerCount];
        for (int ioWorkerId = 0; ioWorkerId < ioWorkerCount; ioWorkerId++) {
            final NetworkIOJob ioJob = createNetworkIOJob(dispatcher, ioWorkerId);
            netIoJobs[ioWorkerId] = ioJob;
            ioWorkerPool.assign(ioWorkerId, ioJob);
            ioWorkerPool.assign(ioWorkerId, ioJob::close);
        }

        tableUpdateDetailsByTableName = new CharSequenceObjHashMap<>();
        idleTableUpdateDetailsByTableName = new CharSequenceObjHashMap<>();

        final int writerCount = writerWorkerPool.getWorkerCount();
        loadByThread = new int[writerCount];

        // Event queue shared by all publishers (IO jobs) and consumers (writer jobs).
        final int maxMeasurementSize = lineConfiguration.getMaxMeasurementSize();
        final int queueSize = lineConfiguration.getWriterQueueCapacity();
        queue = new RingQueue<LineTcpMeasurementEvent>(
                () -> new LineTcpMeasurementEvent(maxMeasurementSize, lineConfiguration.getMicrosecondClock(), lineConfiguration.getTimestampAdapter()),
                queueSize
        );
        pubSeq = new MPSequence(queueSize);

        if (writerCount > 1) {
            // Every writer sees every event; each gets its own subscriber sequence behind a fan-out.
            final FanOut fanOut = new FanOut(new Barrier[0]);
            for (int writerId = 0; writerId < writerCount; writerId++) {
                final SCSequence writerSeq = new SCSequence();
                fanOut.and(writerSeq);
                final WriterJob job = new WriterJob(writerId, writerSeq);
                writerWorkerPool.assign(writerId, job);
                writerWorkerPool.assign(writerId, () -> job.close());
            }
            pubSeq.then(fanOut).then(pubSeq);
        } else {
            final SCSequence writerSeq = new SCSequence();
            pubSeq.then(writerSeq).then(pubSeq);
            final WriterJob job = new WriterJob(0, writerSeq);
            writerWorkerPool.assign(0, job);
            writerWorkerPool.assign(0, () -> job.close());
        }

        nUpdatesPerLoadRebalance = lineConfiguration.getNUpdatesPerLoadRebalance();
        maxLoadRatio = lineConfiguration.getMaxLoadRatio();
        maintenanceInterval = lineConfiguration.getMaintenanceInterval();
        defaultPartitionBy = lineConfiguration.getDefaultPartitionBy();
        writerIdleTimeout = lineConfiguration.getWriterIdleTimeout();
    }

    /**
     * Releases the scheduler's resources; a second call is a no-op because {@code pubSeq} is
     * nulled on the first one.
     * <p>
     * Under the write lock, closes every active table entry that was never picked up by a writer
     * job (entries with {@code assignedToJob} set are skipped here; {@code WriterJob.close()} frees
     * the tables in its own list) and clears both table maps. Then closes every event slot of the
     * queue, the scratch path and the scratch memory.
     */
    @Override
    public void close() {
        if (pubSeq == null) {
            return;
        }
        pubSeq = null;

        tableUpdateDetailsLock.writeLock().lock();
        try {
            final ObjList<CharSequence> names = tableUpdateDetailsByTableName.keys();
            for (int i = 0, count = names.size(); i < count; i++) {
                final TableUpdateDetails details = tableUpdateDetailsByTableName.get(names.get(i));
                if (!details.assignedToJob) {
                    details.close();
                }
            }
            tableUpdateDetailsByTableName.clear();
            idleTableUpdateDetailsByTableName.clear();
        } finally {
            tableUpdateDetailsLock.writeLock().unlock();
        }

        for (int slot = 0; slot < queue.getCapacity(); slot++) {
            queue.get(slot).close();
        }
        path.close();
        mem.close();
    }

    /**
     * Creates the bookkeeping entry for a table and assigns it to the writer thread with the
     * smallest current load (the lowest-numbered thread wins ties).
     *
     * @param tableName name of the table
     * @param keyIndex  insertion index in {@code tableUpdateDetailsByTableName}; it is passed
     *                  straight to that map's {@code putAt}
     * @return the newly registered entry
     */
    @NotNull
    private TableUpdateDetails assignTableToThread(String tableName, int keyIndex) {
        calcThreadLoad();

        int chosenThreadId = 0;
        int smallestLoad = Integer.MAX_VALUE;
        for (int candidate = 0; candidate < loadByThread.length; candidate++) {
            final int load = loadByThread[candidate];
            if (load < smallestLoad) {
                smallestLoad = load;
                chosenThreadId = candidate;
            }
        }

        final TableUpdateDetails created = new TableUpdateDetails(tableName, chosenThreadId, netIoJobs);
        tableUpdateDetailsByTableName.putAt(keyIndex, tableName, created);
        LOG.info().$("assigned ").$(tableName).$(" to thread ").$(chosenThreadId).$();
        return created;
    }

    /**
     * Recomputes {@code loadByThread}: each slot becomes the sum of {@code nUpdates} over the
     * active tables currently assigned to that writer thread.
     */
    private void calcThreadLoad() {
        Arrays.fill(loadByThread, 0);
        final ObjList<CharSequence> names = tableUpdateDetailsByTableName.keys();
        for (int i = 0, count = names.size(); i < count; i++) {
            final TableUpdateDetails details = tableUpdateDetailsByTableName.get(names.get(i));
            loadByThread[details.writerThreadId] += details.nUpdates;
        }
    }

    /**
     * Factory for the per-worker network IO job. Invoked from the constructor once per IO worker;
     * being protected, it can be overridden to supply a different {@link NetworkIOJob}.
     *
     * @param dispatcher dispatcher the job polls for IO requests
     * @param workerId   id of the IO worker the job will run on
     * @return a new job bound to this scheduler
     */
    protected NetworkIOJob createNetworkIOJob(IODispatcher<LineTcpConnectionContext> dispatcher, int workerId) {
        final NetworkIOJob job = new NetworkIOJobImpl(dispatcher, workerId);
        return job;
    }

    /**
     * @return the live per-writer-thread load array (not a copy); it is overwritten by every
     * call to {@code calcThreadLoad()}
     */
    int[] getLoadByThread() {
        return loadByThread;
    }

    /**
     * @return how many times {@code loadRebalance()} has run
     */
    int getNLoadCheckCycles() {
        return nLoadCheckCycles;
    }

    /**
     * @return how many table moves {@code loadRebalance()} has requested
     */
    int getNRebalances() {
        return nRebalances;
    }

    /**
     * Claims the next publisher slot, spinning for as long as the sequence answers {@code -2}.
     * NOTE(review): -2 looks like the sequence's transient "try again" result — confirm against
     * {@code Sequence}.
     *
     * @return the claimed cursor, or whatever other value the sequence returned; callers in this
     * class only use the result when it is non-negative
     */
    long getNextPublisherEventSequence() {
        assert isOpen();
        long seq;
        do {
            seq = pubSeq.next();
        } while (seq == -2L);
        return seq;
    }

    /**
     * @return {@code true} until {@code close()} has nulled the publisher sequence
     */
    private boolean isOpen() {
        return pubSeq != null;
    }

    /**
     * Runs one load check and, if the writer threads are sufficiently unbalanced, publishes a
     * rebalance event that moves one table from the busiest eligible thread to the least loaded one.
     * Callers in this class invoke it while holding the table write lock.
     * <p>
     * Selection: find the highest and lowest loaded threads among those whose load is below the
     * current ceiling. Stop without moving anything when no two distinct threads qualify or when
     * highest/lowest is below {@code maxLoadRatio}. Otherwise, if the busiest thread has at least
     * two tables with updates, its least updated table is chosen; if not, that thread is excluded
     * by lowering the ceiling to its load and the search repeats.
     * <p>
     * Every active table's {@code nUpdates} counter is reset regardless of the outcome.
     */
    private void loadRebalance() {
        LOG.debug().$("load check [cycle=").$(++nLoadCheckCycles).$(']').$();
        calcThreadLoad();
        final ObjList<CharSequence> names = tableUpdateDetailsByTableName.keys();

        int fromThreadId = -1;
        int toThreadId = -1;
        TableUpdateDetails tableToMove = null;
        int loadCeiling = Integer.MAX_VALUE;
        while (true) {
            int busiestLoad = Integer.MIN_VALUE;
            int busiestThreadId = -1;
            int idlestLoad = Integer.MAX_VALUE;
            int idlestThreadId = -1;
            for (int threadId = 0; threadId < loadByThread.length; threadId++) {
                final int load = loadByThread[threadId];
                if (load >= loadCeiling) {
                    // Excluded by an earlier pass
                    continue;
                }
                if (load > busiestLoad) {
                    busiestLoad = load;
                    busiestThreadId = threadId;
                }
                if (load < idlestLoad) {
                    idlestLoad = load;
                    idlestThreadId = threadId;
                }
            }

            if (busiestThreadId == -1 || idlestThreadId == -1 || busiestThreadId == idlestThreadId) {
                // Fewer than two distinct candidate threads remain
                break;
            }

            final double loadRatio = (double) busiestLoad / (double) idlestLoad;
            if (loadRatio < maxLoadRatio) {
                // Not unbalanced enough to justify a move
                break;
            }

            // Count the busiest thread's tables that received updates and remember the quietest.
            int tablesWithLoad = 0;
            int quietestUpdates = Integer.MAX_VALUE;
            String quietestTableName = null;
            for (int i = 0, count = names.size(); i < count; i++) {
                final TableUpdateDetails stats = tableUpdateDetailsByTableName.get(names.get(i));
                if (stats.writerThreadId == busiestThreadId && stats.nUpdates > 0) {
                    tablesWithLoad++;
                    if (stats.nUpdates < quietestUpdates) {
                        quietestUpdates = stats.nUpdates;
                        quietestTableName = stats.tableName;
                    }
                }
            }

            if (tablesWithLoad >= 2) {
                fromThreadId = busiestThreadId;
                toThreadId = idlestThreadId;
                tableToMove = tableUpdateDetailsByTableName.get(quietestTableName);
                break;
            }

            // The busiest thread has fewer than two loaded tables: skip it and look again.
            loadCeiling = busiestLoad;
        }

        for (int i = 0, count = names.size(); i < count; i++) {
            tableUpdateDetailsByTableName.get(names.get(i)).nUpdates = 0;
        }

        if (tableToMove == null) {
            return;
        }
        final long seq = getNextPublisherEventSequence();
        if (seq < 0L) {
            return;
        }
        try {
            final LineTcpMeasurementEvent event = queue.get(seq);
            // -2 is the value of INCOMPLETE_EVENT_ID: writer jobs skip the slot if populating it fails
            event.threadId = -2;
            event.createRebalanceEvent(fromThreadId, toThreadId, tableToMove);
            tableToMove.writerThreadId = toThreadId;
            LOG.info().$("rebalance cycle, requesting table move [cycle=").$(nLoadCheckCycles).$(", nRebalances=").$(++nRebalances).$(", table=").$(tableToMove.tableName).$(", fromThreadId=").$(fromThreadId).$(", toThreadId=").$(toThreadId).$(']').$();
        } finally {
            pubSeq.done(seq);
        }
    }

    /**
     * Resolves the table entry for the measurement being parsed: first from the IO job's own
     * cache, and only on a miss through {@code startNewMeasurementEvent0}, which takes the write
     * lock.
     *
     * @param netIoJob    the network IO job handling the connection
     * @param protoParser parser positioned on the current measurement
     * @return the table entry for the measurement
     */
    private TableUpdateDetails startNewMeasurementEvent(NetworkIOJob netIoJob, NewLineProtoParser protoParser) {
        final TableUpdateDetails cached = netIoJob.getTableUpdateDetails(protoParser.getMeasurementName());
        return cached != null ? cached : startNewMeasurementEvent0(netIoJob, protoParser);
    }

    /**
     * Slow path of {@code startNewMeasurementEvent}: looks up, revives or creates the table entry
     * for the current measurement under the table write lock and registers it with the calling
     * network IO job.
     * <ul>
     * <li>If the table is already active, its entry is reused.</li>
     * <li>Otherwise the table is created through the engine when {@code getStatus} reports a
     * non-zero status (NOTE(review): 0 presumably means "table exists" — confirm against
     * {@code TableUtils}).</li>
     * <li>If the table is parked in the idle map, its entry is moved back to the active map.</li>
     * <li>Otherwise a new entry is created and assigned to the least loaded writer thread.</li>
     * </ul>
     * The insertion index into the active map and the lookup index into the idle map are kept in
     * separate variables: an index returned by one map's {@code keyIndex} is only meaningful for
     * that same map, and {@code assignTableToThread} uses its argument with the active map's
     * {@code putAt}.
     *
     * @param netIoJob    the network IO job handling the connection
     * @param protoParser parser positioned on the current measurement
     * @return the table entry for the measurement
     * @throws CairoException propagated from {@code engine.createTable} or from building the
     *                        table structure (for example an invalid column name)
     */
    private TableUpdateDetails startNewMeasurementEvent0(NetworkIOJob netIoJob, NewLineProtoParser protoParser) {
        tableUpdateDetailsLock.writeLock().lock();
        try {
            final TableUpdateDetails tableUpdateDetails;
            // Negative: key already present. Non-negative: free slot where the key would go.
            final int keyIndex = tableUpdateDetailsByTableName.keyIndex(protoParser.getMeasurementName());
            if (keyIndex < 0) {
                tableUpdateDetails = tableUpdateDetailsByTableName.valueAt(keyIndex);
            } else {
                final String tableName = protoParser.getMeasurementName().toString();
                final int status = engine.getStatus(securityContext, path, tableName, 0, tableName.length());
                if (status != 0) {
                    LOG.info().$("creating table [tableName=").$(tableName).$(']').$();
                    engine.createTable(securityContext, mem, path, tableStructureAdapter.of(tableName, protoParser));
                }

                // Index into the idle map only; it must never be used with the active map.
                final int idleKeyIndex = idleTableUpdateDetailsByTableName.keyIndex(tableName);
                if (idleKeyIndex < 0) {
                    LOG.info().$("idle table going active [tableName=").$(tableName).$(']').$();
                    tableUpdateDetails = idleTableUpdateDetailsByTableName.valueAt(idleKeyIndex);
                    idleTableUpdateDetailsByTableName.removeAt(idleKeyIndex);
                    tableUpdateDetailsByTableName.put(tableUpdateDetails.tableName, tableUpdateDetails);
                } else {
                    TelemetryTask.doStoreTelemetry(engine, (short) 102, (short) 5);
                    // The active map has not been modified since keyIndex was computed, so the
                    // slot is still the right one for putAt.
                    tableUpdateDetails = assignTableToThread(tableName, keyIndex);
                }
            }
            netIoJob.addTableUpdateDetails(tableUpdateDetails);
            return tableUpdateDetails;
        } finally {
            tableUpdateDetailsLock.writeLock().unlock();
        }
    }

    /**
     * Tries to publish the measurement currently held by the parser to the writer queue.
     * <p>
     * The return value is inverted relative to "success": {@code true} means the measurement was
     * NOT queued (the table entry could not be obtained because of an
     * {@link EntryUnavailableException}, or no queue slot could be claimed); {@code false} means
     * there is nothing left to do for it, either because it was queued or because resolving the
     * table failed with a {@link CairoException}.
     * <p>
     * After a slot has been published, the table's update counter is incremented and, once it
     * exceeds {@code nUpdatesPerLoadRebalance}, a load rebalance is attempted if the write lock
     * can be taken without waiting.
     *
     * @param netIoJob    the network IO job handling the connection
     * @param protoParser parser positioned on the measurement to publish
     * @param charSink    sink passed through to the event when it is populated
     * @return {@code true} if the measurement could not be queued, {@code false} otherwise
     */
    boolean tryButCouldNotCommit(NetworkIOJob netIoJob, NewLineProtoParser protoParser, FloatingDirectCharSink charSink) {
        TableUpdateDetails details;
        try {
            details = startNewMeasurementEvent(netIoJob, protoParser);
        } catch (EntryUnavailableException ex) {
            LOG.info().$("could not get table writer [tableName=").$(protoParser.getMeasurementName()).$(", ex=").$(ex.getFlyweightMessage()).$(']').$();
            return true;
        } catch (CairoException ex) {
            LOG.info().$("could not create table [tableName=").$(protoParser.getMeasurementName()).$(", ex=").$(ex.getFlyweightMessage()).$(']').$();
            return false;
        }

        if (details == null) {
            return true;
        }
        final long seq = getNextPublisherEventSequence();
        if (seq < 0L) {
            return true;
        }

        try {
            final LineTcpMeasurementEvent event = queue.get(seq);
            // -2 is the value of INCOMPLETE_EVENT_ID: writer jobs skip the slot if populating it fails
            event.threadId = -2;
            final TableUpdateDetails.ThreadLocalDetails localDetails = details.startNewMeasurementEvent(netIoJob.getWorkerId());
            event.createMeasurementEvent(details, localDetails, protoParser, charSink);
            return false;
        } finally {
            pubSeq.done(seq);
            if (++details.nUpdates > nUpdatesPerLoadRebalance) {
                if (tableUpdateDetailsLock.writeLock().tryLock()) {
                    try {
                        loadRebalance();
                    } finally {
                        tableUpdateDetailsLock.writeLock().unlock();
                    }
                }
            }
        }
    }

    static {
        LineTcpMeasurementScheduler.DEFAULT_COLUMN_TYPES[0] = 11;
        LineTcpMeasurementScheduler.DEFAULT_COLUMN_TYPES[1] = 9;
        LineTcpMeasurementScheduler.DEFAULT_COLUMN_TYPES[2] = 5;
        LineTcpMeasurementScheduler.DEFAULT_COLUMN_TYPES[3] = 10;
        LineTcpMeasurementScheduler.DEFAULT_COLUMN_TYPES[4] = 0;
        LineTcpMeasurementScheduler.DEFAULT_COLUMN_TYPES[5] = 12;
    }

    /**
     * Presents the measurement currently held by a {@link NewLineProtoParser} as a
     * {@link TableStructure}, so the engine can create a table from it. Every parsed entity
     * becomes one column, followed by a trailing designated timestamp column.
     * <p>
     * A single instance is re-pointed with {@link #of} before each use.
     */
    private class TableStructureAdapter implements TableStructure {
        private CharSequence tableName;
        private NewLineProtoParser protoParser;

        private TableStructureAdapter() {
        }

        /** @return the number of parsed entities plus one for the timestamp column */
        @Override
        public int getColumnCount() {
            return protoParser.getnEntities() + 1;
        }

        /**
         * @return {@code "timestamp"} for the timestamp column, otherwise the entity's name
         * @throws CairoException if the entity name is not a valid column name
         */
        @Override
        public CharSequence getColumnName(int columnIndex) {
            // NOTE(review): this bound also admits columnIndex == getColumnCount(), one past the
            // last column; it looks off by one but is kept as found.
            assert columnIndex <= getColumnCount();
            if (columnIndex == getTimestampIndex()) {
                return "timestamp";
            }
            final String colName = protoParser.getEntity(columnIndex).getName().toString();
            if (!TableUtils.isValidColumnName(colName)) {
                throw CairoException.instance(0).put("column name contains invalid characters [colName=").put(colName).put(']');
            }
            return colName;
        }

        /**
         * @return 7 for the timestamp column (NOTE(review): presumably the timestamp column type
         * code — confirm), otherwise the default column type registered for the entity's type
         */
        @Override
        public int getColumnType(int columnIndex) {
            if (columnIndex == getTimestampIndex()) {
                return 7;
            }
            final int entityType = protoParser.getEntity(columnIndex).getType();
            return DEFAULT_COLUMN_TYPES[entityType];
        }

        /** @return always 0; auto-created columns are never indexed (see {@link #isIndexed}) */
        @Override
        public int getIndexBlockCapacity(int columnIndex) {
            return 0;
        }

        /** @return always {@code false} */
        @Override
        public boolean isIndexed(int columnIndex) {
            return false;
        }

        /** @return always {@code false} */
        @Override
        public boolean isSequential(int columnIndex) {
            return false;
        }

        /** @return the scheduler's configured default partitioning */
        @Override
        public int getPartitionBy() {
            return defaultPartitionBy;
        }

        /** @return the default symbol cache flag from the Cairo configuration */
        @Override
        public boolean getSymbolCacheFlag(int columnIndex) {
            return cairoConfiguration.getDefaultSymbolCacheFlag();
        }

        /** @return the default symbol capacity from the Cairo configuration */
        @Override
        public int getSymbolCapacity(int columnIndex) {
            return cairoConfiguration.getDefaultSymbolCapacity();
        }

        /** @return the table name set by the last call to {@link #of} */
        @Override
        public CharSequence getTableName() {
            return tableName;
        }

        /** @return the index of the timestamp column, which comes right after the last entity */
        @Override
        public int getTimestampIndex() {
            return protoParser.getnEntities();
        }

        /** @return the O3 max-uncommitted-rows setting from the Cairo configuration */
        @Override
        public int getO3MaxUncommittedRows() {
            return cairoConfiguration.getO3MaxUncommittedRows();
        }

        /** @return the O3 commit hysteresis setting from the Cairo configuration */
        @Override
        public long getO3CommitHysteresisInMicros() {
            return cairoConfiguration.getO3CommitHysteresis();
        }

        /**
         * Points the adapter at a table name and the parser holding the measurement to describe.
         *
         * @return this adapter, for call chaining
         */
        TableStructureAdapter of(CharSequence tableName, NewLineProtoParser protoParser) {
            this.tableName = tableName;
            this.protoParser = protoParser;
            return this;
        }
    }

    /**
     * Job run by one network IO worker. It services connection contexts delivered by the IO
     * dispatcher, remembers a context that is blocked on a full writer queue so it can be retried
     * first on the next run, keeps a worker-local cache of the tables it has published to, and
     * periodically releases tables that have not received measurements for
     * {@code writerIdleTimeout}.
     */
    class NetworkIOJobImpl implements NetworkIOJob, Job {
        private final IODispatcher<LineTcpConnectionContext> dispatcher;
        private final int workerId;
        // Tables this worker has used, keyed by table name; consulted before the shared map.
        private final CharSequenceObjHashMap<TableUpdateDetails> localTableUpdateDetailsByTableName = new CharSequenceObjHashMap<>();
        // Handed to every table entry's per-worker details; freed in close().
        private final ObjList<SymbolCache> unusedSymbolCaches = new ObjList<>();
        // Context whose last handleIO reported a full queue, or null.
        private LineTcpConnectionContext busyContext = null;
        private final IORequestProcessor<LineTcpConnectionContext> onRequest = this::onRequest;
        private long lastMaintenanceJobMillis = 0L;

        NetworkIOJobImpl(IODispatcher<LineTcpConnectionContext> dispatcher, int workerId) {
            this.dispatcher = dispatcher;
            this.workerId = workerId;
        }

        /**
         * Caches the table entry for this worker and bumps the entry's count of network IO
         * workers using it.
         */
        @Override
        public void addTableUpdateDetails(TableUpdateDetails tableUpdateDetails) {
            localTableUpdateDetailsByTableName.put(tableUpdateDetails.tableName, tableUpdateDetails);
            tableUpdateDetails.nNetworkIoWorkers++;
            LOG.info().$("network IO thread using table [workerId=").$(workerId).$(", tableName=").$(tableUpdateDetails.tableName).$(", nNetworkIoWorkers=").$(tableUpdateDetails.nNetworkIoWorkers).$(']').$();
        }

        /** Frees the symbol caches parked with this worker. */
        @Override
        public void close() {
            Misc.freeObjList(unusedSymbolCaches);
        }

        /** @return the cached entry for the table, or {@code null} if this worker has none */
        @Override
        public TableUpdateDetails getTableUpdateDetails(CharSequence tableName) {
            return localTableUpdateDetailsByTableName.get(tableName);
        }

        @Override
        public ObjList<SymbolCache> getUnusedSymbolCaches() {
            return unusedSymbolCaches;
        }

        @Override
        public int getWorkerId() {
            return workerId;
        }

        /**
         * One scheduling pass: retry the blocked context (if any), process the dispatcher's IO
         * queue, then run maintenance once {@code maintenanceInterval} has elapsed.
         * <p>
         * When maintenance runs, its result replaces the busy flag accumulated so far, and the
         * maintenance timestamp only advances when maintenance reports nothing more to do.
         *
         * @return {@code true} if the worker should be considered busy
         */
        @Override
        public boolean run(int workerId) {
            assert this.workerId == workerId;
            boolean busy = false;
            if (busyContext != null) {
                if (handleIO(busyContext)) {
                    // Queue is still full; do nothing else until this context gets through.
                    return true;
                }
                LOG.debug().$("context is no longer waiting on a full queue [fd=").$(busyContext.getFd()).$(']').$();
                busyContext = null;
                busy = true;
            }

            if (dispatcher.processIOQueue(onRequest)) {
                busy = true;
            }

            final long millis = milliClock.getTicks();
            if (millis - lastMaintenanceJobMillis > maintenanceInterval) {
                busy = doMaintenance(millis);
                if (!busy) {
                    lastMaintenanceJobMillis = millis;
                }
            }
            return busy;
        }

        /**
         * Releases at most one idle table per call: the first locally cached table whose last
         * measurement is at least {@code writerIdleTimeout} old.
         * <p>
         * If this worker is the table's only network user, a release-writer event is published and
         * the table moves from the active to the idle map; otherwise the table is only dropped
         * from this worker's cache. All of it happens under the table write lock.
         * <p>
         * The claimed queue slot is always handed back in a {@code finally} block, matching the
         * other publishers in this file; otherwise an exception while preparing the event would
         * leave the slot claimed but never published.
         *
         * @param millis current time from the scheduler's millisecond clock
         * @return {@code true} if another maintenance pass should follow soon, {@code false} if
         * no idle table was found or the only cached table was just dropped
         */
        private boolean doMaintenance(long millis) {
            final int sz = localTableUpdateDetailsByTableName.size();
            for (int n = 0; n < sz; n++) {
                final TableUpdateDetails tableUpdateDetails = localTableUpdateDetailsByTableName.get(localTableUpdateDetailsByTableName.keys().get(n));
                if (millis - tableUpdateDetails.lastMeasurementMillis < writerIdleTimeout) {
                    continue;
                }
                tableUpdateDetailsLock.writeLock().lock();
                try {
                    if (tableUpdateDetails.nNetworkIoWorkers != 1) {
                        // Other IO workers still use the table: only forget it locally.
                        removeTableUpdateDetails(tableUpdateDetails);
                        return sz > 1;
                    }
                    final long seq = getNextPublisherEventSequence();
                    if (seq > -1L) {
                        try {
                            final LineTcpMeasurementEvent event = queue.get(seq);
                            // -2 is the value of INCOMPLETE_EVENT_ID: if anything below throws,
                            // writer jobs skip the slot instead of acting on its stale contents.
                            // NOTE(review): relies on createReleaseWriterEvent overwriting
                            // threadId, as the dispatch on -3 in the writer job implies — confirm.
                            event.threadId = -2;
                            event.createReleaseWriterEvent(tableUpdateDetails);
                            removeTableUpdateDetails(tableUpdateDetails);
                            tableUpdateDetailsByTableName.remove(tableUpdateDetails.tableName);
                            idleTableUpdateDetailsByTableName.put(tableUpdateDetails.tableName, tableUpdateDetails);
                        } finally {
                            pubSeq.done(seq);
                        }
                    }
                    // Either released one table or the queue was full: come back soon.
                    return true;
                } finally {
                    tableUpdateDetailsLock.writeLock().unlock();
                }
            }
            return false;
        }

        /**
         * Lets the context do its IO and acts on the outcome: re-registers the channel for read
         * (operation 1) or write (operation 4), or disconnects.
         *
         * @return {@code true} only when the context reported a full writer queue
         */
        private boolean handleIO(LineTcpConnectionContext context) {
            if (context.invalid()) {
                return false;
            }
            switch (context.handleIO(this)) {
                case NEEDS_READ:
                    context.getDispatcher().registerChannel(context, 1);
                    return false;
                case NEEDS_WRITE:
                    context.getDispatcher().registerChannel(context, 4);
                    return false;
                case QUEUE_FULL:
                    return true;
                case NEEDS_DISCONNECT:
                    context.getDispatcher().disconnect(context);
                    return false;
            }
            return false;
        }

        /** Dispatcher callback; parks the context when it is blocked on a full queue. */
        private void onRequest(int operation, LineTcpConnectionContext context) {
            if (handleIO(context)) {
                busyContext = context;
                LOG.debug().$("context is waiting on a full queue [fd=").$(context.getFd()).$(']').$();
            }
        }

        /**
         * Drops the table from this worker's cache, decrements the entry's network worker count
         * and clears this worker's per-thread details of the entry.
         */
        private void removeTableUpdateDetails(TableUpdateDetails tableUpdateDetails) {
            localTableUpdateDetailsByTableName.remove(tableUpdateDetails.tableName);
            tableUpdateDetails.nNetworkIoWorkers--;
            tableUpdateDetails.localDetailsArray[workerId].clear();
            LOG.info().$("network IO thread released table [workerId=").$(workerId).$(", tableName=").$(tableUpdateDetails.tableName).$(", nNetworkIoWorkers=").$(tableUpdateDetails.nNetworkIoWorkers).I$();
        }
    }

    /**
     * Job run by one writer worker. It consumes events from the shared queue through its own
     * subscriber sequence, applies measurement events addressed to its worker id, takes part in
     * rebalance and release-writer events, and periodically runs maintenance on the tables
     * assigned to it.
     */
    private class WriterJob implements Job {
        private final int workerId;
        private final Sequence sequence;
        private final AppendOnlyVirtualMemory appendMemory = new AppendOnlyVirtualMemory();
        private final Path path = new Path();
        private final DirectCharSink charSink = new DirectCharSink(64L);
        private final FloatingDirectCharSink floatingCharSink = new FloatingDirectCharSink();
        // Tables whose measurement events this job has processed and still owns.
        private final ObjList<TableUpdateDetails> assignedTables = new ObjList<>();
        private long lastMaintenanceMillis = 0L;

        private WriterJob(int id, Sequence sequence) {
            this.workerId = id;
            this.sequence = sequence;
        }

        /**
         * Drains the queue, then runs maintenance if it is due.
         *
         * @return the result of the queue drain
         */
        @Override
        public boolean run(int workerId) {
            assert this.workerId == workerId;
            final boolean drained = drainQueue();
            doMaintenance();
            return drained;
        }

        /**
         * Keeps running the job while it reports busy, at most queue-capacity times, then frees
         * the job's native resources and its assigned tables.
         */
        private void close() {
            LOG.info().$("line protocol writer closing [threadId=").$(workerId).$(']').$();
            int passes = 0;
            while (passes < queue.getCapacity() && run(workerId)) {
                passes++;
            }
            Misc.free(appendMemory);
            Misc.free(path);
            Misc.free(charSink);
            Misc.free(floatingCharSink);
            Misc.freeObjList(assignedTables);
            assignedTables.clear();
        }

        /**
         * Calls {@code handleWriterThreadMaintenance} on every assigned table, but no more often
         * than once per {@code maintenanceInterval}.
         */
        private void doMaintenance() {
            final long now = milliClock.getTicks();
            if (now - lastMaintenanceMillis < maintenanceInterval) {
                return;
            }
            lastMaintenanceMillis = now;
            for (int i = 0, count = assignedTables.size(); i < count; i++) {
                assignedTables.getQuick(i).handleWriterThreadMaintenance(now);
            }
        }

        /**
         * Consumes events until the sequence reports -1 or an event cannot be completed yet.
         * <p>
         * An event that is not completed is deliberately not released with {@code done}; the loop
         * exits so the same event is looked at again on a later run.
         *
         * @return {@code true} if at least one event was consumed before the sequence reported
         * -1; {@code false} if nothing was available or the drain stopped on an uncompleted event
         */
        private boolean drainQueue() {
            boolean busy = false;
            for (;;) {
                long cursor = sequence.next();
                while (cursor < 0L) {
                    if (cursor == -1L) {
                        return busy;
                    }
                    // Any other negative value: ask again
                    cursor = sequence.next();
                }
                busy = true;

                final LineTcpMeasurementEvent event = queue.get(cursor);
                final boolean eventProcessed;
                if (event.threadId == workerId) {
                    // Measurement addressed to this writer; adopt the table on first contact.
                    if (!event.tableUpdateDetails.assignedToJob) {
                        assignedTables.add(event.tableUpdateDetails);
                        event.tableUpdateDetails.assignedToJob = true;
                        LOG.info().$("assigned table to writer thread [tableName=").$(event.tableUpdateDetails.tableName).$(", threadId=").$(workerId).I$();
                    }
                    event.processMeasurementEvent(this);
                    eventProcessed = true;
                } else {
                    switch (event.threadId) {
                        case -1: // REBALANCE_EVENT_ID
                            eventProcessed = processRebalance(event);
                            break;
                        case -3: // RELEASE_WRITER_EVENT_ID
                            eventProcessed = processReleaseWriter(event);
                            break;
                        default:
                            // Another writer's measurement or an incomplete event: nothing to do.
                            eventProcessed = true;
                            break;
                    }
                }

                if (!eventProcessed) {
                    return false;
                }
                sequence.done(cursor);
            }
        }

        /**
         * Handles a rebalance event from this job's point of view.
         * <ul>
         * <li>Target thread: the event only completes once the source thread has flagged the
         * table as released.</li>
         * <li>Source thread: drops the table from its list, calls {@code switchThreads()} on it
         * and sets the released flag.</li>
         * <li>Any other thread: nothing to do.</li>
         * </ul>
         *
         * @return {@code false} only for the target thread while the source has not released yet
         */
        private boolean processRebalance(LineTcpMeasurementEvent event) {
            if (event.rebalanceToThreadId == workerId) {
                if (!event.rebalanceReleasedByFromThread) {
                    return false;
                }
                LOG.info().$("rebalance cycle, new thread ready [threadId=").$(workerId).$(", table=").$(event.tableUpdateDetails.tableName).$(']').$();
                return true;
            }

            if (event.rebalanceFromThreadId != workerId) {
                return true;
            }

            for (int i = 0, count = assignedTables.size(); i < count; i++) {
                if (assignedTables.get(i) == event.tableUpdateDetails) {
                    assignedTables.remove(i);
                    break;
                }
            }
            LOG.info().$("rebalance cycle, old thread finished [threadId=").$(workerId).$(", table=").$(event.tableUpdateDetails.tableName).I$();
            event.tableUpdateDetails.switchThreads();
            event.rebalanceReleasedByFromThread = true;
            return true;
        }

        /**
         * Handles a release-writer event under the table read lock. The writer is released only
         * if the table belongs to this thread and is no longer in the active map; a table that is
         * back in the active map is left alone.
         *
         * @return always {@code true}
         */
        private boolean processReleaseWriter(LineTcpMeasurementEvent event) {
            tableUpdateDetailsLock.readLock().lock();
            try {
                final TableUpdateDetails details = event.tableUpdateDetails;
                // keyIndex >= 0 means the name is absent from the active map.
                if (details.writerThreadId == workerId && tableUpdateDetailsByTableName.keyIndex(details.tableName) >= 0) {
                    LOG.info().$("releasing writer, its been idle since ").$ts(details.lastMeasurementMillis * 1000L).$("[tableName=").$(details.tableName).I$();
                    details.handleWriterRelease();
                }
                return true;
            } finally {
                tableUpdateDetailsLock.readLock().unlock();
            }
        }
    }

    /**
     * Per-table ingestion state: the lazily obtained {@link TableWriter}, the id of the writer thread
     * the table is assigned to, commit/measurement clock readings, and one
     * {@link ThreadLocalDetails} instance per network I/O job.
     */
    class TableUpdateDetails implements Closeable {
        final String tableName;
        // One entry per network I/O job; indexed by the worker id given to startNewMeasurementEvent().
        private final ThreadLocalDetails[] localDetailsArray;
        // Set to Integer.MIN_VALUE by close() to mark this instance as closed.
        private int writerThreadId;
        private int nUpdates = 0;
        // Obtained on the first getWriter() call; null again after handleWriterRelease() or close().
        private TableWriter writer;
        private boolean assignedToJob = false;
        // Clock reading of the latest startNewMeasurementEvent(); Long.MAX_VALUE until the first one.
        private long lastMeasurementMillis = Long.MAX_VALUE;
        // Refreshed at construction and by handleRowAppended(), handleWriterRelease() and
        // handleWriterThreadMaintenance() whenever they commit.
        private long lastCommitMillis;
        private int nNetworkIoWorkers = 0;

        /**
         * @param tableName      name of the table this instance tracks
         * @param writerThreadId id of the writer thread initially responsible for the table
         * @param netIoJobs      network I/O jobs; one {@link ThreadLocalDetails} is created per job,
         *                       sharing that job's pool of unused symbol caches
         */
        private TableUpdateDetails(String tableName, int writerThreadId, NetworkIOJob[] netIoJobs) {
            this.tableName = tableName;
            this.writerThreadId = writerThreadId;
            int n = netIoJobs.length;
            this.localDetailsArray = new ThreadLocalDetails[n];
            for (int i = 0; i < n; ++i) {
                this.localDetailsArray[i] = new ThreadLocalDetails(netIoJobs[i].getUnusedSymbolCaches());
            }
            this.lastCommitMillis = LineTcpMeasurementScheduler.this.milliClock.getTicks();
        }

        /**
         * Commits and closes the writer (if one is open) and frees every per-thread detail object.
         * Calling it again after a completed close is a no-op.
         * <p>
         * If the final commit throws, the exception still propagates, but the writer is closed, the
         * per-thread details are freed and the instance is marked closed regardless, so a failed
         * commit cannot leak the writer or the native paths held by the per-thread details.
         */
        @Override
        public void close() {
            if (this.writerThreadId == Integer.MIN_VALUE) {
                return;
            }
            LOG.info().$("closing table [tableName=").$(this.tableName).$(']').$();
            try {
                if (null != this.writer) {
                    try {
                        this.writer.commit();
                    } finally {
                        // Misc.free closes the writer and yields null for the field.
                        this.writer = Misc.free(this.writer);
                    }
                }
            } finally {
                for (int n = 0; n < this.localDetailsArray.length; ++n) {
                    this.localDetailsArray[n] = Misc.free(this.localDetailsArray[n]);
                }
                this.writerThreadId = Integer.MIN_VALUE;
            }
        }

        /**
         * Looks up a symbol value through the given per-thread symbol caches.
         *
         * @return the value from {@link ThreadLocalDetails#getSymbolIndex(int, CharSequence)}, or
         *         {@code -2} when {@code colIndex} is negative (column not resolved)
         */
        int getSymbolIndex(ThreadLocalDetails localDetails, int colIndex, CharSequence symValue) {
            if (colIndex < 0) {
                return -2;
            }
            return localDetails.getSymbolIndex(colIndex, symValue);
        }

        /**
         * Returns the table writer, obtaining it from the engine on first use.
         */
        TableWriter getWriter() {
            if (null == this.writer) {
                this.writer = LineTcpMeasurementScheduler.this.engine.getWriter(LineTcpMeasurementScheduler.this.securityContext, this.tableName);
            }
            return this.writer;
        }

        /**
         * Called after a row has been appended; records the commit time when the writer reports
         * that {@code checkMaxAndCommitHysteresis()} returned {@code true}.
         * Requires the writer to be open.
         */
        void handleRowAppended() {
            if (this.writer.checkMaxAndCommitHysteresis()) {
                this.lastCommitMillis = LineTcpMeasurementScheduler.this.milliClock.getTicks();
            }
        }

        /**
         * Commits and frees the writer if one is open; does nothing otherwise.
         */
        void handleWriterRelease() {
            if (null != this.writer) {
                LOG.debug().$("release commit [table=").$(this.writer.getTableName()).I$();
                this.writer.commit();
                this.writer = Misc.free(this.writer);
                this.lastCommitMillis = LineTcpMeasurementScheduler.this.milliClock.getTicks();
            }
        }

        /**
         * Commits the open writer when at least {@code maintenanceInterval} has elapsed between the
         * last recorded commit and {@code ticks}.
         *
         * @param ticks current clock reading, in the same unit as {@code lastCommitMillis}
         */
        void handleWriterThreadMaintenance(long ticks) {
            if (ticks - this.lastCommitMillis < LineTcpMeasurementScheduler.this.maintenanceInterval) {
                return;
            }
            if (null != this.writer) {
                LOG.debug().$("maintenance commit [table=").$(this.writer.getTableName()).I$();
                this.writer.commit();
                this.lastCommitMillis = LineTcpMeasurementScheduler.this.milliClock.getTicks();
            }
        }

        /**
         * Records the current time as the latest measurement time and returns the per-thread
         * details stored at index {@code workerId}.
         */
        ThreadLocalDetails startNewMeasurementEvent(int workerId) {
            ThreadLocalDetails localDetails = this.localDetailsArray[workerId];
            this.lastMeasurementMillis = LineTcpMeasurementScheduler.this.milliClock.getTicks();
            return localDetails;
        }

        /**
         * Clears the assigned-to-job flag and releases the writer.
         */
        void switchThreads() {
            this.assignedToJob = false;
            this.handleWriterRelease();
        }

        /**
         * Column-name and symbol lookup caches kept separately for each network I/O job.
         */
        private class ThreadLocalDetails implements Closeable {
            private final Path path = new Path();
            private final ObjIntHashMap<CharSequence> columnIndexByName = new ObjIntHashMap<>();
            private final ObjList<SymbolCache> symbolCacheByColumnIndex = new ObjList<>();
            // Pool handed in by the owning network I/O job; caches are taken from and returned to it.
            private final ObjList<SymbolCache> unusedSymbolCaches;

            ThreadLocalDetails(ObjList<SymbolCache> unusedSymbolCaches) {
                this.unusedSymbolCaches = unusedSymbolCaches;
            }

            /**
             * Frees the symbol caches currently in use and the native path.
             */
            @Override
            public void close() {
                Misc.freeObjList(this.symbolCacheByColumnIndex);
                Misc.free(this.path);
            }

            /**
             * Creates (or takes from the unused pool) a symbol cache for the column, points it at the
             * table's directory and registers it under {@code colIndex}.
             */
            private SymbolCache addSymbolCache(int colIndex) {
                try (TableReader reader = LineTcpMeasurementScheduler.this.engine.getReader(AllowAllCairoSecurityContext.INSTANCE, TableUpdateDetails.this.tableName)) {
                    this.path.of(LineTcpMeasurementScheduler.this.cairoConfiguration.getRoot()).concat(TableUpdateDetails.this.tableName);
                    SymbolCache symCache;
                    int lastUnusedSymbolCacheIndex = this.unusedSymbolCaches.size() - 1;
                    if (lastUnusedSymbolCacheIndex > -1) {
                        // Reuse the most recently pooled cache.
                        symCache = this.unusedSymbolCaches.get(lastUnusedSymbolCacheIndex);
                        this.unusedSymbolCaches.remove(lastUnusedSymbolCacheIndex);
                    } else {
                        symCache = new SymbolCache();
                    }
                    int symIndex = this.resolveSymbolIndex(reader.getMetadata(), colIndex);
                    symCache.of(LineTcpMeasurementScheduler.this.cairoConfiguration, this.path, reader.getMetadata().getColumnName(colIndex), symIndex);
                    this.symbolCacheByColumnIndex.extendAndSet(colIndex, symCache);
                    return symCache;
                }
            }

            /**
             * Forgets all cached column indexes, closes every symbol cache in use and returns it to
             * the unused pool.
             */
            void clear() {
                this.columnIndexByName.clear();
                int sz = this.symbolCacheByColumnIndex.size();
                for (int n = 0; n < sz; ++n) {
                    SymbolCache symCache = this.symbolCacheByColumnIndex.getQuick(n);
                    if (null == symCache) continue;
                    symCache.close();
                    this.unusedSymbolCaches.add(symCache);
                }
                this.symbolCacheByColumnIndex.clear();
            }

            /**
             * Returns the index of the named column, consulting the local map first and falling back
             * to the table metadata when the map yields {@code -1}.
             *
             * @return the column index, or {@code -1} when the table has no such column
             */
            int getColumnIndex(CharSequence colName) {
                int colIndex = this.columnIndexByName.get(colName);
                if (colIndex != -1) {
                    return colIndex;
                }
                return this.getColumnIndex0(colName);
            }

            /**
             * Slow path of {@link #getColumnIndex(CharSequence)}: opens a reader, and when the column
             * exists rebuilds the whole name-to-index map from the reader's metadata. When the column
             * does not exist the map is left untouched and {@code -1} is returned.
             */
            private int getColumnIndex0(CharSequence colName) {
                try (TableReader reader = LineTcpMeasurementScheduler.this.engine.getReader(AllowAllCairoSecurityContext.INSTANCE, TableUpdateDetails.this.tableName)) {
                    TableReaderMetadata metadata = reader.getMetadata();
                    int colIndex = metadata.getColumnIndexQuiet(colName);
                    if (colIndex < 0) {
                        return -1;
                    }
                    this.columnIndexByName.clear();
                    int sz = metadata.getColumnCount();
                    for (int n = 0; n < sz; ++n) {
                        this.columnIndexByName.put(metadata.getColumnName(n), n);
                    }
                    return colIndex;
                }
            }

            /**
             * Returns what the column's symbol cache reports for {@code symValue}, creating the cache
             * on first use of the column.
             */
            int getSymbolIndex(int colIndex, CharSequence symValue) {
                SymbolCache symCache = this.symbolCacheByColumnIndex.getQuiet(colIndex);
                if (null == symCache) {
                    symCache = this.addSymbolCache(colIndex);
                }
                return symCache.getSymIndex(symValue);
            }

            /**
             * Counts the columns preceding {@code colIndex} whose type code is 11
             * (NOTE(review): presumably the symbol column type - confirm against ColumnType).
             */
            private int resolveSymbolIndex(TableReaderMetadata metadata, int colIndex) {
                int symIndex = 0;
                for (int n = 0; n < colIndex; ++n) {
                    if (metadata.getColumnType(n) == 11) {
                        ++symIndex;
                    }
                }
                return symIndex;
            }
        }
    }

    /**
     * A queue slot that carries either one serialised measurement or a control message
     * (rebalance / release writer) to the writer jobs.
     * <p>
     * {@code threadId} doubles as the event kind: {@code -2} while a measurement is being written
     * (and left so if serialisation fails), {@code -1} for a rebalance event, {@code -3} for a
     * release-writer event, otherwise the id of the writer thread that owns the measurement.
     * <p>
     * Measurement buffer layout, starting at {@code bufLo}:
     * <pre>
     *   long  timestamp (Long.MIN_VALUE when the line carried none)
     *   int   entity count
     *   per entity:
     *     int   column index (&gt;= 0), or the negated byte length of the column name
     *           followed by that many raw name bytes
     *     byte  entity type
     *     payload by type:
     *       0    int char count + chars (text whose symbol index could not be resolved)
     *       6    int symbol index
     *       2    long
     *       1    double
     *       3, 5 int char count + chars
     *       4    one byte, 1 for true
     * </pre>
     */
    private class LineTcpMeasurementEvent implements Closeable {
        private final MicrosecondClock clock;
        private final LineProtoTimestampAdapter timestampAdapter;
        private final long bufSize;
        private int threadId;
        private TableUpdateDetails tableUpdateDetails;
        // Native buffer address; 0 once close() has released it.
        private long bufLo;
        private int rebalanceFromThreadId;
        private int rebalanceToThreadId;
        private volatile boolean rebalanceReleasedByFromThread;

        /**
         * Allocates the native buffer, sized at 13 bytes for every 4 bytes of the maximum
         * measurement size.
         */
        private LineTcpMeasurementEvent(int maxMeasurementSize, MicrosecondClock clock, LineProtoTimestampAdapter timestampAdapter) {
            this.bufSize = (long)(maxMeasurementSize / 4) * 13L;
            this.bufLo = Unsafe.malloc(this.bufSize);
            this.clock = clock;
            this.timestampAdapter = timestampAdapter;
        }

        /**
         * Releases the native buffer and closes the referenced table details.
         * Safe to call more than once: the buffer is freed only while its address is non-zero, and
         * the address is cleared before anything else that might throw.
         */
        @Override
        public void close() {
            if (this.bufLo != 0L) {
                Unsafe.free(this.bufLo, this.bufSize);
                this.bufLo = 0L;
            }
            this.tableUpdateDetails = Misc.free(this.tableUpdateDetails);
        }

        /**
         * Serialises the parsed line into the native buffer using the layout described on the class.
         * <p>
         * {@code threadId} is {@code -2} for the duration of the call and is set to the table's
         * writer thread id only after every entity has been written; if an exception escapes, the
         * event keeps {@code threadId == -2}.
         * <p>
         * Text payloads record the number of <em>decoded</em> characters. The UTF-8 byte length is
         * only an upper bound, so storing it would make the reader pick up stale buffer content
         * after any value that contains multi-byte sequences.
         *
         * @throws CairoException when a value is not valid UTF-8
         */
        void createMeasurementEvent(TableUpdateDetails tableUpdateDetails, TableUpdateDetails.ThreadLocalDetails localDetails, NewLineProtoParser protoParser, FloatingDirectCharSink floatingCharSink) {
            this.threadId = -2;
            this.tableUpdateDetails = tableUpdateDetails;
            long timestamp = protoParser.getTimestamp();
            if (timestamp != Long.MIN_VALUE) {
                timestamp = this.timestampAdapter.getMicros(timestamp);
            }
            long bufPos = this.bufLo;
            Unsafe.getUnsafe().putLong(bufPos, timestamp);
            bufPos += 8L;
            int nEntities = protoParser.getnEntities();
            Unsafe.getUnsafe().putInt(bufPos, nEntities);
            bufPos += 4L;
            entities:
            for (int nEntity = 0; nEntity < nEntities; ++nEntity) {
                assert (bufPos < this.bufLo + this.bufSize + 6L);
                NewLineProtoParser.ProtoEntity entity = protoParser.getEntity(nEntity);
                int colIndex = localDetails.getColumnIndex(entity.getName());
                if (colIndex < 0) {
                    // Unknown column: ship the raw name so the writer thread can resolve or add it.
                    int colNameLen = entity.getName().length();
                    Unsafe.getUnsafe().putInt(bufPos, -1 * colNameLen);
                    bufPos += 4L;
                    Vect.memcpy(entity.getName().getLo(), bufPos, colNameLen);
                    bufPos += (long)colNameLen;
                } else {
                    Unsafe.getUnsafe().putInt(bufPos, colIndex);
                    bufPos += 4L;
                }
                byte entityType = entity.getType();
                switch (entityType) {
                    case 0: {
                        long typePos = bufPos;
                        // Byte length of the UTF-8 value: an upper bound for the decoded char count.
                        int maxChars = entity.getValue().length();
                        // Skip the type byte and the length int; decode straight into the payload area.
                        bufPos += 5L;
                        long hi = bufPos + 2L * (long)maxChars;
                        floatingCharSink.of(bufPos, hi);
                        if (!Chars.utf8Decode(entity.getValue().getLo(), entity.getValue().getHi(), floatingCharSink)) {
                            throw CairoException.instance(0).put("invalid UTF8 in value for ").put(entity.getName());
                        }
                        int symIndex = tableUpdateDetails.getSymbolIndex(localDetails, colIndex, floatingCharSink);
                        if (symIndex != -2) {
                            // Resolved: overwrite the slot with type 6 and the symbol index only.
                            bufPos = typePos;
                            Unsafe.getUnsafe().putByte(bufPos, (byte)6);
                            Unsafe.getUnsafe().putInt(++bufPos, symIndex);
                            bufPos += 4L;
                            continue entities;
                        }
                        // Unresolved: keep the decoded text and record how many chars were produced.
                        int nChars = floatingCharSink.length();
                        Unsafe.getUnsafe().putByte(typePos, entity.getType());
                        Unsafe.getUnsafe().putInt(typePos + 1L, nChars);
                        bufPos += 2L * (long)nChars;
                        continue entities;
                    }
                    case 2: {
                        Unsafe.getUnsafe().putByte(bufPos, entity.getType());
                        Unsafe.getUnsafe().putLong(++bufPos, entity.getIntegerValue());
                        bufPos += 8L;
                        continue entities;
                    }
                    case 1: {
                        Unsafe.getUnsafe().putByte(bufPos, entity.getType());
                        Unsafe.getUnsafe().putDouble(++bufPos, entity.getFloatValue());
                        bufPos += 8L;
                        continue entities;
                    }
                    case 3:
                    case 5: {
                        Unsafe.getUnsafe().putByte(bufPos, entity.getType());
                        long lenPos = bufPos + 1L;
                        // Byte length of the UTF-8 value: an upper bound for the decoded char count.
                        int maxChars = entity.getValue().length();
                        bufPos += 5L;
                        long hi = bufPos + 2L * (long)maxChars;
                        floatingCharSink.of(bufPos, hi);
                        if (!Chars.utf8Decode(entity.getValue().getLo(), entity.getValue().getHi(), floatingCharSink)) {
                            throw CairoException.instance(0).put("invalid UTF8 in value for ").put(entity.getName());
                        }
                        int nChars = floatingCharSink.length();
                        Unsafe.getUnsafe().putInt(lenPos, nChars);
                        bufPos += 2L * (long)nChars;
                        continue entities;
                    }
                    case 4: {
                        Unsafe.getUnsafe().putByte(bufPos, entity.getType());
                        Unsafe.getUnsafe().putByte(++bufPos, (byte)(entity.getBooleanValue() ? 1 : 0));
                        ++bufPos;
                    }
                }
            }
            // Publishing the owner id last marks the event as complete.
            this.threadId = tableUpdateDetails.writerThreadId;
        }

        /**
         * Turns this slot into a rebalance event moving {@code tableUpdateDetails} from
         * {@code fromThreadId} to {@code toThreadId}; the released flag starts cleared.
         */
        void createRebalanceEvent(int fromThreadId, int toThreadId, TableUpdateDetails tableUpdateDetails) {
            this.threadId = -1;
            this.rebalanceFromThreadId = fromThreadId;
            this.rebalanceToThreadId = toThreadId;
            this.tableUpdateDetails = tableUpdateDetails;
            this.rebalanceReleasedByFromThread = false;
        }

        /**
         * Turns this slot into a release-writer event for {@code tableUpdateDetails}.
         */
        void createReleaseWriterEvent(TableUpdateDetails tableUpdateDetails) {
            this.threadId = -3;
            this.tableUpdateDetails = tableUpdateDetails;
        }

        /**
         * Replays the serialised measurement into the table writer as one row.
         * <p>
         * Entities that carry a column name instead of an index are resolved against the writer's
         * metadata; when the column does not exist it is added with the default type for the entity
         * type, the pending row is cancelled and parsing restarts from the first entity with a new
         * row. A {@link CairoException} raised while building the row is logged and the pending row,
         * if any, is cancelled; other exceptions propagate to the caller.
         *
         * @param job the writer job whose scratch sinks are used for decoding names and values
         */
        void processMeasurementEvent(WriterJob job) {
            TableWriter.Row row = null;
            try {
                TableWriter writer = this.tableUpdateDetails.getWriter();
                long bufPos = this.bufLo;
                long timestamp = Unsafe.getUnsafe().getLong(bufPos);
                bufPos += 8L;
                if (timestamp == Long.MIN_VALUE) {
                    // The line carried no timestamp: stamp the row with the current clock reading.
                    timestamp = this.clock.getTicks();
                }
                row = writer.newRow(timestamp);
                int nEntities = Unsafe.getUnsafe().getInt(bufPos);
                bufPos += 4L;
                long firstEntityBufPos = bufPos;
                entities:
                for (int nEntity = 0; nEntity < nEntities; ++nEntity) {
                    byte entityType;
                    int colIndex = Unsafe.getUnsafe().getInt(bufPos);
                    bufPos += 4L;
                    if (colIndex >= 0) {
                        entityType = Unsafe.getUnsafe().getByte(bufPos);
                        ++bufPos;
                    } else {
                        // Negative value is the negated byte length of an inlined column name.
                        int colNameLen = -1 * colIndex;
                        long nameLo = bufPos;
                        long nameHi = bufPos + (long)colNameLen;
                        job.charSink.clear();
                        if (!Chars.utf8Decode(nameLo, nameHi, job.charSink)) {
                            throw CairoException.instance(0).put("invalid UTF8 in column name ").put(job.floatingCharSink.asCharSequence(nameLo, nameHi));
                        }
                        bufPos = nameHi;
                        entityType = Unsafe.getUnsafe().getByte(bufPos);
                        ++bufPos;
                        colIndex = writer.getMetadata().getColumnIndexQuiet(job.charSink);
                        if (colIndex < 0) {
                            // Column is missing: drop the pending row before altering the table.
                            row.cancel();
                            row = null;
                            int colType = DEFAULT_COLUMN_TYPES[entityType];
                            if (!TableUtils.isValidInfluxColumnName(job.charSink)) {
                                throw CairoException.instance(0).put("invalid column name [table=").put(writer.getTableName()).put(", columnName=").put(job.charSink).put(']');
                            }
                            writer.addColumn(job.charSink, colType);
                            // Restart from the first entity; the loop increment brings nEntity back to 0.
                            bufPos = firstEntityBufPos;
                            nEntity = -1;
                            row = writer.newRow(timestamp);
                            continue;
                        }
                    }
                    switch (entityType) {
                        case 0: {
                            int len = Unsafe.getUnsafe().getInt(bufPos);
                            bufPos += 4L;
                            long hi = bufPos + 2L * (long)len;
                            job.floatingCharSink.asCharSequence(bufPos, hi);
                            int symIndex = writer.getSymbolIndex(colIndex, job.floatingCharSink);
                            row.putSymIndex(colIndex, symIndex);
                            bufPos = hi;
                            continue entities;
                        }
                        case 6: {
                            int symIndex = Unsafe.getUnsafe().getInt(bufPos);
                            bufPos += 4L;
                            row.putSymIndex(colIndex, symIndex);
                            continue entities;
                        }
                        case 2: {
                            int colType = writer.getMetadata().getColumnType(colIndex);
                            long v = Unsafe.getUnsafe().getLong(bufPos);
                            bufPos += 8L;
                            // The integer is narrowed to whatever the target column's type code calls for.
                            switch (colType) {
                                case 5: {
                                    row.putLong(colIndex, v);
                                    continue entities;
                                }
                                case 4: {
                                    if (v < Integer.MIN_VALUE || v > Integer.MAX_VALUE) {
                                        throw CairoException.instance(0).put("line protocol integer is out of int bounds [columnIndex=").put(colIndex).put(", v=").put(v).put(']');
                                    }
                                    row.putInt(colIndex, (int)v);
                                    continue entities;
                                }
                                case 2: {
                                    if (v < -32768L || v > 32767L) {
                                        throw CairoException.instance(0).put("line protocol integer is out of short bounds [columnIndex=").put(colIndex).put(", v=").put(v).put(']');
                                    }
                                    row.putShort(colIndex, (short)v);
                                    continue entities;
                                }
                                case 1: {
                                    if (v < -128L || v > 127L) {
                                        throw CairoException.instance(0).put("line protocol integer is out of byte bounds [columnIndex=").put(colIndex).put(", v=").put(v).put(']');
                                    }
                                    row.putByte(colIndex, (byte)v);
                                    continue entities;
                                }
                                case 7: {
                                    row.putTimestamp(colIndex, v);
                                    continue entities;
                                }
                                case 6: {
                                    row.putDate(colIndex, v);
                                    continue entities;
                                }
                            }
                            // Reached only when the column type accepts none of the integer forms above.
                            throw CairoException.instance(0).put("expected a line protocol integer [entityType=").put(entityType).put(']');
                        }
                        case 1: {
                            double v = Unsafe.getUnsafe().getDouble(bufPos);
                            bufPos += 8L;
                            int colType = writer.getMetadata().getColumnType(colIndex);
                            switch (colType) {
                                case 9: {
                                    row.putDouble(colIndex, v);
                                    continue entities;
                                }
                                case 8: {
                                    row.putFloat(colIndex, (float)v);
                                    continue entities;
                                }
                            }
                            // Reached only when the column type is neither of the two handled above.
                            throw CairoException.instance(0).put("expected a line protocol float [entityType=").put(entityType).put(']');
                        }
                        case 4: {
                            byte b = Unsafe.getUnsafe().getByte(bufPos);
                            ++bufPos;
                            row.putBool(colIndex, b == 1);
                            continue entities;
                        }
                        case 3: {
                            int len = Unsafe.getUnsafe().getInt(bufPos);
                            bufPos += 4L;
                            long hi = bufPos + 2L * (long)len;
                            job.floatingCharSink.asCharSequence(bufPos, hi);
                            row.putStr(colIndex, job.floatingCharSink);
                            bufPos = hi;
                            continue entities;
                        }
                        case 5: {
                            int len = Unsafe.getUnsafe().getInt(bufPos);
                            bufPos += 4L;
                            long hi = bufPos + 2L * (long)len;
                            job.floatingCharSink.asCharSequence(bufPos, hi);
                            row.putLong256(colIndex, job.floatingCharSink);
                            bufPos = hi;
                            continue entities;
                        }
                        default: {
                            throw new UnsupportedOperationException("entityType " + entityType + " is not implemented!");
                        }
                    }
                }
                row.append();
                this.tableUpdateDetails.handleRowAppended();
            } catch (CairoException ex) {
                LOG.error().$("could not write line protocol measurement [tableName=").$(this.tableUpdateDetails.tableName).$(", ex=").$(ex.getFlyweightMessage()).$(", errno=").$(ex.getErrno()).I$();
                // row is null when the failure happened while a missing column was being added.
                if (row != null) {
                    row.cancel();
                }
            }
        }
    }

    /**
     * Contract of a network I/O job as seen by the scheduler.
     */
    interface NetworkIOJob extends Job {
        /**
         * Registers table details with this job.
         */
        void addTableUpdateDetails(TableUpdateDetails tableUpdateDetails);

        /**
         * Releases whatever this job holds.
         */
        void close();

        /**
         * Looks up table details previously registered with this job
         * (NOTE(review): the key is presumably the table name - confirm against implementations).
         */
        TableUpdateDetails getTableUpdateDetails(CharSequence tableName);

        /**
         * Returns this job's pool of unused symbol caches; the per-thread table details created for
         * this job take caches from it and return them to it.
         */
        ObjList<SymbolCache> getUnusedSymbolCaches();

        /**
         * Returns the id of the worker running this job.
         */
        int getWorkerId();
    }
}

