/*
 * Decompiled with CFR 0.152.
 */
package io.questdb.cutlass.http.processors;

import io.questdb.MessageBus;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoError;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.EntryUnavailableException;
import io.questdb.cairo.sql.InsertMethod;
import io.questdb.cairo.sql.InsertStatement;
import io.questdb.cairo.sql.ReaderOutOfDateException;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cutlass.http.HttpChunkedResponseSocket;
import io.questdb.cutlass.http.HttpConnectionContext;
import io.questdb.cutlass.http.HttpException;
import io.questdb.cutlass.http.HttpRequestHeader;
import io.questdb.cutlass.http.HttpRequestProcessor;
import io.questdb.cutlass.http.LocalValue;
import io.questdb.cutlass.http.ex.RetryOperationException;
import io.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration;
import io.questdb.cutlass.http.processors.JsonQueryProcessorState;
import io.questdb.cutlass.http.processors.QueryCache;
import io.questdb.cutlass.text.Utf8Exception;
import io.questdb.griffin.CompiledQuery;
import io.questdb.griffin.FunctionFactoryCache;
import io.questdb.griffin.HttpSqlExecutionInterruptor;
import io.questdb.griffin.SqlCompiler;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContextImpl;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.NoSpaceLeftInResponseBufferException;
import io.questdb.network.PeerDisconnectedException;
import io.questdb.network.PeerIsSlowToReadException;
import io.questdb.network.ServerDisconnectException;
import io.questdb.std.Chars;
import io.questdb.std.FlyweightMessageContainer;
import io.questdb.std.Misc;
import io.questdb.std.NanosecondClock;
import io.questdb.std.Numbers;
import io.questdb.std.NumericException;
import io.questdb.std.ObjList;
import io.questdb.std.str.DirectByteCharSequence;
import io.questdb.std.str.Path;
import java.io.Closeable;
import org.jetbrains.annotations.Nullable;

/**
 * HTTP request processor that executes the SQL text passed in the {@code query} URL parameter
 * and writes the outcome to the connection's chunked response socket as
 * {@code application/json; charset=utf-8}.
 *
 * <p>Request flow, as implemented below:
 * <ol>
 *   <li>{@link #onRequestComplete} obtains (or lazily creates) the per-connection
 *       {@link JsonQueryProcessorState}, parses the URL parameters and calls {@link #execute0};</li>
 *   <li>{@link #execute0} first polls {@link QueryCache} for a ready factory and otherwise compiles
 *       the query, then dispatches on the compiled query type through {@link #queryExecutors};</li>
 *   <li>{@link #resumeSend} continues streaming a response that could not be sent in one go.</li>
 * </ol>
 *
 * <p>A single {@link SqlCompiler} and a single {@link SqlExecutionContextImpl} are held per
 * processor instance and re-bound to the current connection on each execute/resume call.
 */
public class JsonQueryProcessor implements HttpRequestProcessor, Closeable {
    // Connection-local slot for the processor state; looked up by HttpConnectionContext.
    private static final LocalValue<JsonQueryProcessorState> LV = new LocalValue<>();
    private static final Log LOG = LogFactory.getLog(JsonQueryProcessor.class);
    // Dispatch table indexed by CompiledQuery.getType(); see compileQuery().
    protected final ObjList<QueryExecutor> queryExecutors = new ObjList<>();
    private final SqlCompiler compiler;
    private final JsonQueryProcessorConfiguration configuration;
    private final SqlExecutionContextImpl sqlExecutionContext;
    // Not used by any method visible in this class other than close(), which frees it.
    private final Path path = new Path();
    private final NanosecondClock nanosecondClock;
    private final HttpSqlExecutionInterruptor interruptor;

