/*
 * Decompiled with CFR 0.152.
 */
package com.mageddo.tobby.replicator;

import com.mageddo.tobby.ProducedRecord;
import com.mageddo.tobby.internal.utils.StopWatch;
import com.mageddo.tobby.producer.kafka.converter.ProducedRecordConverter;
import com.mageddo.tobby.replicator.RecordSend;
import com.mageddo.tobby.replicator.Replicator;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferedReplicator
implements Replicator {
    private static final Logger log = LoggerFactory.getLogger(BufferedReplicator.class);
    private final Producer<byte[], byte[]> producer;
    private final List<ProducedRecord> buffer;
    private final int maxBufferSize;
    private final int wave;
    private final StopWatch stopWatch;

    public BufferedReplicator(Producer<byte[], byte[]> producer, int bufferSize, int wave) {
        this.producer = producer;
        this.wave = wave;
        this.maxBufferSize = bufferSize;
        this.buffer = new ArrayList<ProducedRecord>(this.maxBufferSize);
        this.stopWatch = new StopWatch();
    }

    @Override
    public boolean send(ProducedRecord record) {
        this.buffer.add(record);
        if (this.buffer.size() < this.maxBufferSize) {
            if (log.isTraceEnabled()) {
                log.trace("status=addToBuffer, id={}", (Object)record.getId());
            }
            return false;
        }
        return true;
    }

    @Override
    public void flush() {
        long elapsedTimeSinceLastFlush = this.getTimeSinceLastFlush();
        this.stopWatch.reset();
        if (this.buffer.isEmpty()) {
            if (log.isTraceEnabled()) {
                log.trace("status=noBuffer, wave={}, records={}, time={}", new Object[]{this.wave, this.buffer.size(), StopWatch.display(elapsedTimeSinceLastFlush)});
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("status=sending, wave={}, records={}", (Object)this.wave, (Object)this.buffer.size());
        }
        while (true) {
            try {
                StopWatch recordStopWatch = StopWatch.createStarted();
                ArrayList<RecordSend> futures = new ArrayList<RecordSend>();
                for (ProducedRecord producedRecord : this.buffer) {
                    futures.add(new RecordSend(producedRecord, this.producer.send(ProducedRecordConverter.toKafkaProducerRecord(producedRecord))));
                }
                if (log.isDebugEnabled()) {
                    log.debug("status=kafkaSendScheduled, wave={}, records={}, time={}", new Object[]{this.wave, this.buffer.size(), recordStopWatch.getTime()});
                }
                for (RecordSend future : futures) {
                    future.getFuture().get();
                }
                long produceTime = recordStopWatch.getSplitTime();
                recordStopWatch.split();
                if (log.isDebugEnabled()) {
                    log.debug("status=kafkaSendFlushed, wave={}, records={}, time={}", new Object[]{this.wave, this.buffer.size(), produceTime});
                }
                if (this.buffer.size() > 1000) {
                    log.info("wave={}, quantity={}, status=kafkaSendFlushed, timeSinceLastFlush={}, produceTime={}, recordsTime={}", new Object[]{this.wave, this.buffer.size(), StopWatch.display(elapsedTimeSinceLastFlush), StopWatch.display(produceTime), recordStopWatch.getTime()});
                }
                this.buffer.clear();
            }
            catch (InterruptedException | ExecutionException e) {
                log.warn("wave={}, status=failed-to-post-to-kafka, msg={}", new Object[]{this.wave, e.getMessage(), e});
                continue;
            }
            break;
        }
    }

    private long getTimeSinceLastFlush() {
        if (!this.stopWatch.isStarted()) {
            return 0L;
        }
        long time = this.stopWatch.getTime();
        this.stopWatch.reset();
        return time;
    }

    public int size() {
        return this.buffer.size();
    }

    public List<ProducedRecord> getBuffer() {
        return this.buffer;
    }
}

