/*
 * Decompiled with CFR 0.152.
 */
package com.mageddo.tobby.replicator;

import com.mageddo.db.ConnectionUtils;
import com.mageddo.db.QueryTimeoutException;
import com.mageddo.tobby.Locker;
import com.mageddo.tobby.UncheckedSQLException;
import com.mageddo.tobby.internal.utils.StopWatch;
import com.mageddo.tobby.replicator.BufferedReplicator;
import com.mageddo.tobby.replicator.IteratorFactory;
import com.mageddo.tobby.replicator.ReplicatorConfig;
import com.mageddo.tobby.replicator.ReplicatorContextVars;
import com.mageddo.tobby.replicator.StreamingIterator;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Replicators {
    private static final Logger log = LoggerFactory.getLogger(Replicators.class);
    private final ReplicatorConfig config;
    private final IteratorFactory iteratorFactory;
    private final Locker locker;

    public Replicators(ReplicatorConfig config, IteratorFactory iteratorFactory, Locker locker) {
        this.config = config;
        this.iteratorFactory = iteratorFactory;
        this.locker = locker;
    }

    public boolean replicateLocking() {
        boolean bl;
        block9: {
            log.info("status=replicateLocking");
            DataSource dataSource = this.config.getDataSource();
            Connection conn = dataSource.getConnection();
            try {
                ConnectionUtils.useTransaction(conn, () -> {
                    this.locker.lock(conn);
                    this.replicate(conn);
                });
                bl = true;
                if (conn == null) break block9;
            }
            catch (Throwable throwable) {
                try {
                    if (conn != null) {
                        try {
                            conn.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (QueryTimeoutException e) {
                    log.info("status=lostLocking");
                    return false;
                }
                catch (SQLException e) {
                    throw new UncheckedSQLException(e);
                }
            }
            conn.close();
        }
        return bl;
    }

    public void replicate() {
        this.replicate(null);
    }

    void replicate(Connection readConn) {
        LocalDateTime lastTimeProcessed = LocalDateTime.now();
        int totalProcessed = 0;
        log.info("status=replication-started");
        int wave = 1;
        while (true) {
            StopWatch stopWatch = StopWatch.createStarted();
            int processed = this.processWave(wave, readConn);
            if (processed != 0) {
                lastTimeProcessed = LocalDateTime.now();
            }
            totalProcessed += processed;
            if (stopWatch.getDuration().toMillis() >= 1000L) {
                log.info("wave={}, status=wave-ended, processed={}, time={}, avg={}, totalProcessed={}", new Object[]{StopWatch.display(wave), StopWatch.display(processed), stopWatch.getDisplayTime(), this.safeDivide(stopWatch, processed), StopWatch.display(totalProcessed)});
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("wave={}, status=wave-ended, processed={}, time={}, totalProcessed={}", new Object[]{StopWatch.display(wave), StopWatch.display(processed), stopWatch.getDisplayTime(), StopWatch.display(totalProcessed)});
                }
                if (wave % 10000 == 0) {
                    log.info("wave={}, status=waveReporting, totalProcessed={}", (Object)StopWatch.display(wave), (Object)StopWatch.display(totalProcessed));
                }
            }
            ReplicatorContextVars contextVars = ReplicatorContextVars.builder().waveDuration(stopWatch.getDuration()).waveProcessed(processed).durationSinceLastTimeProcessed(Duration.ofMillis(this.millisPassed(lastTimeProcessed))).wave(wave).build();
            if (this.shouldStop(contextVars)) {
                log.info("status=replicatorIsConsideredIdle, action=exiting, contextVars={}", (Object)contextVars);
                return;
            }
            if (!this.shouldRun(lastTimeProcessed)) {
                log.info("status=idleTimedOut, lastTimeProcessed={}, idleTimeout={}", (Object)lastTimeProcessed, (Object)this.config.getIdleTimeout());
                return;
            }
            ++wave;
        }
    }

    private boolean shouldStop(ReplicatorContextVars contextVars) {
        return this.config.getStopPredicate().test(contextVars);
    }

    private long safeDivide(StopWatch stopWatch, int processed) {
        if (processed == 0) {
            return 0L;
        }
        return stopWatch.getTime() / (long)processed;
    }

    private int processWave(int wave, Connection readConnParam) {
        if (log.isDebugEnabled()) {
            log.debug("wave={}, status=loading-wave", (Object)wave);
        }
        Connection readConn = this.chooseReadConnection(readConnParam);
        try {
            int n;
            block13: {
                Connection writeConn = this.getConnection();
                try {
                    n = ConnectionUtils.useTransaction(writeConn, () -> {
                        BufferedReplicator bufferedReplicator = new BufferedReplicator(this.config.getProducer(), this.config.getBufferSize(), wave);
                        StreamingIterator replicator = this.iteratorFactory.create(bufferedReplicator, readConn, writeConn, this.config);
                        return replicator.iterate(readConn);
                    });
                    if (writeConn == null) break block13;
                }
                catch (Throwable throwable) {
                    try {
                        if (writeConn != null) {
                            try {
                                writeConn.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                        }
                        throw throwable;
                    }
                    catch (SQLException e) {
                        throw new UncheckedSQLException(e);
                    }
                }
                writeConn.close();
            }
            return n;
        }
        finally {
            if (readConnParam == null) {
                ConnectionUtils.quietClose(readConn);
            }
        }
    }

    private Connection chooseReadConnection(Connection readConnParam) {
        if (readConnParam != null) {
            return readConnParam;
        }
        return this.getConnection();
    }

    private Connection getConnection() {
        try {
            return this.config.getDataSource().getConnection();
        }
        catch (SQLException e) {
            throw new UncheckedSQLException(e);
        }
    }

    private boolean shouldRun(LocalDateTime lastTimeProcessed) {
        return this.config.getIdleTimeout() == Duration.ZERO || this.millisPassed(lastTimeProcessed) < this.config.getIdleTimeout().toMillis();
    }

    private long millisPassed(LocalDateTime lastTimeProcessed) {
        return ChronoUnit.MILLIS.between(lastTimeProcessed, LocalDateTime.now());
    }
}