    /**
     * Creates a processor with its own {@link SqlCompiler} and no function factory cache.
     *
     * @param configuration processor configuration
     * @param engine        engine used to build the compiler and the execution context
     * @param messageBus    optional message bus, may be {@code null}
     * @param workerCount   worker count handed to the SQL execution context
     */
    public JsonQueryProcessor(JsonQueryProcessorConfiguration configuration, CairoEngine engine, @Nullable MessageBus messageBus, int workerCount) {
        this(configuration, engine, messageBus, workerCount, (FunctionFactoryCache) null);
    }

    /**
     * Creates a processor with its own {@link SqlCompiler} built from the supplied function
     * factory cache.
     *
     * @param configuration        processor configuration
     * @param engine               engine used to build the compiler and the execution context
     * @param messageBus           optional message bus, may be {@code null}
     * @param workerCount          worker count handed to the SQL execution context
     * @param functionFactoryCache optional cache passed to the compiler, may be {@code null}
     */
    public JsonQueryProcessor(JsonQueryProcessorConfiguration configuration, CairoEngine engine, @Nullable MessageBus messageBus, int workerCount, @Nullable FunctionFactoryCache functionFactoryCache) {
        this(configuration, engine, messageBus, workerCount, new SqlCompiler(engine, messageBus, functionFactoryCache));
    }

    /**
     * Creates a processor around an externally supplied compiler. The compiler is freed by
     * {@link #close()}.
     *
     * @param configuration processor configuration
     * @param engine        engine used to build the execution context and to obtain the clock
     * @param messageBus    optional message bus, may be {@code null}
     * @param workerCount   worker count handed to the SQL execution context
     * @param sqlCompiler   compiler used for every query that is not served from the query cache
     */
    public JsonQueryProcessor(JsonQueryProcessorConfiguration configuration, CairoEngine engine, @Nullable MessageBus messageBus, int workerCount, SqlCompiler sqlCompiler) {
        this.configuration = configuration;
        this.compiler = sqlCompiler;

        // Keys are the numeric codes returned by CompiledQuery.getType().
        // NOTE(review): the literals are what the decompiler left of the CompiledQuery constants;
        // 1 and 2 are handled as select and insert below, 11 is rejected as "copy from STDIN",
        // the exact statement behind each remaining code should be confirmed against CompiledQuery.
        final QueryExecutor sendConfirmation = JsonQueryProcessor::sendConfirmation;
        this.queryExecutors.extendAndSet(1, this::executeNewSelect);
        this.queryExecutors.extendAndSet(2, this::executeInsert);
        // Every type in this list only needs the generic {"ddl":"OK"} acknowledgement.
        for (int type : new int[]{3, 4, 5, 6, 7, 8, 9, 10, 12, 13}) {
            this.queryExecutors.extendAndSet(type, sendConfirmation);
        }
        this.queryExecutors.extendAndSet(11, JsonQueryProcessor::cannotCopyRemote);

        this.sqlExecutionContext = new SqlExecutionContextImpl(engine, workerCount, messageBus);
        this.nanosecondClock = engine.getConfiguration().getNanosecondClock();
        this.interruptor = new HttpSqlExecutionInterruptor(configuration.getInterruptorConfiguration());
    }

    /**
     * Frees the compiler, the path and the interruptor owned by this processor.
     */
    @Override
    public void close() {
        Misc.free(compiler);
        Misc.free(path);
        Misc.free(interruptor);
    }

    /**
     * Executes the query held by {@code state} and writes the response, or an error document,
     * to the connection's chunked response socket.
     *
     * <p>Error mapping performed here:
     * <ul>
     *   <li>{@link SqlException} - reported to the client as a syntax error;</li>
     *   <li>{@link EntryUnavailableException} - rethrown as {@link RetryOperationException};</li>
     *   <li>{@link CairoError} / {@link CairoException} - reported to the client as an internal error;</li>
     *   <li>peer exceptions - propagated unchanged;</li>
     *   <li>anything else - logged and converted to {@link ServerDisconnectException}.</li>
     * </ul>
     *
     * @param state per-connection state already configured with the query
     * @throws PeerDisconnectedException   propagated from the response socket
     * @throws PeerIsSlowToReadException   propagated from the response socket
     * @throws ServerDisconnectException   on any unexpected failure
     */
    public void execute0(JsonQueryProcessorState state) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
        state.startExecutionTimer();
        final HttpConnectionContext context = state.getHttpConnectionContext();
        // Re-bind the shared execution context to this connection before doing any SQL work.
        sqlExecutionContext.with(context.getCairoSecurityContext(), null, null, context.getFd(), interruptor.of(context.getFd()));
        state.info().$("exec [q='").utf8(state.getQuery()).$("']").$();

