/*
 * Decompiled with CFR 0.152.
 */
package com.mageddo.tobby;

import com.mageddo.db.DB;
import com.mageddo.db.DuplicatedRecordException;
import com.mageddo.tobby.ProducedRecord;
import com.mageddo.tobby.ProducerRecord;
import com.mageddo.tobby.RecordDAO;
import com.mageddo.tobby.UncheckedSQLException;
import com.mageddo.tobby.converter.HeadersConverter;
import com.mageddo.tobby.converter.ProducedRecordConverter;
import com.mageddo.tobby.internal.utils.Base64;
import com.mageddo.tobby.internal.utils.StopWatch;
import com.mageddo.tobby.internal.utils.Validator;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JDBC implementation of {@link RecordDAO} working on the {@code TTO_RECORD} and
 * {@code TTO_RECORD_PROCESSED} tables.
 *
 * <p>Every {@link SQLException} raised by the driver is rethrown as an unchecked exception
 * ({@link UncheckedSQLException}, or whatever {@link DuplicatedRecordException#check} produces
 * for {@link #acquireInserting(Connection, UUID)}). Connections are supplied by the caller and
 * are never closed here.
 */
public class RecordDAOGeneric
implements RecordDAO {
    private static final Logger log = LoggerFactory.getLogger(RecordDAOGeneric.class);

    /**
     * Maximum number of bind parameters placed in a single {@code IN (...)} list.
     * NOTE(review): presumably chosen to stay under a database limit on IN-list size — confirm.
     */
    private static final int MAX_IN_PARAMS = 999;

    private static final String INSERT_RECORD_SQL = "INSERT INTO TTO_RECORD ( \n"
        + "  IDT_TTO_RECORD, NAM_TOPIC, NUM_PARTITION, \n"
        + "  TXT_KEY, TXT_VALUE, TXT_HEADERS \n"
        + ") VALUES ( \n"
        + "  ?, ?, ?, \n"
        + "  ?, ?, ? \n"
        + ") \n";

    private static final String FIND_BY_ID_SQL = "SELECT * FROM TTO_RECORD WHERE IDT_TTO_RECORD = ?";

    private static final String FIND_NOT_PROCESSED_SQL = "SELECT * FROM TTO_RECORD R \n"
        + "WHERE DAT_CREATED > ? \n"
        + "AND DAT_CREATED < ? \n"
        + "AND NOT EXISTS ( \n"
        + "  SELECT 1 FROM TTO_RECORD_PROCESSED \n"
        + "  WHERE IDT_TTO_RECORD = R.IDT_TTO_RECORD \n"
        + "  AND DAT_CREATED > ? \n"
        + "  AND DAT_CREATED < ? \n"
        + ") \n"
        + "ORDER BY DAT_CREATED ASC \n";

    private static final String INSERT_PROCESSED_SQL = "INSERT INTO TTO_RECORD_PROCESSED (IDT_TTO_RECORD) VALUES (?)";

    private static final String SELECT_ALL_SQL = "SELECT * FROM TTO_RECORD";

    private static final String DELETE_BY_ID_SQL = "DELETE FROM TTO_RECORD WHERE IDT_TTO_RECORD = ? ";

    private final DB db;
    private final ExecutorService pool;

    /**
     * @param db   database descriptor, handed to {@link DuplicatedRecordException#check} when an
     *             insert into {@code TTO_RECORD_PROCESSED} fails
     * @param pool executor used by {@link #acquireDeletingUsingThreads(Connection, List)}
     */
    public RecordDAOGeneric(DB db, ExecutorService pool) {
        this.db = db;
        this.pool = pool;
    }

    /**
     * Inserts {@code record} into {@code TTO_RECORD} under a freshly generated random id.
     *
     * @param connection connection used to run the insert
     * @param record     record to persist
     * @return the persisted record, built from the generated id and {@code record}
     * @throws UncheckedSQLException if the statement cannot be prepared or executed
     */
    @Override
    public ProducedRecord save(Connection connection, ProducerRecord record) {
        final StopWatch stopWatch = StopWatch.createStarted();
        try (PreparedStatement stm = connection.prepareStatement(INSERT_RECORD_SQL)) {
            final UUID id = this.fillStatement(record, stm);
            stm.executeUpdate();
            return ProducedRecordConverter.from(id, record);
        }
        catch (SQLException e) {
            throw new UncheckedSQLException(e);
        }
        finally {
            // Timing is reported on both the success and the failure path.
            if (log.isTraceEnabled()) {
                log.trace("status=save, statementTime={}", stopWatch.getTime());
            }
        }
    }

    /**
     * Looks a record up by id.
     *
     * @param connection connection used to run the query
     * @param id         record id
     * @return the matching record, or {@code null} when no row has that id
     * @throws UncheckedSQLException if the query fails
     */
    @Override
    public ProducedRecord find(Connection connection, UUID id) {
        try (PreparedStatement stm = connection.prepareStatement(FIND_BY_ID_SQL)) {
            stm.setString(1, id.toString());
            try (ResultSet rs = stm.executeQuery()) {
                return rs.next() ? ProducedRecordConverter.map(rs) : null;
            }
        }
        catch (SQLException e) {
            throw new UncheckedSQLException(e);
        }
    }

    /**
     * Streams, oldest first, every record created after {@code from} that has no matching row
     * in {@code TTO_RECORD_PROCESSED}, passing each one to {@code consumer}.
     *
     * <p>The upper bound of the creation-date window is "now plus two days"; the same window is
     * applied to both tables.
     *
     * @param connection connection used to run the query
     * @param fetchSize  JDBC fetch size hint for the statement
     * @param consumer   callback invoked once per record, while the result set is still open
     * @param from       exclusive lower bound for {@code DAT_CREATED}
     * @throws UncheckedSQLException if the query or the row mapping fails
     */
    @Override
    public void iterateNotProcessedRecordsUsingInsertIdempotence(Connection connection, int fetchSize, Consumer<ProducedRecord> consumer, LocalDateTime from) {
        final StopWatch stopWatch = StopWatch.createStarted();
        try (PreparedStatement stm = this.createStreamingStatement(connection, FIND_NOT_PROCESSED_SQL, fetchSize)) {
            final LocalDateTime to = LocalDateTime.now().plusDays(2L);
            final Timestamp fromTimestamp = Timestamp.valueOf(from);
            final Timestamp toTimestamp = Timestamp.valueOf(to);
            stm.setTimestamp(1, fromTimestamp);
            stm.setTimestamp(2, toTimestamp);
            stm.setTimestamp(3, fromTimestamp);
            stm.setTimestamp(4, toTimestamp);
            try (ResultSet rs = stm.executeQuery()) {
                log.info("status=queryExecuted, time={}, from={}, to={}", stopWatch.getDisplayTime(), from, to);
                while (rs.next()) {
                    consumer.accept(ProducedRecordConverter.map(rs));
                }
            }
        }
        catch (SQLException e) {
            throw new UncheckedSQLException(e);
        }
    }

    /**
     * Inserts {@code id} into {@code TTO_RECORD_PROCESSED}.
     *
     * @param connection connection used to run the insert
     * @param id         id of the record being acquired
     * @throws RuntimeException the exception returned by
     *         {@link DuplicatedRecordException#check} when the insert fails
     */
    @Override
    public void acquireInserting(Connection connection, UUID id) {
        try (PreparedStatement stm = connection.prepareStatement(INSERT_PROCESSED_SQL)) {
            stm.setString(1, String.valueOf(id));
            stm.executeUpdate();
        }
        catch (SQLException e) {
            throw DuplicatedRecordException.check(this.db, id, e);
        }
    }

    /**
     * Streams every row of {@code TTO_RECORD} to {@code consumer}.
     *
     * @param connection connection used to run the query
     * @param fetchSize  JDBC fetch size hint for the statement
     * @param consumer   callback invoked once per record, while the result set is still open
     * @throws UncheckedSQLException if the query or the row mapping fails
     */
    @Override
    public void iterateOverRecords(Connection connection, int fetchSize, Consumer<ProducedRecord> consumer) {
        final StopWatch stopWatch = StopWatch.createStarted();
        try (PreparedStatement stm = this.createStreamingStatement(connection, SELECT_ALL_SQL, fetchSize);
             ResultSet rs = stm.executeQuery()) {
            if (log.isDebugEnabled()) {
                log.debug("status=queryExecuted, time={}", stopWatch.getDisplayTime());
            }
            while (rs.next()) {
                consumer.accept(ProducedRecordConverter.map(rs));
            }
        }
        catch (SQLException e) {
            throw new UncheckedSQLException(e);
        }
    }

    /**
     * Deletes the given records by submitting one {@link #acquireDeleting(Connection, UUID)}
     * task per id to the pool and waiting for all of them.
     *
     * <p>All tasks share {@code connection}.
     * NOTE(review): concurrent use of a single JDBC connection depends on the driver — confirm
     * it is safe for the databases this runs against.
     *
     * @param connection connection shared by every delete task
     * @param recordIds  ids to delete; an empty list is a no-op
     * @throws UncheckedSQLException if any task fails or the waiting thread is interrupted
     */
    @Override
    public void acquireDeletingUsingThreads(Connection connection, List<UUID> recordIds) {
        if (recordIds.isEmpty()) {
            if (log.isTraceEnabled()) {
                log.trace("m=acquireDeletingUsingThreads, status=noRecordsToDelete");
            }
            return;
        }
        final StopWatch stopWatch = StopWatch.createStarted();
        final List<Future<?>> promises = recordIds.stream()
            .<Future<?>>map(id -> this.pool.submit(() -> this.acquireDeleting(connection, id)))
            .collect(Collectors.toList());
        for (Future<?> promise : promises) {
            try {
                promise.get();
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so callers up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new UncheckedSQLException(new SQLException(e));
            }
            catch (ExecutionException e) {
                throw new UncheckedSQLException(new SQLException(e));
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("status=acquireDeletingUsingThreads, records={}, time={}", recordIds.size(), stopWatch.getDisplayTime());
        }
    }

    /**
     * Deletes the given records with {@code DELETE ... WHERE IDT_TTO_RECORD IN (...)}
     * statements, at most {@link #MAX_IN_PARAMS} ids per statement.
     *
     * <p>Each statement must delete exactly as many rows as ids it was given; the validation
     * performed by {@link Validator#isTrue} rejects any other outcome.
     *
     * @param connection connection used to run the deletes
     * @param recordIds  ids to delete; an empty list is a no-op
     * @throws UncheckedSQLException if a delete statement fails
     */
    @Override
    public void acquireDeletingUsingIn(Connection connection, List<UUID> recordIds) {
        if (recordIds.isEmpty()) {
            if (log.isTraceEnabled()) {
                log.trace("status=noRecordsToDelete");
            }
            return;
        }
        final StopWatch stopWatch = StopWatch.createStarted();
        int skip = 0;
        while (skip < recordIds.size()) {
            final List<String> params = this.subList(recordIds, skip);
            final String sql = "DELETE FROM TTO_RECORD WHERE IDT_TTO_RECORD IN (" + this.buildBinds(params) + ")";
            try (PreparedStatement stm = connection.prepareStatement(sql)) {
                for (int i = 0; i < params.size(); i++) {
                    stm.setString(i + 1, params.get(i));
                }
                final int affected = stm.executeUpdate();
                Validator.isTrue(affected == params.size(), "Didn't delete all records, expected=%d, actual=%d", params.size(), affected);
            }
            catch (SQLException e) {
                throw new UncheckedSQLException(e);
            }
            skip += params.size();
            if (log.isDebugEnabled()) {
                log.debug("status=acquireDeleteUsingIn, records={}, time={}", recordIds.size(), stopWatch.getDisplayTime());
            }
        }
    }

    /**
     * Deletes the given records with a single JDBC batch of parameterized deletes.
     *
     * <p>The number of rows actually deleted is derived from the per-command update counts
     * returned by the driver and must equal {@code recordIds.size()}. A command reported as
     * {@link Statement#SUCCESS_NO_INFO} is counted as one deleted row because the driver gives
     * no row count for it.
     *
     * @param connection connection used to run the batch
     * @param recordIds  ids to delete; an empty list is a no-op
     * @throws UncheckedSQLException if the batch fails
     */
    @Override
    public void acquireDeletingUsingBatch(Connection connection, List<UUID> recordIds) {
        if (recordIds.isEmpty()) {
            if (log.isTraceEnabled()) {
                log.trace("status=noRecordsToDelete");
            }
            return;
        }
        final StopWatch stopWatch = StopWatch.createStarted();
        try (PreparedStatement stm = connection.prepareStatement(DELETE_BY_ID_SQL)) {
            for (UUID recordId : recordIds) {
                stm.setString(1, String.valueOf(recordId));
                stm.addBatch();
            }
            // The array length is always the number of commands, so it says nothing about
            // how many rows were removed; the individual counts have to be added up.
            int affected = 0;
            for (int count : stm.executeBatch()) {
                if (count == Statement.SUCCESS_NO_INFO) {
                    affected++;
                } else if (count > 0) {
                    affected += count;
                }
            }
            Validator.isTrue(affected == recordIds.size(), "Couldn't delete records, expected=%d, records=%s", recordIds.size(), recordIds);
        }
        catch (SQLException e) {
            throw new UncheckedSQLException(e);
        }
        if (log.isDebugEnabled()) {
            log.debug("status=batchDeleted, records={}, time={}", recordIds.size(), stopWatch.getDisplayTime());
        }
    }

    /**
     * Deletes a single record; exactly one row must be removed.
     *
     * @param connection connection used to run the delete
     * @param id         id of the record to delete
     * @throws UncheckedSQLException if the delete statement fails
     */
    @Override
    public void acquireDeleting(Connection connection, UUID id) {
        try (PreparedStatement stm = connection.prepareStatement(DELETE_BY_ID_SQL)) {
            stm.setString(1, String.valueOf(id));
            final int affected = stm.executeUpdate();
            if (log.isTraceEnabled()) {
                log.trace("m=acquireDeleting, status=deleted, id={}, affected={}", id, affected);
            }
            Validator.isTrue(affected == 1, "Couldn't delete record: %s", id);
        }
        catch (SQLException e) {
            throw new UncheckedSQLException(e);
        }
    }

    /**
     * Prepares a forward-only, read-only statement with the given fetch size.
     * The caller owns the returned statement and must close it.
     */
    private PreparedStatement createStreamingStatement(Connection con, String sql, int fetchSize) throws SQLException {
        final PreparedStatement stm = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stm.setFetchSize(fetchSize);
        return stm;
    }

    /**
     * Binds the six insert parameters for {@code record} and returns the random id bound as the
     * primary key. Key and value are written Base64-encoded; headers go through
     * {@link HeadersConverter#encodeBase64}.
     */
    private UUID fillStatement(ProducerRecord record, PreparedStatement stm) throws SQLException {
        final UUID id = UUID.randomUUID();
        stm.setString(1, id.toString());
        stm.setString(2, record.getTopic());
        stm.setObject(3, record.getPartition());
        stm.setString(4, Base64.encodeToString(record.getKey()));
        stm.setString(5, Base64.encodeToString(record.getValue()));
        stm.setString(6, HeadersConverter.encodeBase64(record.getHeaders()));
        return id;
    }

    /**
     * Returns, as strings, up to {@link #MAX_IN_PARAMS} ids of {@code recordIds} starting at
     * index {@code skip}; empty when {@code skip} is at or past the end of the list.
     */
    private List<String> subList(List<UUID> recordIds, int skip) {
        final int from = Math.min(skip, recordIds.size());
        final int to = Math.min(from + MAX_IN_PARAMS, recordIds.size());
        return recordIds.subList(from, to).stream().map(UUID::toString).collect(Collectors.toList());
    }

    /** Builds one {@code ?} placeholder per parameter, separated by {@code ", "}. */
    private String buildBinds(List<String> params) {
        return params.stream().map(it -> "?").collect(Collectors.joining(", "));
    }
}

