/*
 * Decompiled with CFR 0.152.
 */
package io.greptime;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.google.protobuf.MessageLite;
import io.greptime.Query;
import io.greptime.RouterClient;
import io.greptime.Util;
import io.greptime.common.Display;
import io.greptime.common.Endpoint;
import io.greptime.common.Lifecycle;
import io.greptime.common.util.Clock;
import io.greptime.common.util.Ensures;
import io.greptime.common.util.MetricsUtil;
import io.greptime.common.util.SerializingExecutor;
import io.greptime.flight.AsyncExecCallOption;
import io.greptime.flight.GreptimeFlightClient;
import io.greptime.flight.GreptimeRequest;
import io.greptime.models.Err;
import io.greptime.models.QueryOk;
import io.greptime.models.QueryRequest;
import io.greptime.models.Result;
import io.greptime.models.SelectRows;
import io.greptime.options.QueryOptions;
import io.greptime.rpc.Context;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.arrow.flight.CallHeaders;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightCallHeaders;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.HeaderCallOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryClient
implements Query,
Lifecycle<QueryOptions>,
Display {
    private static final Logger LOG = LoggerFactory.getLogger(QueryClient.class);
    private static final AtomicLong QUERY_ID = new AtomicLong(0L);
    private QueryOptions opts;
    private RouterClient routerClient;
    private Executor asyncPool;

    public boolean init(QueryOptions opts) {
        this.opts = (QueryOptions)Ensures.ensureNonNull((Object)opts, (String)"null `QueryClient.opts`");
        this.routerClient = this.opts.getRouterClient();
        Executor pool = this.opts.getAsyncPool();
        this.asyncPool = pool != null ? pool : new SerializingExecutor("query_client");
        return true;
    }

    public void shutdownGracefully() {
    }

    @Override
    public CompletableFuture<Result<QueryOk, Err>> query(QueryRequest req, Context ctx) {
        Ensures.ensureNonNull((Object)req, (String)"null `request");
        ctx.with("QueryId", (Object)QUERY_ID.incrementAndGet());
        long startCall = Clock.defaultClock().getTick();
        ctx.with("QueryStart", (Object)startCall);
        return this.query0(req, ctx, 0).whenCompleteAsync((r, e) -> {
            InnerMetricHelper.readQps().mark();
            if (r == null || !r.isOk()) {
                InnerMetricHelper.readFailureNum().mark();
            }
        }, this.asyncPool);
    }

    private CompletableFuture<Result<QueryOk, Err>> query0(QueryRequest req, Context ctx, int retries) {
        InnerMetricHelper.readByRetries(retries).mark();
        return ((CompletableFuture)this.routerClient.route().thenComposeAsync(endpoint -> this.queryFrom((Endpoint)endpoint, req, ctx, retries), this.asyncPool)).thenComposeAsync(r -> {
            if (r.isOk()) {
                LOG.debug("Success to read from {}, ok={}.", (Object)"GreptimeDB", r.getOk());
                return Util.completedCf(r);
            }
            Err err = (Err)r.getErr();
            LOG.warn("Failed to read from {}, retries={}, err={}.", new Object[]{"GreptimeDB", retries, err});
            if (retries > this.opts.getMaxRetries()) {
                LOG.error("Retried {} times still failed.", (Object)retries);
                return Util.completedCf(r);
            }
            if (Util.shouldNotRetry(err)) {
                return Util.completedCf(r);
            }
            return this.query0(req, ctx, retries + 1);
        }, this.asyncPool);
    }

    private CompletableFuture<Result<QueryOk, Err>> queryFrom(Endpoint endpoint, QueryRequest req, Context ctx, int retries) {
        GreptimeFlightClient flightClient = this.routerClient.getFlightClient(endpoint);
        if (this.opts.getAuthInfo() != null) {
            req.setAuthInfo(this.opts.getAuthInfo());
        }
        GreptimeRequest request = new GreptimeRequest((MessageLite)req.into());
        FlightCallHeaders headers = new FlightCallHeaders();
        headers.insert("retries", String.valueOf(retries));
        HeaderCallOption headerOption = new HeaderCallOption((CallHeaders)headers);
        AsyncExecCallOption execOption = new AsyncExecCallOption(this.asyncPool);
        FlightStream stream = flightClient.doRequest(request, new CallOption[]{headerOption, execOption}).getStream();
        ctx.with("Endpoint", (Object)endpoint);
        SelectRows.DefaultSelectRows rows = new SelectRows.DefaultSelectRows(ctx, InnerMetricHelper.readRowsNum(), stream);
        return Util.completedCf(Result.ok(QueryOk.ok(req.getQl(), rows)));
    }

    public void display(Display.Printer out) {
        out.println((Object)"--- QueryClient ---").print((Object)"maxRetries=").println((Object)this.opts.getMaxRetries()).print((Object)"asyncPool=").println((Object)this.asyncPool);
    }

    public String toString() {
        return "QueryClient{opts=" + this.opts + ", routerClient=" + this.routerClient + ", asyncPool=" + this.asyncPool + '}';
    }

    static final class InnerMetricHelper {
        static final Histogram READ_ROWS_NUM = MetricsUtil.histogram((Object)"read_rows_num");
        static final Meter READ_FAILURE_NUM = MetricsUtil.meter((Object)"read_failure_num");
        static final Meter READ_QPS = MetricsUtil.meter((Object)"read_qps");

        InnerMetricHelper() {
        }

        static Histogram readRowsNum() {
            return READ_ROWS_NUM;
        }

        static Meter readFailureNum() {
            return READ_FAILURE_NUM;
        }

        static Meter readQps() {
            return READ_QPS;
        }

        static Meter readByRetries(int retries) {
            return MetricsUtil.meter((Object[])new Object[]{"read_by_retries", Math.min(3, retries)});
        }
    }
}

