/*
 * Decompiled with CFR 0.152.
 */
package com.azure.search.documents.implementation.batching;

import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JsonSerializer;
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.search.documents.implementation.SearchIndexClientImpl;
import com.azure.search.documents.implementation.batching.IndexBatchResponse;
import com.azure.search.documents.implementation.batching.IndexingDocumentManager;
import com.azure.search.documents.implementation.batching.SearchBatchingUtils;
import com.azure.search.documents.implementation.batching.TryTrackingIndexAction;
import com.azure.search.documents.implementation.converters.IndexActionConverter;
import com.azure.search.documents.implementation.models.IndexAction;
import com.azure.search.documents.implementation.util.Utility;
import com.azure.search.documents.models.IndexBatchException;
import com.azure.search.documents.models.IndexDocumentsResult;
import com.azure.search.documents.models.IndexingResult;
import com.azure.search.documents.options.OnActionAddedOptions;
import com.azure.search.documents.options.OnActionErrorOptions;
import com.azure.search.documents.options.OnActionSentOptions;
import com.azure.search.documents.options.OnActionSucceededOptions;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.util.function.Tuple2;

public final class SearchIndexingPublisher<T> {
    private static final ClientLogger LOGGER = new ClientLogger(SearchIndexingPublisher.class);
    private static final ExecutorService EXECUTOR = SearchIndexingPublisher.getThreadPoolWithShutdownHook();
    private final SearchIndexClientImpl restClient;
    private final JsonSerializer serializer;
    private final boolean autoFlush;
    private int batchSize;
    private final int maxRetries;
    private final long throttlingDelayNanos;
    private final long maxThrottlingDelayNanos;
    private final Consumer<OnActionAddedOptions<T>> onActionAdded;
    private final Consumer<OnActionSentOptions<T>> onActionSent;
    private final Consumer<OnActionSucceededOptions<T>> onActionSucceeded;
    private final Consumer<OnActionErrorOptions<T>> onActionError;
    private final Function<T, String> documentKeyRetriever;
    private final Function<Integer, Integer> scaleDownFunction = size -> size / 2;
    private final IndexingDocumentManager<T> documentManager;
    private final ReentrantLock lock = new ReentrantLock(true);
    volatile AtomicInteger backoffCount = new AtomicInteger();
    volatile Duration currentRetryDelay = Duration.ZERO;

    public SearchIndexingPublisher(SearchIndexClientImpl restClient, JsonSerializer serializer, Function<T, String> documentKeyRetriever, boolean autoFlush, int initialBatchActionCount, int maxRetriesPerAction, Duration throttlingDelay, Duration maxThrottlingDelay, Consumer<OnActionAddedOptions<T>> onActionAdded, Consumer<OnActionSucceededOptions<T>> onActionSucceeded, Consumer<OnActionErrorOptions<T>> onActionError, Consumer<OnActionSentOptions<T>> onActionSent) {
        this.documentKeyRetriever = Objects.requireNonNull(documentKeyRetriever, "'documentKeyRetriever' cannot be null");
        this.restClient = restClient;
        this.serializer = serializer;
        this.documentManager = new IndexingDocumentManager();
        this.autoFlush = autoFlush;
        this.batchSize = initialBatchActionCount;
        this.maxRetries = maxRetriesPerAction;
        this.throttlingDelayNanos = throttlingDelay.toNanos();
        this.maxThrottlingDelayNanos = maxThrottlingDelay.compareTo(throttlingDelay) < 0 ? this.throttlingDelayNanos : maxThrottlingDelay.toNanos();
        this.onActionAdded = onActionAdded;
        this.onActionSent = onActionSent;
        this.onActionSucceeded = onActionSucceeded;
        this.onActionError = onActionError;
    }

