/*
 * Decompiled with CFR 0.152.
 */
package io.questdb.cutlass.line.tcp;

import io.questdb.WorkerPoolAwareConfiguration;
import io.questdb.cairo.CairoEngine;
import io.questdb.cutlass.line.tcp.AggressiveRecvLineTcpConnectionContext;
import io.questdb.cutlass.line.tcp.AuthDb;
import io.questdb.cutlass.line.tcp.LineTcpAuthConnectionContext;
import io.questdb.cutlass.line.tcp.LineTcpConnectionContext;
import io.questdb.cutlass.line.tcp.LineTcpMeasurementScheduler;
import io.questdb.cutlass.line.tcp.LineTcpReceiverConfiguration;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.EagerThreadSetup;
import io.questdb.mp.WorkerPool;
import io.questdb.network.IOContextFactory;
import io.questdb.network.IODispatcher;
import io.questdb.network.IODispatchers;
import io.questdb.std.Misc;
import io.questdb.std.ObjList;
import io.questdb.std.ObjectFactory;
import io.questdb.std.ThreadLocal;
import io.questdb.std.WeakObjectPool;
import io.questdb.std.str.Path;
import java.io.Closeable;
import org.jetbrains.annotations.Nullable;

public class LineTcpServer
implements Closeable {
    private static final Log LOG = LogFactory.getLog(LineTcpServer.class);
    private final IODispatcher<LineTcpConnectionContext> dispatcher;
    private final LineTcpConnectionContextFactory contextFactory;
    private final LineTcpMeasurementScheduler scheduler;
    private final ObjList<WorkerPool> dedicatedPools;

    public LineTcpServer(LineTcpReceiverConfiguration lineConfiguration, CairoEngine engine, WorkerPool ioWorkerPool, WorkerPool writerWorkerPool, ObjList<WorkerPool> dedicatedPools) {
        this.contextFactory = new LineTcpConnectionContextFactory(lineConfiguration);
        this.dispatcher = IODispatchers.create(lineConfiguration.getNetDispatcherConfiguration(), this.contextFactory);
        this.dedicatedPools = dedicatedPools;
        ioWorkerPool.assign(this.dispatcher);
        this.scheduler = new LineTcpMeasurementScheduler(lineConfiguration, engine, ioWorkerPool, this.dispatcher, writerWorkerPool);
        Closeable cleaner = () -> this.contextFactory.closeContextPool();
        int n = ioWorkerPool.getWorkerCount();
        for (int i = 0; i < n; ++i) {
            ioWorkerPool.assign(i, cleaner);
        }
    }

    @Nullable
    public static LineTcpServer create(LineTcpReceiverConfiguration lineConfiguration, WorkerPool sharedWorkerPool, Log log, CairoEngine cairoEngine) {
        if (!lineConfiguration.isEnabled()) {
            return null;
        }
        ObjList<WorkerPool> dedicatedPools = new ObjList<WorkerPool>(2);
        WorkerPool ioWorkerPool = WorkerPoolAwareConfiguration.configureWorkerPool(lineConfiguration.getIOWorkerPoolConfiguration(), sharedWorkerPool);
        WorkerPool writerWorkerPool = WorkerPoolAwareConfiguration.configureWorkerPool(lineConfiguration.getWriterWorkerPoolConfiguration(), sharedWorkerPool);
        if (ioWorkerPool != sharedWorkerPool) {
            ioWorkerPool.assignCleaner(Path.CLEANER);
            dedicatedPools.add(ioWorkerPool);
        }
        if (writerWorkerPool != sharedWorkerPool) {
            writerWorkerPool.assignCleaner(Path.CLEANER);
            dedicatedPools.add(writerWorkerPool);
        }
        LineTcpServer lineTcpServer = new LineTcpServer(lineConfiguration, cairoEngine, ioWorkerPool, writerWorkerPool, dedicatedPools);
        if (ioWorkerPool != sharedWorkerPool) {
            ioWorkerPool.start(log);
        }
        if (writerWorkerPool != sharedWorkerPool) {
            writerWorkerPool.start(log);
        }
        return lineTcpServer;
    }

    @Override
    public void close() {
        int sz = this.dedicatedPools.size();
        for (int n = 0; n < sz; ++n) {
            this.dedicatedPools.get(n).halt();
        }
        Misc.free(this.scheduler);
        Misc.free(this.contextFactory);
        Misc.free(this.dispatcher);
    }

    private class LineTcpConnectionContextFactory
    implements IOContextFactory<LineTcpConnectionContext>,
    Closeable,
    EagerThreadSetup {
        private final ThreadLocal<WeakObjectPool<LineTcpConnectionContext>> contextPool;
        private boolean closed = false;

        public LineTcpConnectionContextFactory(LineTcpReceiverConfiguration configuration) {
            ObjectFactory<LineTcpConnectionContext> factory;
            if (null == configuration.getAuthDbPath()) {
                factory = configuration.isIOAggressiveRecv() ? () -> new AggressiveRecvLineTcpConnectionContext(configuration, LineTcpServer.this.scheduler) : () -> new LineTcpConnectionContext(configuration, LineTcpServer.this.scheduler);
            } else {
                AuthDb authDb = new AuthDb(configuration);
                factory = () -> new LineTcpAuthConnectionContext(configuration, authDb, LineTcpServer.this.scheduler);
            }
            this.contextPool = new ThreadLocal<WeakObjectPool>(() -> new WeakObjectPool(factory, configuration.getConnectionPoolInitialCapacity()));
        }

        @Override
        public void close() {
            this.closed = true;
        }

        @Override
        public LineTcpConnectionContext newInstance(long fd, IODispatcher<LineTcpConnectionContext> dispatcher) {
            return ((LineTcpConnectionContext)((WeakObjectPool)this.contextPool.get()).pop()).of(fd, dispatcher);
        }

        @Override
        public void done(LineTcpConnectionContext context) {
            if (this.closed) {
                Misc.free(context);
            } else {
                context.of(-1L, null);
                ((WeakObjectPool)this.contextPool.get()).push(context);
                LOG.debug().$("pushed").$();
            }
        }

        @Override
        public void setup() {
            this.contextPool.get();
        }

        private void closeContextPool() {
            Misc.free(this.contextPool.get());
            LOG.info().$("closed").$();
        }
    }
}

