/*
 * Decompiled with CFR 0.152.
 */
package com.mageddo.tobby.replicator;

import com.mageddo.tobby.ProducedRecord;
import com.mageddo.tobby.internal.utils.StopWatch;
import com.mageddo.tobby.internal.utils.Threads;
import com.mageddo.tobby.internal.utils.UncheckedExecutionException;
import com.mageddo.tobby.producer.kafka.converter.ProducedRecordConverter;
import java.util.List;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class BatchSender {
    private static final Logger log = LoggerFactory.getLogger(BatchSender.class);
    private final Producer<byte[], byte[]> producer;

    @Inject
    public BatchSender(Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    public void send(List<ProducedRecord> records) {
        if (records.isEmpty()) {
            if (log.isTraceEnabled()) {
                log.trace("status=noBuffer, records={}", (Object)records.size());
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("status=sending, records={}", (Object)records.size());
        }
        while (true) {
            try {
                StopWatch stopWatch = StopWatch.createStarted();
                List futures = records.stream().map(it -> this.producer.send(ProducedRecordConverter.toKafkaProducerRecord(it))).collect(Collectors.toList());
                Threads.get(futures);
                log.debug("status=sent, time={}", (Object)stopWatch.getTime());
            }
            catch (UncheckedExecutionException e) {
                log.warn("status=failed-to-post-to-kafka, msg={}", (Object)e.getMessage(), (Object)e);
                continue;
            }
            break;
        }
    }
}

