/*
 * Decompiled with CFR 0.152.
 */
package io.greptime;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.google.common.util.concurrent.RateLimiter;
import io.greptime.RouterClient;
import io.greptime.Status;
import io.greptime.StreamWriter;
import io.greptime.Util;
import io.greptime.Write;
import io.greptime.common.Display;
import io.greptime.common.Endpoint;
import io.greptime.common.Lifecycle;
import io.greptime.common.util.Clock;
import io.greptime.common.util.Ensures;
import io.greptime.common.util.MetricsUtil;
import io.greptime.common.util.SerializingExecutor;
import io.greptime.errors.LimitedException;
import io.greptime.errors.ServerException;
import io.greptime.errors.StreamException;
import io.greptime.limit.AbstractLimiter;
import io.greptime.limit.LimitedPolicy;
import io.greptime.limit.WriteLimiter;
import io.greptime.models.Err;
import io.greptime.models.Result;
import io.greptime.models.WriteOk;
import io.greptime.models.WriteRows;
import io.greptime.models.WriteRowsHelper;
import io.greptime.options.WriteOptions;
import io.greptime.rpc.Context;
import io.greptime.rpc.Observer;
import io.greptime.v1.Common;
import io.greptime.v1.Database;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Write} implementation that sends rows to the endpoint chosen by the
 * {@link RouterClient}.
 *
 * <p>Batch writes ({@link #writeBatch}) are gated by a {@link WriteLimiter} and
 * retried up to {@code WriteOptions.getMaxRetries()} times; streaming writes
 * ({@link #streamWriter}) are throttled by a per-stream rate limiter.
 *
 * <p>{@link #init(WriteOptions)} populates every instance field, so it has to be
 * called before any other method is used.
 */
public class WriteClient implements Write, Lifecycle<WriteOptions>, Display {
    private static final Logger LOG = LoggerFactory.getLogger(WriteClient.class);

    /** Options handed to {@link #init(WriteOptions)}. */
    private WriteOptions opts;
    /** Resolves the target endpoint and performs the RPC calls. */
    private RouterClient routerClient;
    /** Executor on which all asynchronous continuations of this client run. */
    private Executor asyncPool;
    /** Gate applied to every {@link #writeBatch} call. */
    private WriteLimiter writeLimiter;

    /**
     * Stores the options and derives the router client, the async executor and
     * the write limiter from them.
     *
     * @param opts the write options; rejected by {@code Ensures.ensureNonNull} when {@code null}
     * @return always {@code true}
     */
    public boolean init(WriteOptions opts) {
        this.opts = Ensures.ensureNonNull(opts, "null `WriteClient.opts`");
        this.routerClient = this.opts.getRouterClient();
        Executor pool = this.opts.getAsyncPool();
        // Fall back to a client-owned executor when the options do not provide one.
        this.asyncPool = pool != null ? pool : new SerializingExecutor("write_client");
        this.writeLimiter = new DefaultWriteLimiter(this.opts.getMaxInFlightWriteRows(), this.opts.getLimitedPolicy());
        return true;
    }

    /**
     * Intentionally a no-op: this class performs no explicit resource release.
     */
    public void shutdownGracefully() {
    }

    /**
     * Writes a batch of rows, subject to the write limiter, and records the
     * outcome in the write metrics once the returned future completes.
     *
     * @param rows the rows to write; must be non-null and non-empty
     * @param ctx  the call context passed down to the RPC layer
     * @return a future holding either the write summary or the error
     */
    @Override
    public CompletableFuture<Result<WriteOk, Err>> writeBatch(Collection<WriteRows> rows, Context ctx) {
        Ensures.ensureNonNull(rows, "null `rows`");
        Ensures.ensure(!rows.isEmpty(), "empty `rows`");
        long startCall = Clock.defaultClock().getTick();
        return this.writeLimiter.acquireAndDo(rows,
                () -> write0(rows, ctx, 0)
                        .whenCompleteAsync((r, e) -> recordWriteOutcome(r, startCall), this.asyncPool));
    }

    /**
     * Updates the QPS, row-count and failure metrics for one finished batch write.
     *
     * @param result    the write result, or {@code null} when the write completed exceptionally
     * @param startCall the clock tick captured when the write was issued
     */
    private static void recordWriteOutcome(Result<WriteOk, Err> result, long startCall) {
        InnerMetricHelper.writeQps().mark();
        if (result != null) {
            if (Util.isRwLogging()) {
                LOG.info("Write to {}, duration={} ms, result={}.", "GreptimeDB",
                        Clock.defaultClock().duration(startCall), result);
            }
            if (result.isOk()) {
                WriteOk ok = result.getOk();
                InnerMetricHelper.writeRowsSuccessNum().update(ok.getSuccess());
                InnerMetricHelper.writeRowsFailureNum().update(ok.getFailure());
                return;
            }
        }
        // Reached for an error result as well as for an exceptional completion.
        InnerMetricHelper.writeFailureNum().mark();
    }

    /**
     * Opens a rate-limited client stream to the routed endpoint.
     *
     * <p>This call blocks (via {@link CompletableFuture#join()}) until the route
     * has been resolved and the stream has been opened.
     *
     * @param maxPointsPerSecond the rate limit in points per second; a non-positive
     *                           value selects {@code WriteOptions.getDefaultStreamMaxWritePointsPerSecond()}
     * @param ctx                the call context passed down to the RPC layer
     * @return a writer whose {@code completed()} future yields the server's response
     */
    @Override
    public StreamWriter<WriteRows, WriteOk> streamWriter(int maxPointsPerSecond, Context ctx) {
        int permitsPerSecond = maxPointsPerSecond > 0
                ? maxPointsPerSecond
                : this.opts.getDefaultStreamMaxWritePointsPerSecond();
        CompletableFuture<WriteOk> respFuture = new CompletableFuture<>();
        return this.routerClient.route()
                .thenApply(endpoint -> streamWriteTo(endpoint, ctx, Util.toObserver(respFuture)))
                .thenApply(reqObserver -> new RateLimitingStreamWriter(reqObserver, permitsPerSecond) {

                    @Override
                    public StreamWriter<WriteRows, WriteOk> write(WriteRows rows) {
                        if (respFuture.isCompletedExceptionally()) {
                            // getNow() on a failed future throws, surfacing the stream
                            // error to the caller instead of writing into a dead stream.
                            respFuture.getNow(null);
                        }
                        return super.write(rows);
                    }

                    @Override
                    public CompletableFuture<WriteOk> completed() {
                        reqObserver.onCompleted();
                        return respFuture;
                    }
                })
                .join();
    }

    /**
     * Performs one write attempt and, on an error result, retries recursively.
     *
     * <p>An error result is returned as-is when the retry budget is exhausted or
     * when {@code Util.shouldNotRetry} rejects the error.
     *
     * @param rows    the rows to write
     * @param ctx     the call context
     * @param retries the number of retries already performed (0 for the first attempt)
     * @return a future holding the final result
     */
    private CompletableFuture<Result<WriteOk, Err>> write0(Collection<WriteRows> rows, Context ctx, int retries) {
        InnerMetricHelper.writeByRetries(retries).mark();
        return this.routerClient.route()
                .thenComposeAsync(endpoint -> writeTo(endpoint, rows, ctx, retries), this.asyncPool)
                .thenComposeAsync(r -> {
                    if (r.isOk()) {
                        LOG.debug("Success to write to {}, ok={}.", "GreptimeDB", r.getOk());
                        return Util.completedCf(r);
                    }
                    Err err = r.getErr();
                    LOG.warn("Failed to write to {}, retries={}, err={}.", "GreptimeDB", retries, err);
                    if (retries + 1 > this.opts.getMaxRetries()) {
                        LOG.error("Retried {} times still failed.", retries);
                        return Util.completedCf(r);
                    }
                    if (Util.shouldNotRetry(err)) {
                        return Util.completedCf(r);
                    }
                    return write0(rows, ctx, retries + 1);
                }, this.asyncPool);
    }

    /**
     * Sends the rows to a single endpoint and maps the response to a result.
     *
     * @param endpoint the target endpoint
     * @param rows     the rows to write
     * @param ctx      the call context; the current retry count is stored in it under {@code "retries"}
     * @param retries  the number of retries already performed
     * @return a future holding a success result carrying the affected-row count,
     *         or an error result carrying the server's status code and message
     */
    private CompletableFuture<Result<WriteOk, Err>> writeTo(Endpoint endpoint, Collection<WriteRows> rows, Context ctx, int retries) {
        // NOTE(review): left raw because the element type returned by
        // WriteRows::tableName is not visible from this file - tighten once confirmed.
        Collection tableNames = rows.stream().map(WriteRows::tableName).collect(Collectors.toList());
        Database.GreptimeRequest req = WriteRowsHelper.toGreptimeRequest(tableNames, rows, this.opts.getAuthInfo());
        ctx.with("retries", retries);
        CompletableFuture<Database.GreptimeResponse> future = this.routerClient.invoke(endpoint, req, ctx);
        return future.thenApplyAsync(resp -> {
            Common.ResponseHeader header = resp.getHeader();
            Common.Status status = header.getStatus();
            int statusCode = status.getStatusCode();
            if (Status.isSuccess(statusCode)) {
                int affectedRows = resp.getAffectedRows().getValue();
                return WriteOk.ok(affectedRows, 0, tableNames).mapToResult();
            }
            return Err.writeErr(statusCode, new ServerException(status.getErrMsg()), endpoint, rows).mapToResult();
        }, this.asyncPool);
    }

    /**
     * Opens a client-streaming call and adapts both directions of it.
     *
     * @param endpoint     the target endpoint
     * @param ctx          the call context
     * @param respObserver receives the server's response, converted to {@link WriteOk}
     * @return an observer that converts each {@link WriteRows} into a request and
     *         forwards it to the RPC stream
     */
    private Observer<WriteRows> streamWriteTo(Endpoint endpoint, Context ctx, final Observer<WriteOk> respObserver) {
        final Observer<Database.GreptimeRequest> rpcObserver = this.routerClient.invokeClientStreaming(
                endpoint, Database.GreptimeRequest.getDefaultInstance(), ctx,
                new Observer<Database.GreptimeResponse>() {

                    @Override
                    public void onNext(Database.GreptimeResponse resp) {
                        int affectedRows = resp.getAffectedRows().getValue();
                        Result<WriteOk, Err> ret = WriteOk.ok(affectedRows, 0, null).mapToResult();
                        if (ret.isOk()) {
                            respObserver.onNext(ret.getOk());
                        } else {
                            respObserver.onError(new StreamException(String.valueOf(ret.getErr())));
                        }
                    }

                    @Override
                    public void onError(Throwable err) {
                        respObserver.onError(err);
                    }

                    @Override
                    public void onCompleted() {
                        respObserver.onCompleted();
                    }
                });

        return new Observer<WriteRows>() {

            @Override
            public void onNext(WriteRows rows) {
                Database.GreptimeRequest req = WriteRowsHelper.toGreptimeRequest(rows, WriteClient.this.opts.getAuthInfo());
                rpcObserver.onNext(req);
            }

            @Override
            public void onError(Throwable err) {
                rpcObserver.onError(err);
            }

            @Override
            public void onCompleted() {
                rpcObserver.onCompleted();
            }
        };
    }

    /**
     * Prints the retry limit and the async executor of this client.
     *
     * @param out the printer to write to
     */
    public void display(Display.Printer out) {
        out.println("--- WriteClient ---")
                .print("maxRetries=")
                .println(this.opts.getMaxRetries())
                .print("asyncPool=")
                .println(this.asyncPool);
    }

    @Override
    public String toString() {
        return "WriteClient{opts=" + this.opts
                + ", routerClient=" + this.routerClient
                + ", asyncPool=" + this.asyncPool
                + '}';
    }

    /**
     * Base {@link StreamWriter} that throttles writes by point count before
     * forwarding them to the request observer. Subclasses supply {@code completed()}.
     */
    static abstract class RateLimitingStreamWriter implements StreamWriter<WriteRows, WriteOk> {
        private final Observer<WriteRows> observer;
        /** {@code null} when rate limiting is disabled. */
        private final RateLimiter rateLimiter;

        /**
         * @param observer         the observer each written {@link WriteRows} is forwarded to
         * @param permitsPerSecond the allowed points per second; a non-positive value disables limiting
         */
        RateLimitingStreamWriter(Observer<WriteRows> observer, double permitsPerSecond) {
            this.observer = observer;
            this.rateLimiter = permitsPerSecond > 0.0 ? RateLimiter.create(permitsPerSecond) : null;
        }

        /**
         * Waits for the rate limiter (if any) and forwards the rows.
         *
         * @param rows the rows to write, must not be {@code null}
         * @return this writer, for chaining
         */
        @Override
        public StreamWriter<WriteRows, WriteOk> write(WriteRows rows) {
            Ensures.ensureNonNull(rows, "null `rows`");
            if (this.rateLimiter != null) {
                int permits = rows.pointCount();
                // RateLimiter.acquire rejects non-positive permit counts, so rows
                // without points bypass the limiter just as they do when it is disabled.
                if (permits > 0) {
                    double secondsWaited = this.rateLimiter.acquire(permits);
                    // acquire() reports fractional seconds; record milliseconds so that
                    // sub-second waits are not truncated to zero.
                    InnerMetricHelper.writeStreamLimiterTimeSpent().update((long) (secondsWaited * 1000.0));
                }
            }
            this.observer.onNext(rows);
            return this;
        }
    }

    /**
     * {@link WriteLimiter} that charges one permit per row and answers rejected
     * writes with an error result.
     */
    static class DefaultWriteLimiter extends WriteLimiter {

        /**
         * @param maxInFlight the permit budget passed to the base limiter
         * @param policy      the policy passed to the base limiter
         */
        public DefaultWriteLimiter(int maxInFlight, LimitedPolicy policy) {
            super(maxInFlight, policy, "write_limiter_acquire");
        }

        /**
         * @param in the rows about to be written
         * @return the sum of {@code rowCount()} over all elements
         */
        @Override
        public int calculatePermits(Collection<WriteRows> in) {
            return in.stream().mapToInt(WriteRows::rowCount).sum();
        }

        /**
         * Builds the error result for a write the limiter refused.
         *
         * @param in    the rejected rows
         * @param state the limiter state at rejection time, used for the error message
         * @return an error result with code 503 wrapping a {@link LimitedException}
         */
        @Override
        public Result<WriteOk, Err> rejected(Collection<WriteRows> in, AbstractLimiter.RejectedState state) {
            String errMsg = String.format(
                    "Write limited by client, acquirePermits=%d, maxPermits=%d, availablePermits=%d.",
                    state.acquirePermits(), state.maxPermits(), state.availablePermits());
            return Result.err(Err.writeErr(503, new LimitedException(errMsg), null, in));
        }
    }

    /**
     * Static accessors for the metrics updated by {@link WriteClient}.
     */
    static final class InnerMetricHelper {
        static final Histogram WRITE_ROWS_SUCCESS_NUM = MetricsUtil.histogram("write_rows_success_num");
        static final Histogram WRITE_ROWS_FAILURE_NUM = MetricsUtil.histogram("write_rows_failure_num");
        static final Histogram WRITE_STREAM_LIMITER_TIME_SPENT = MetricsUtil.histogram("write_stream_limiter_time_spent");
        static final Meter WRITE_FAILURE_NUM = MetricsUtil.meter("write_failure_num");
        static final Meter WRITE_QPS = MetricsUtil.meter("write_qps");

        InnerMetricHelper() {
        }

        static Histogram writeRowsSuccessNum() {
            return WRITE_ROWS_SUCCESS_NUM;
        }

        static Histogram writeRowsFailureNum() {
            return WRITE_ROWS_FAILURE_NUM;
        }

        static Histogram writeStreamLimiterTimeSpent() {
            return WRITE_STREAM_LIMITER_TIME_SPENT;
        }

        static Meter writeFailureNum() {
            return WRITE_FAILURE_NUM;
        }

        static Meter writeQps() {
            return WRITE_QPS;
        }

        /**
         * @param retries the retry count of the current attempt
         * @return the meter for that retry count; counts above 3 share the meter for 3
         */
        static Meter writeByRetries(int retries) {
            return MetricsUtil.meter("write_by_retries", Math.min(3, retries));
        }
    }
}