        final RecordCursorFactory factory = QueryCache.getInstance().poll(state.getQuery());
        try {
            if (factory == null) {
                compileQuery(state);
            } else {
                try {
                    // Same telemetry call as compileQuery(), with the type fixed to 1 because no
                    // CompiledQuery exists for a cached factory.
                    sqlExecutionContext.storeTelemetry((short) 1, (short) 2);
                    executeCachedSelect(state, factory, configuration.getKeepAliveHeader());
                } catch (ReaderOutOfDateException ignored) {
                    // The cached factory can no longer be used: drop it and compile from scratch.
                    Misc.free(factory);
                    compileQuery(state);
                }
            }
        } catch (SqlException e) {
            JsonQueryProcessor.syntaxError(context.getChunkedResponseSocket(), e, state, configuration.getKeepAliveHeader());
            JsonQueryProcessor.readyForNextRequest(context);
        } catch (EntryUnavailableException e) {
            LOG.info().$("[fd=").$(context.getFd()).$("] Resource busy, will retry").$();
            throw RetryOperationException.INSTANCE;
        } catch (CairoError | CairoException e) {
            internalError(context.getChunkedResponseSocket(), ((FlyweightMessageContainer) e).getFlyweightMessage(), e, state);
            JsonQueryProcessor.readyForNextRequest(context);
        } catch (PeerDisconnectedException | PeerIsSlowToReadException e) {
            // Must be listed explicitly so the catch-all below does not turn them into a disconnect.
            throw e;
        } catch (Throwable e) {
            state.error().$("Uh-oh. Error!").$(e).$();
            throw ServerDisconnectException.INSTANCE;
        }
    }

    /**
     * Entry point for a fully received request: prepares the connection state, parses the URL
     * and, when parsing succeeds, executes the query.
     *
     * @param context connection the request arrived on
     */
    @Override
    public void onRequestComplete(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
        JsonQueryProcessorState state = LV.get(context);
        if (state == null) {
            state = new JsonQueryProcessorState(context, nanosecondClock, configuration.getFloatScale(), configuration.getDoubleScale());
            LV.set(context, state);
        }
        // A new request must not inherit the random saved by parkRequest() for a previous one.
        state.setRnd(null);
        if (parseUrl(state, configuration.getKeepAliveHeader())) {
            execute0(state);
        } else {
            // parseUrl() has already written the error response.
            JsonQueryProcessor.readyForNextRequest(context);
        }
    }

    /**
     * Continues sending a response for the given connection. Does nothing when the connection
     * has no processor state.
     *
     * @param context connection whose response is being resumed
     */
    @Override
    public void resumeSend(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
        final JsonQueryProcessorState state = LV.get(context);
        if (state == null) {
            return;
        }
        // Hand the value saved by parkRequest() back to the execution context.
        sqlExecutionContext.with(context.getCairoSecurityContext(), null, state.getRnd(), context.getFd(), interruptor.of(context.getFd()));
        JsonQueryProcessor.doResumeSend(state, context);
    }

    /**
     * Saves the execution context's current random into the connection state so that
     * {@link #resumeSend} can restore it. Does nothing when the connection has no state.
     *
     * @param context connection being parked
     */
    @Override
    public void parkRequest(HttpConnectionContext context) {
        final JsonQueryProcessorState state = LV.get(context);
        if (state != null) {
            state.setRnd(sqlExecutionContext.getRandom());
        }
    }

    /**
     * Drives {@code state.resume(socket)} until it completes without running out of response
     * buffer space, then logs request completion. Returns immediately when the state reports
     * that it has no cursor.
     *
     * @throws PeerDisconnectedException when the socket cannot be reset to its bookmark after
     *                                   running out of buffer space
     */
    private static void doResumeSend(JsonQueryProcessorState state, HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
        if (state.noCursor()) {
            return;
        }
        LOG.debug().$("resume [fd=").$(context.getFd()).$(']').$();
        final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
        boolean done = false;
        while (!done) {
            try {
                state.resume(socket);
                done = true;
            } catch (NoSpaceLeftInResponseBufferException ignored) {
                if (!socket.resetToBookmark()) {
                    // Nothing can be sent to make room, so retrying would not help.
                    state.logBufferTooSmall();
                    throw PeerDisconnectedException.INSTANCE;
                }
                // Send what is buffered so far and try again.
                socket.sendChunk(false);
            }
        }
        JsonQueryProcessor.readyForNextRequest(context);
    }

    /**
     * Executor for query type 11: always fails, the statement cannot be served over REST.
     *
     * @throws SqlException always
     */
    private static void cannotCopyRemote(JsonQueryProcessorState state, CompiledQuery cc, CharSequence keepAliveHeader) throws SqlException {
        throw SqlException.$(0, "copy from STDIN is not supported over REST");
    }

    /**
     * Sends a {@code 200} JSON response header with the given keep-alive header value.
     *
     * @param socket          response socket to write to
     * @param keepAliveHeader value passed to {@code setKeepAlive}
     */
    protected static void header(HttpChunkedResponseSocket socket, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
        socket.status(200, "application/json; charset=utf-8");
        socket.headers().setKeepAlive(keepAliveHeader);
        socket.sendHeader();
    }

    /**
     * Logs the per-connection send counters once a request has been fully handled.
     * This method only logs; it does not modify the context.
     */
    private static void readyForNextRequest(HttpConnectionContext context) {
        LOG.info()
                .$("all sent [fd=").$(context.getFd())
                .$(", lastRequestBytesSent=").$(context.getLastRequestBytesSent())
                .$(", nCompletedRequests=").$(context.getNCompletedRequests() + 1)
                .$(", totalBytesSent=").$(context.getTotalBytesSent())
                .$(']').$();
    }

    /**
     * Sends the fixed acknowledgement document {@code {"ddl":"OK"}} as the complete response.
     *
     * @param state           state of the connection to respond on
     * @param cq              compiled query; not inspected by this executor
     * @param keepAliveHeader keep-alive header value for the response
     */
    protected static void sendConfirmation(JsonQueryProcessorState state, CompiledQuery cq, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
        final HttpConnectionContext context = state.getHttpConnectionContext();
        final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
        JsonQueryProcessor.header(socket, keepAliveHeader);
        socket.put('{').putQuoted("ddl").put(':').putQuoted("OK").put('}');
        socket.sendChunk(true);
        JsonQueryProcessor.readyForNextRequest(context);
    }

    /**
     * Sends the response header followed by the exception document produced by
     * {@link JsonQueryProcessorState#prepareExceptionJson}.
     *
     * @param socket          response socket to write to
     * @param position        error position reported to the client
     * @param message         error message reported to the client
     * @param query           query text echoed to the client; callers in this class may pass {@code null}
     * @param keepAliveHeader keep-alive header value for the response
     */
    static void sendException(HttpChunkedResponseSocket socket, int position, CharSequence message, CharSequence query, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
        JsonQueryProcessor.header(socket, keepAliveHeader);
        JsonQueryProcessorState.prepareExceptionJson(socket, position, message, query);
    }

    /**
     * Logs a SQL syntax error and reports it to the client together with its position.
     */
    private static void syntaxError(HttpChunkedResponseSocket socket, SqlException sqlException, JsonQueryProcessorState state, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
        state.logSyntaxError(sqlException);
        JsonQueryProcessor.sendException(socket, sqlException.getPosition(), sqlException.getFlyweightMessage(), state.getQuery(), keepAliveHeader);
    }

    /**
     * Compiles the query held by {@code state}, records telemetry and the compilation time,
     * and hands the result to the executor registered for the compiled query type.
     */
    private void compileQuery(JsonQueryProcessorState state) throws SqlException, PeerDisconnectedException, PeerIsSlowToReadException {
        final long startNanos = nanosecondClock.getTicks();
        final CompiledQuery cc = compiler.compile(state.getQuery(), sqlExecutionContext);
        sqlExecutionContext.storeTelemetry(cc.getType(), (short) 2);
        state.setCompilerNanos(nanosecondClock.getTicks() - startNanos);
        queryExecutors.getQuick(cc.getType()).execute(state, cc, configuration.getKeepAliveHeader());
    }

    /**
     * Runs a select whose factory came from the query cache; compilation time is recorded as zero.
     */
    private void executeCachedSelect(JsonQueryProcessorState state, RecordCursorFactory factory, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
        state.setCompilerNanos(0L);
        state.logExecuteCached();
        executeSelect(state, factory, keepAliveHeader);
    }

    /**
     * Re-runs the query for a request that was previously answered with a retry.
     *
     * @param context connection whose request is retried; its state is expected to exist already
     */
    @Override
    public void onRequestRetry(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
        final JsonQueryProcessorState state = LV.get(context);
        execute0(state);
    }

    /**
     * Reports the given failure to the client as an internal error and logs request completion.
     *
     * @param context connection whose request failed
     * @param e       failure to report
     */
    @Override
    public void failRequest(HttpConnectionContext context, HttpException e) throws PeerDisconnectedException, PeerIsSlowToReadException {
        final JsonQueryProcessorState state = LV.get(context);
        internalError(context.getChunkedResponseSocket(), e.getFlyweightMessage(), e, state);
        JsonQueryProcessor.readyForNextRequest(context);
    }

    /**
     * Executor for query type 2: executes and commits the insert, then acknowledges it.
     * The insert method is closed before the confirmation is sent.
     */
    private void executeInsert(JsonQueryProcessorState state, CompiledQuery cc, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
        final InsertStatement insertStatement = cc.getInsertStatement();
        try (InsertMethod insertMethod = insertStatement.createMethod(sqlExecutionContext)) {
            insertMethod.execute();
            insertMethod.commit();
        }
        JsonQueryProcessor.sendConfirmation(state, cc, keepAliveHeader);
    }

    /**
     * Executor for query type 1: runs a select using the freshly compiled factory.
     */
    private void executeNewSelect(JsonQueryProcessorState state, CompiledQuery cc, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
        state.logExecuteNew();
        executeSelect(state, cc.getRecordCursorFactory(), keepAliveHeader);
    }

    /**
     * Binds the factory to the state and, when the state reports there is something to send,
     * writes the header and starts streaming. A {@link CairoException} is rethrown after its
     * cacheable flag has been copied onto the state.
     */
    private void executeSelect(JsonQueryProcessorState state, RecordCursorFactory factory, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
        final HttpConnectionContext context = state.getHttpConnectionContext();
        try {
            if (!state.of(factory, sqlExecutionContext)) {
                JsonQueryProcessor.readyForNextRequest(context);
                return;
            }
            JsonQueryProcessor.header(context.getChunkedResponseSocket(), keepAliveHeader);
            JsonQueryProcessor.doResumeSend(state, context);
        } catch (CairoException ex) {
            state.setQueryCacheable(ex.isCacheable());
            throw ex;
        }
    }

    /**
     * Logs the failure and reports it to the client at position 0. A {@link CairoException}
     * flagged as an interruption is logged at info level as a cancelled query; everything else
     * is logged as an error.
     */
    private void internalError(HttpChunkedResponseSocket socket, CharSequence message, Throwable e, JsonQueryProcessorState state) throws PeerDisconnectedException, PeerIsSlowToReadException {
        final boolean cancelled = e instanceof CairoException && ((CairoException) e).isInterruption();
        if (cancelled) {
            state.info().$("query cancelled [q=`").utf8(state.getQuery()).$("`, reason=`").$(((CairoException) e).getFlyweightMessage()).$("`]").$();
        } else {
            state.error().$("internal error [q=`").utf8(state.getQuery()).$("`, ex=").$(e).$(']').$();
        }
        JsonQueryProcessor.sendException(socket, 0, message, state.getQuery(), configuration.getKeepAliveHeader());
    }

    /**
     * Reads the {@code query} and {@code limit} URL parameters and configures {@code state}.
     *
     * <p>{@code limit} is either {@code "N"} (stop row) or {@code "M,N"} (1-based first row and
     * stop row; {@code "M,"} leaves the stop row unbounded). Values that fail to parse keep
     * their defaults of 0 and {@link Long#MAX_VALUE}. Negative values are clamped to 0 and the
     * window is capped at the configured maximum response row limit.
     *
     * <p>When the query is missing, empty or not valid UTF-8, an error response is sent and
     * {@code false} is returned.
     *
     * @param state           state to configure
     * @param keepAliveHeader keep-alive header value for a possible error response
     * @return {@code true} when the state was configured and the query can be executed
     */
    private boolean parseUrl(JsonQueryProcessorState state, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
        final HttpRequestHeader header = state.getHttpConnectionContext().getRequestHeader();
        final DirectByteCharSequence query = header.getUrlParam("query");
        if (query == null || query.length() == 0) {
            state.info().$("Empty query header received. Sending empty reply.").$();
            JsonQueryProcessor.sendException(state.getHttpConnectionContext().getChunkedResponseSocket(), 0, "No query text", query, keepAliveHeader);
            return false;
        }

        long skip = 0L;
        long stop = Long.MAX_VALUE;
        final DirectByteCharSequence limit = header.getUrlParam("limit");
        if (limit != null) {
            final int sepPos = Chars.indexOf(limit, ',');
            try {
                if (sepPos > 0) {
                    final long firstRow = Numbers.parseLong(limit, 0, sepPos);
                    // First row is 1-based. Guarding the subtraction keeps Long.MIN_VALUE from
                    // wrapping around to Long.MAX_VALUE; every non-positive value means "from 0".
                    skip = firstRow > 0L ? firstRow - 1L : 0L;
                    if (sepPos + 1 < limit.length()) {
                        stop = Numbers.parseLong(limit, sepPos + 1, limit.length());
                    }
                } else {
                    stop = Numbers.parseLong(limit);
                }
            } catch (NumericException ignored) {
                // Deliberately best-effort: a malformed limit is not an error, whichever of
                // skip/stop was not parsed simply keeps the value it already has.
            }
        }

        if (stop < 0L) {
            stop = 0L;
        }
        if (stop - skip > configuration.getMaxQueryResponseRowLimit()) {
            stop = skip + configuration.getMaxQueryResponseRowLimit();
        }

        try {
            state.configure(header, query, skip, stop);
        } catch (Utf8Exception e) {
            state.info().$("Bad UTF8 encoding").$();
            JsonQueryProcessor.sendException(state.getHttpConnectionContext().getChunkedResponseSocket(), 0, "Bad UTF8 encoding in query text", query, keepAliveHeader);
            return false;
        }
        return true;
    }

    /**
     * Strategy invoked by {@code compileQuery} for one compiled query type.
     */
    @FunctionalInterface
    public interface QueryExecutor {
        /**
         * Executes the compiled query and writes the response.
         *
         * @param state           state of the connection the query belongs to
         * @param cc              compiled query to execute
         * @param keepAliveHeader keep-alive header value for the response
         */
        void execute(JsonQueryProcessorState state, CompiledQuery cc, CharSequence keepAliveHeader) throws PeerDisconnectedException, PeerIsSlowToReadException, SqlException;
    }
}