    public Collection<com.azure.search.documents.models.IndexAction<T>> getActions() {
        return this.documentManager.getActions();
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public Duration getCurrentRetryDelay() {
        return this.currentRetryDelay;
    }

    public void addActions(Collection<com.azure.search.documents.models.IndexAction<T>> actions, Duration timeout, Context context, Runnable rescheduleFlush) {
        Tuple2<Integer, Boolean> batchSizeAndAvailable = this.documentManager.addAndCheckForBatch(actions, this.documentKeyRetriever, this.onActionAdded, this.batchSize);
        LOGGER.verbose("Actions added, new pending queue size: {}.", new Object[]{batchSizeAndAvailable.getT1()});
        if (this.autoFlush && ((Boolean)batchSizeAndAvailable.getT2()).booleanValue()) {
            rescheduleFlush.run();
            LOGGER.verbose("Adding documents triggered batch size limit, sending documents for indexing.");
            this.flush(false, false, timeout, context);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void flush(boolean awaitLock, boolean isClose, Duration timeout, Context context) {
        if (awaitLock) {
            this.lock.lock();
            try {
                this.flushLoop(isClose, timeout, context);
            }
            finally {
                this.lock.unlock();
            }
        } else if (this.lock.tryLock()) {
            try {
                this.flushLoop(isClose, timeout, context);
            }
            finally {
                this.lock.unlock();
            }
        } else {
            LOGGER.verbose("Batch already in-flight and not waiting for completion. Performing no-op.");
        }
    }

    private void flushLoop(boolean isClosed, Duration timeout, Context context) {
        if (timeout != null && !timeout.isNegative() && !timeout.isZero()) {
            AtomicReference batchActions = new AtomicReference();
            Future<?> future = EXECUTOR.submit(() -> this.flushLoopHelper(isClosed, context, batchActions));
            try {
                CoreUtils.getResultWithTimeout(future, (Duration)timeout);
            }
            catch (ExecutionException e) {
                Throwable realCause = e.getCause();
                if (realCause instanceof Error) {
                    throw (Error)realCause;
                }
                if (realCause instanceof RuntimeException) {
                    throw LOGGER.logExceptionAsError((RuntimeException)realCause);
                }
                throw LOGGER.logExceptionAsError(new RuntimeException(realCause));
            }
            catch (InterruptedException e) {
                throw LOGGER.logExceptionAsError(new RuntimeException(e));
            }
            catch (TimeoutException e) {
                this.documentManager.reinsertCancelledActions((List)batchActions.get());
                throw LOGGER.logExceptionAsError(new RuntimeException(e));
            }
        } else {
            this.flushLoopHelper(isClosed, context, null);
        }
    }

    private void flushLoopHelper(boolean isClosed, Context context, AtomicReference<List<TryTrackingIndexAction<T>>> batchActions) {
        List<TryTrackingIndexAction<T>> batch = this.documentManager.tryCreateBatch(this.batchSize, true);
        if (batchActions != null) {
            batchActions.set(batch);
        }
        IndexBatchResponse response = this.processBatch(batch, context);
        while (response != null && (batch = this.documentManager.tryCreateBatch(this.batchSize, isClosed)) != null) {
            if (batchActions != null) {
                batchActions.set(batch);
            }
            response = this.processBatch(batch, context);
        }
    }

    private IndexBatchResponse processBatch(List<TryTrackingIndexAction<T>> batchActions, Context context) {
        if (CoreUtils.isNullOrEmpty(batchActions)) {
            return null;
        }
        List<IndexAction> convertedActions = batchActions.stream().map(action -> IndexActionConverter.map(action.getAction(), (ObjectSerializer)this.serializer)).collect(Collectors.toList());
        IndexBatchResponse response = this.sendBatch(convertedActions, batchActions, context);
        this.handleResponse(batchActions, response);
        return response;
    }

    private IndexBatchResponse sendBatch(List<IndexAction> actions, List<TryTrackingIndexAction<T>> batchActions, Context context) {
        LOGGER.verbose("Sending a batch of size {}.", new Object[]{batchActions.size()});
        if (this.onActionSent != null) {
            batchActions.forEach(action -> this.onActionSent.accept(new OnActionSentOptions(action.getAction())));
        }
        if (!this.currentRetryDelay.isZero() && !this.currentRetryDelay.isNegative()) {
            SearchIndexingPublisher.sleep(this.currentRetryDelay.toMillis());
        }
        try {
            Response<IndexDocumentsResult> batchCall = Utility.indexDocumentsWithResponse(this.restClient, actions, true, context, LOGGER);
            return new IndexBatchResponse(batchCall.getStatusCode(), ((IndexDocumentsResult)batchCall.getValue()).getResults(), actions.size(), false);
        }
        catch (IndexBatchException exception) {
            return new IndexBatchResponse(207, exception.getIndexingResults(), actions.size(), true);
        }
        catch (HttpResponseException exception) {
            int statusCode = exception.getResponse().getStatusCode();
            if (statusCode == 413) {
                int previousBatchSize = Math.min(this.batchSize, actions.size());
                this.batchSize = Math.max(1, this.scaleDownFunction.apply(previousBatchSize));
                LOGGER.verbose("Scaling down batch size due to 413 (Payload too large) response.{}Scaled down from {} to {}", new Object[]{System.lineSeparator(), previousBatchSize, this.batchSize});
                int actionCount = actions.size();
                if (actionCount == 1) {
                    return new IndexBatchResponse(statusCode, null, actionCount, true);
                }
                int splitOffset = Math.min(actions.size(), this.batchSize);
                List<TryTrackingIndexAction<T>> batchActionsToRemove = batchActions.subList(splitOffset, batchActions.size());
                this.documentManager.reinsertFailedActions(batchActionsToRemove);
                batchActionsToRemove.clear();
                return this.sendBatch(actions.subList(0, splitOffset), batchActions, context);
            }
            return new IndexBatchResponse(statusCode, null, actions.size(), true);
        }
        catch (Exception e) {
            return new IndexBatchResponse(0, null, actions.size(), true);
        }
    }

    private void handleResponse(List<TryTrackingIndexAction<T>> actions, IndexBatchResponse batchResponse) {
        boolean has503;
        if (batchResponse.getStatusCode() == 413 && batchResponse.getCount() == 1) {
            com.azure.search.documents.models.IndexAction<T> action = actions.get(0).getAction();
            if (this.onActionError != null) {
                this.onActionError.accept(new OnActionErrorOptions<T>(action).setThrowable(SearchBatchingUtils.createDocumentTooLargeException()));
            }
            return;
        }
        LinkedList actionsToRetry = new LinkedList();
        boolean bl = has503 = batchResponse.getStatusCode() == 503;
        if (batchResponse.getResults() == null) {
            actionsToRetry.addAll(actions);
        } else {
            for (IndexingResult result : batchResponse.getResults()) {
                String key = result.getKey();
                TryTrackingIndexAction action = actions.stream().filter(a -> key.equals(a.getKey())).findFirst().orElse(null);
                if (action == null) {
                    LOGGER.warning("Unable to correlate result key {} to initial document.", new Object[]{key});
                    continue;
                }
                if (SearchBatchingUtils.isSuccess(result.getStatusCode())) {
                    if (this.onActionSucceeded == null) continue;
                    this.onActionSucceeded.accept(new OnActionSucceededOptions(action.getAction()));
                    continue;
                }
                if (SearchBatchingUtils.isRetryable(result.getStatusCode())) {
                    has503 |= result.getStatusCode() == 503;
                    if (action.getTryCount() < this.maxRetries) {
                        action.incrementTryCount();
                        actionsToRetry.add(action);
                        continue;
                    }
                    if (this.onActionError == null) continue;
                    this.onActionError.accept(new OnActionErrorOptions(action.getAction()).setThrowable(SearchBatchingUtils.createDocumentHitRetryLimitException()).setIndexingResult(result));
                    continue;
                }
                if (this.onActionError == null) continue;
                this.onActionError.accept(new OnActionErrorOptions(action.getAction()).setIndexingResult(result));
            }
        }
        if (has503) {
            this.currentRetryDelay = SearchBatchingUtils.calculateRetryDelay(this.backoffCount.getAndIncrement(), this.throttlingDelayNanos, this.maxThrottlingDelayNanos);
        } else {
            this.backoffCount.set(0);
            this.currentRetryDelay = Duration.ZERO;
        }
        if (!CoreUtils.isNullOrEmpty(actionsToRetry)) {
            this.documentManager.reinsertFailedActions(actionsToRetry);
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private static ExecutorService getThreadPoolWithShutdownHook() {
        return CoreUtils.addShutdownHookSafely((ExecutorService)Executors.newCachedThreadPool(), (Duration)Duration.ofSeconds(5L));
    }
}

