/*
 * Decompiled with CFR 0.152.
 */
package com.mageddo.tobby.replicator.idempotencestrategy.batchdelete;

import com.mageddo.db.ConnectionUtils;
import com.mageddo.tobby.ProducedRecord;
import com.mageddo.tobby.RecordDAO;
import com.mageddo.tobby.internal.utils.BatchThread;
import com.mageddo.tobby.internal.utils.StopWatch;
import com.mageddo.tobby.internal.utils.Threads;
import com.mageddo.tobby.replicator.BatchSender;
import com.mageddo.tobby.replicator.Replicator;
import com.mageddo.tobby.replicator.ReplicatorConfig;
import com.mageddo.tobby.replicator.StreamingIterator;
import com.mageddo.tobby.replicator.idempotencestrategy.batchdelete.DeleteMode;
import com.mageddo.tobby.replicator.idempotencestrategy.batchdelete.RecordDeleter;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.sql.DataSource;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class BatchParallelDeleteIdempotenceBasedReplicator
implements Replicator,
StreamingIterator {
    private static final Logger log = LoggerFactory.getLogger(BatchParallelDeleteIdempotenceBasedReplicator.class);
    private final List<ProducedRecord> buffer = new ArrayList<ProducedRecord>();
    private final RecordDAO recordDAO;
    private final DataSource dataSource;
    private final BatchSender batchSender;
    private final ExecutorService pool;
    private final RecordDeleter recordDeleter;
    private final Config config;

    @Inject
    public BatchParallelDeleteIdempotenceBasedReplicator(RecordDAO recordDAO, DataSource dataSource, BatchSender batchSender, RecordDeleter recordDeleter, Config config) {
        this.recordDAO = recordDAO;
        this.dataSource = dataSource;
        this.batchSender = batchSender;
        this.recordDeleter = recordDeleter;
        this.config = config;
        this.pool = Threads.newPool(config.getThreads());
    }

    @Override
    public boolean send(ProducedRecord record) {
        boolean exhausted;
        this.buffer.add(record);
        boolean bl = exhausted = this.buffer.size() >= this.config.getBufferSize();
        if (exhausted) {
            this.flush();
        }
        return exhausted;
    }

    @Override
    public void flush() {
        List<ProducedRecord> records;
        if (this.buffer.isEmpty()) {
            if (log.isTraceEnabled()) {
                log.trace("status=nothingToFlush");
            }
            return;
        }
        StopWatch flushTimer = StopWatch.createStarted();
        int from = 0;
        BatchThread<Void> batchThread = new BatchThread<Void>();
        while (!(records = this.subList(from)).isEmpty()) {
            from += records.size();
            batchThread.add(() -> {
                try (Connection connection = this.dataSource.getConnection();){
                    ConnectionUtils.useTransaction(connection, () -> {
                        StopWatch stopWatch = StopWatch.createStarted();
                        this.recordDeleter.delete(connection, records, this.config.getDeleteMode());
                        this.batchSender.send(records);
                        if (log.isDebugEnabled()) {
                            log.debug("status=replicated, records={}, time={}", (Object)records.size(), (Object)stopWatch.getDisplayTime());
                        }
                    });
                    Void void_ = null;
                    return void_;
                }
            });
        }
        Threads.executeAndGet(this.pool, batchThread.getCallables());
        if (log.isDebugEnabled()) {
            log.debug("status=flushed, records={}, time={}", (Object)this.buffer.size(), (Object)flushTimer.getDisplayTime());
        }
        this.buffer.clear();
    }

    @Override
    public int iterate(Connection readConn) {
        AtomicInteger counter = new AtomicInteger();
        this.recordDAO.iterateOverRecords(readConn, this.config.getFetchSize(), record -> {
            counter.incrementAndGet();
            this.send((ProducedRecord)record);
        });
        this.flush();
        return counter.get();
    }

    private List<ProducedRecord> subList(int from) {
        return this.buffer.stream().skip(from).limit(this.config.getThreadBufferSize()).collect(Collectors.toList());
    }

    public static final class Config {
        @NonNull
        private final DeleteMode deleteMode;
        private final int fetchSize;
        private final int bufferSize;
        private final int threadBufferSize;
        private final int threads;

        public static Config from(ReplicatorConfig config) {
            return Config.builder().fetchSize(config.getFetchSize()).bufferSize(config.getInt("replicators.batch-parallel-delete.buffer-size")).deleteMode(DeleteMode.valueOf(config.get("replicators.batch-parallel-delete.delete-mode"))).threads(config.getInt("replicators.batch-parallel-delete.threads")).threadBufferSize(config.getInt("replicators.batch-parallel-delete.thread-buffer-size")).build();
        }

        Config(@NonNull DeleteMode deleteMode, int fetchSize, int bufferSize, int threadBufferSize, int threads) {
            if (deleteMode == null) {
                throw new NullPointerException("deleteMode is marked non-null but is null");
            }
            this.deleteMode = deleteMode;
            this.fetchSize = fetchSize;
            this.bufferSize = bufferSize;
            this.threadBufferSize = threadBufferSize;
            this.threads = threads;
        }

        public static ConfigBuilder builder() {
            return new ConfigBuilder();
        }

        @NonNull
        public DeleteMode getDeleteMode() {
            return this.deleteMode;
        }

        public int getFetchSize() {
            return this.fetchSize;
        }

        public int getBufferSize() {
            return this.bufferSize;
        }

        public int getThreadBufferSize() {
            return this.threadBufferSize;
        }

        public int getThreads() {
            return this.threads;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Config)) {
                return false;
            }
            Config other = (Config)o;
            if (this.getFetchSize() != other.getFetchSize()) {
                return false;
            }
            if (this.getBufferSize() != other.getBufferSize()) {
                return false;
            }
            if (this.getThreadBufferSize() != other.getThreadBufferSize()) {
                return false;
            }
            if (this.getThreads() != other.getThreads()) {
                return false;
            }
            DeleteMode this$deleteMode = this.getDeleteMode();
            DeleteMode other$deleteMode = other.getDeleteMode();
            return !(this$deleteMode == null ? other$deleteMode != null : !((Object)((Object)this$deleteMode)).equals((Object)other$deleteMode));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            result = result * 59 + this.getFetchSize();
            result = result * 59 + this.getBufferSize();
            result = result * 59 + this.getThreadBufferSize();
            result = result * 59 + this.getThreads();
            DeleteMode $deleteMode = this.getDeleteMode();
            result = result * 59 + ($deleteMode == null ? 43 : ((Object)((Object)$deleteMode)).hashCode());
            return result;
        }

        public String toString() {
            return "BatchParallelDeleteIdempotenceBasedReplicator.Config(deleteMode=" + (Object)((Object)this.getDeleteMode()) + ", fetchSize=" + this.getFetchSize() + ", bufferSize=" + this.getBufferSize() + ", threadBufferSize=" + this.getThreadBufferSize() + ", threads=" + this.getThreads() + ")";
        }

        public static class ConfigBuilder {
            private DeleteMode deleteMode;
            private int fetchSize;
            private int bufferSize;
            private int threadBufferSize;
            private int threads;

            ConfigBuilder() {
            }

            public ConfigBuilder deleteMode(@NonNull DeleteMode deleteMode) {
                if (deleteMode == null) {
                    throw new NullPointerException("deleteMode is marked non-null but is null");
                }
                this.deleteMode = deleteMode;
                return this;
            }

            public ConfigBuilder fetchSize(int fetchSize) {
                this.fetchSize = fetchSize;
                return this;
            }

            public ConfigBuilder bufferSize(int bufferSize) {
                this.bufferSize = bufferSize;
                return this;
            }

            public ConfigBuilder threadBufferSize(int threadBufferSize) {
                this.threadBufferSize = threadBufferSize;
                return this;
            }

            public ConfigBuilder threads(int threads) {
                this.threads = threads;
                return this;
            }

            public Config build() {
                return new Config(this.deleteMode, this.fetchSize, this.bufferSize, this.threadBufferSize, this.threads);
            }

            public String toString() {
                return "BatchParallelDeleteIdempotenceBasedReplicator.Config.ConfigBuilder(deleteMode=" + (Object)((Object)this.deleteMode) + ", fetchSize=" + this.fetchSize + ", bufferSize=" + this.bufferSize + ", threadBufferSize=" + this.threadBufferSize + ", threads=" + this.threads + ")";
            }
        }
    }
}

