/*
 * Decompiled with CFR 0.152.
 */
package io.questdb.cutlass.pgwire;

import io.questdb.MessageBus;
import io.questdb.WorkerPoolAwareConfiguration;
import io.questdb.cairo.CairoEngine;
import io.questdb.cutlass.pgwire.BadProtocolException;
import io.questdb.cutlass.pgwire.PGConnectionContext;
import io.questdb.cutlass.pgwire.PGJobContext;
import io.questdb.cutlass.pgwire.PGWireConfiguration;
import io.questdb.griffin.FunctionFactoryCache;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.EagerThreadSetup;
import io.questdb.mp.Job;
import io.questdb.mp.WorkerPool;
import io.questdb.network.IOContextFactory;
import io.questdb.network.IODispatcher;
import io.questdb.network.IODispatchers;
import io.questdb.network.IORequestProcessor;
import io.questdb.network.PeerDisconnectedException;
import io.questdb.network.PeerIsSlowToReadException;
import io.questdb.network.PeerIsSlowToWriteException;
import io.questdb.std.Misc;
import io.questdb.std.ThreadLocal;
import io.questdb.std.WeakObjectPool;
import java.io.Closeable;
import org.jetbrains.annotations.Nullable;

public class PGWireServer
implements Closeable {
    private static final Log LOG = LogFactory.getLog(PGWireServer.class);
    private final IODispatcher<PGConnectionContext> dispatcher;
    private final PGConnectionContextFactory contextFactory;
    private final WorkerPool workerPool;
    private final MessageBus messageBus;

    public PGWireServer(PGWireConfiguration configuration, CairoEngine engine, WorkerPool workerPool, boolean workerPoolLocal, MessageBus messageBus, FunctionFactoryCache functionFactoryCache) {
        this.messageBus = messageBus;
        this.contextFactory = new PGConnectionContextFactory(engine, configuration, messageBus, workerPool.getWorkerCount());
        this.dispatcher = IODispatchers.create(configuration.getDispatcherConfiguration(), this.contextFactory);
        workerPool.assign(this.dispatcher);
        int n = workerPool.getWorkerCount();
        for (int i = 0; i < n; ++i) {
            final PGJobContext jobContext = new PGJobContext(configuration, engine, messageBus, functionFactoryCache);
            workerPool.assign(i, new Job(){
                private final IORequestProcessor<PGConnectionContext> processor = (operation, context) -> {
                    try {
                        jobContext.handleClientOperation((PGConnectionContext)context, operation);
                        context.getDispatcher().registerChannel((PGConnectionContext)context, 1);
                    }
                    catch (PeerIsSlowToWriteException e) {
                        context.getDispatcher().registerChannel((PGConnectionContext)context, 1);
                    }
                    catch (PeerIsSlowToReadException e) {
                        context.getDispatcher().registerChannel((PGConnectionContext)context, 4);
                    }
                    catch (BadProtocolException | PeerDisconnectedException e) {
                        context.getDispatcher().disconnect((PGConnectionContext)context);
                    }
                };

                @Override
                public boolean run(int workerId) {
                    return PGWireServer.this.dispatcher.processIOQueue(this.processor);
                }
            });
            workerPool.assign(i, () -> {
                Misc.free(jobContext);
                this.contextFactory.closeContextPool();
            });
        }
        this.workerPool = workerPoolLocal ? workerPool : null;
    }

    @Nullable
    public static PGWireServer create(PGWireConfiguration configuration, WorkerPool sharedWorkerPool, Log log, CairoEngine cairoEngine, FunctionFactoryCache functionFactoryCache) {
        return WorkerPoolAwareConfiguration.create(configuration, sharedWorkerPool, log, cairoEngine, (conf, engine, workerPool, local, bus, functionFactoryCache1) -> new PGWireServer((PGWireConfiguration)conf, cairoEngine, workerPool, local, bus, functionFactoryCache1), functionFactoryCache);
    }

    @Override
    public void close() {
        if (this.workerPool != null) {
            this.workerPool.halt();
        }
        Misc.free(this.contextFactory);
        Misc.free(this.dispatcher);
        if (this.workerPool != null) {
            Misc.free(this.messageBus);
        }
    }

    private static class PGConnectionContextFactory
    implements IOContextFactory<PGConnectionContext>,
    Closeable,
    EagerThreadSetup {
        private final ThreadLocal<WeakObjectPool<PGConnectionContext>> contextPool = new ThreadLocal<WeakObjectPool>(() -> new WeakObjectPool<PGConnectionContext>(() -> new PGConnectionContext(engine, configuration, messageBus, workerCount), configuration.getConnectionPoolInitialCapacity()));
        private boolean closed = false;

        public PGConnectionContextFactory(CairoEngine engine, PGWireConfiguration configuration, @Nullable MessageBus messageBus, int workerCount) {
        }

        @Override
        public void close() {
            this.closed = true;
        }

        @Override
        public PGConnectionContext newInstance(long fd, IODispatcher<PGConnectionContext> dispatcher) {
            return ((PGConnectionContext)((WeakObjectPool)this.contextPool.get()).pop()).of(fd, dispatcher);
        }

        @Override
        public void done(PGConnectionContext context) {
            if (this.closed) {
                Misc.free(context);
            } else {
                context.of(-1L, null);
                ((WeakObjectPool)this.contextPool.get()).push(context);
                LOG.debug().$("pushed").$();
            }
        }

        @Override
        public void setup() {
            this.contextPool.get();
        }

        private void closeContextPool() {
            Misc.free(this.contextPool.get());
            LOG.info().$("closed").$();
        }
    }
}

