/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.elasticsearch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
import org.apache.flume.sink.elasticsearch.client.ElasticSearchClient;
import org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flume sink that takes events from its channel in batches and passes them to an
 * {@link ElasticSearchClient}.
 *
 * <p>Configuration keys read by {@link #configure(Context)}:
 * <ul>
 *   <li>{@code hostNames} - comma separated server addresses (required unless the sink was
 *       constructed in local mode)</li>
 *   <li>{@code indexName}, {@code indexType}, {@code clusterName} - default to
 *       {@code flume}, {@code log} and {@code elasticsearch}</li>
 *   <li>{@code batchSize} - maximum events taken per transaction, default 100</li>
 *   <li>{@code ttl} - number with an optional qualifier ({@code ms, s, m, h, d, w})</li>
 *   <li>{@code client} and {@code client.*} - client type and its sub-properties</li>
 *   <li>{@code serializer} and {@code serializer.*} - serializer class and its sub-properties</li>
 *   <li>{@code indexNameBuilder} and {@code indexNameBuilder.*} - index name builder class
 *       and its sub-properties</li>
 * </ul>
 */
public class ElasticSearchSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSink.class);
    private static final int defaultBatchSize = 100;

    /** Leading digits (group 1) followed by an optional non-digit qualifier (group 2). */
    private static final Pattern TTL_PATTERN =
            Pattern.compile("^(\\d+)(\\D*)", Pattern.CASE_INSENSITIVE);

    private final boolean isLocal;
    private final CounterGroup counterGroup = new CounterGroup();
    private int batchSize = defaultBatchSize;
    /** TTL in milliseconds passed to the client with every event; -1 when no ttl is configured. */
    private long ttlMs = -1L;
    private String clusterName = "elasticsearch";
    private String indexName = "flume";
    private String indexType = "log";
    private String clientType = "transport";
    private String[] serverAddresses = null;
    private ElasticSearchClient client = null;
    private Context elasticSearchClientContext = null;
    /** Set when the configured serializer is an {@link ElasticSearchIndexRequestBuilderFactory}. */
    private ElasticSearchIndexRequestBuilderFactory indexRequestFactory;
    /** Set when the configured serializer is an {@link ElasticSearchEventSerializer}. */
    private ElasticSearchEventSerializer eventSerializer;
    private IndexNameBuilder indexNameBuilder;
    private SinkCounter sinkCounter;

    /** Creates a sink that is not in local mode. */
    public ElasticSearchSink() {
        this(false);
    }

    /**
     * @param isLocal when {@code true}, {@code hostNames} is not required and {@link #start()}
     *                asks the client factory for a local client
     */
    @VisibleForTesting
    ElasticSearchSink(boolean isLocal) {
        this.isLocal = isLocal;
    }

    @VisibleForTesting
    String[] getServerAddresses() {
        return this.serverAddresses;
    }

    @VisibleForTesting
    String getClusterName() {
        return this.clusterName;
    }

    @VisibleForTesting
    String getIndexName() {
        return this.indexName;
    }

    @VisibleForTesting
    String getIndexType() {
        return this.indexType;
    }

    @VisibleForTesting
    long getTTLMs() {
        return this.ttlMs;
    }

    @VisibleForTesting
    ElasticSearchEventSerializer getEventSerializer() {
        return this.eventSerializer;
    }

    @VisibleForTesting
    IndexNameBuilder getIndexNameBuilder() {
        return this.indexNameBuilder;
    }

    /**
     * Takes up to {@code batchSize} events from the channel inside one transaction, adds each
     * to the client and, if at least one event was taken, executes the client batch before
     * committing.
     *
     * @return {@code READY} when a full batch was taken; {@code BACKOFF} when the channel
     *         yielded no events or fewer than {@code batchSize}
     * @throws EventDeliveryException when a checked failure occurs; the transaction is rolled
     *         back first. Errors and runtime exceptions are rethrown after rollback.
     */
    public Sink.Status process() throws EventDeliveryException {
        logger.debug("processing...");
        Sink.Status status = Sink.Status.READY;
        Channel channel = this.getChannel();
        Transaction txn = channel.getTransaction();
        try {
            txn.begin();
            int count = 0;
            while (count < this.batchSize) {
                Event event = channel.take();
                if (event == null) {
                    break;
                }
                // The index type may contain escape sequences resolved from the event headers.
                String realIndexType = BucketPath.escapeString(this.indexType, event.getHeaders());
                this.client.addEvent(event, this.indexNameBuilder, realIndexType, this.ttlMs);
                ++count;
            }

            if (count <= 0) {
                this.sinkCounter.incrementBatchEmptyCount();
                this.counterGroup.incrementAndGet("channel.underflow");
                status = Sink.Status.BACKOFF;
            } else {
                if (count < this.batchSize) {
                    this.sinkCounter.incrementBatchUnderflowCount();
                    status = Sink.Status.BACKOFF;
                } else {
                    this.sinkCounter.incrementBatchCompleteCount();
                }
                this.sinkCounter.addToEventDrainAttemptCount(count);
                this.client.execute();
            }
            txn.commit();
            this.sinkCounter.addToEventDrainSuccessCount(count);
            this.counterGroup.incrementAndGet("transaction.success");
        } catch (Throwable ex) {
            try {
                txn.rollback();
                this.counterGroup.incrementAndGet("transaction.rollback");
            } catch (Exception ex2) {
                logger.error("Exception in rollback. Rollback might not have been successful.", ex2);
            }
            logger.error("Failed to commit transaction. Transaction rolled back.", ex);
            if (ex instanceof Error || ex instanceof RuntimeException) {
                // Unchecked failures are rethrown unchanged.
                throw Throwables.propagate(ex);
            }
            throw new EventDeliveryException("Failed to commit transaction. Transaction rolled back.", ex);
        } finally {
            txn.close();
        }
        return status;
    }

    /**
     * Reads the sink settings from {@code context}, instantiates the serializer and the index
     * name builder, and validates the resulting configuration.
     *
     * @param context sink configuration
     * @throws IllegalStateException when a required parameter is missing or invalid
     * @throws RuntimeException when the serializer or index name builder cannot be created
     */
    public void configure(Context context) {
        if (!this.isLocal) {
            String hostNames = context.getString("hostNames");
            if (StringUtils.isNotBlank(hostNames)) {
                this.serverAddresses = StringUtils.deleteWhitespace(hostNames).split(",");
            }
            Preconditions.checkState(
                    this.serverAddresses != null && this.serverAddresses.length > 0,
                    "Missing Param:hostNames");
        }

        this.indexName = nonBlankOrDefault(context.getString("indexName"), this.indexName);
        this.indexType = nonBlankOrDefault(context.getString("indexType"), this.indexType);
        this.clusterName = nonBlankOrDefault(context.getString("clusterName"), this.clusterName);

        String configuredBatchSize = context.getString("batchSize");
        if (StringUtils.isNotBlank(configuredBatchSize)) {
            this.batchSize = Integer.parseInt(configuredBatchSize);
        }

        String configuredTtl = context.getString("ttl");
        if (StringUtils.isNotBlank(configuredTtl)) {
            this.ttlMs = this.parseTTL(configuredTtl);
            Preconditions.checkState(this.ttlMs > 0L, "ttl must be greater than 0 or not set.");
        }

        this.clientType = nonBlankOrDefault(context.getString("client"), this.clientType);
        this.elasticSearchClientContext = new Context();
        this.elasticSearchClientContext.putAll(context.getSubProperties("client."));

        this.configureSerializer(context);

        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(this.getName());
        }

        this.configureIndexNameBuilder(context);

        Preconditions.checkState(StringUtils.isNotBlank(this.indexName), "Missing Param:indexName");
        Preconditions.checkState(StringUtils.isNotBlank(this.indexType), "Missing Param:indexType");
        Preconditions.checkState(StringUtils.isNotBlank(this.clusterName), "Missing Param:clusterName");
        Preconditions.checkState(this.batchSize >= 1, "batchSize must be greater than 0");
    }

    /** Returns {@code candidate} unless it is blank, in which case {@code fallback} is returned. */
    private static String nonBlankOrDefault(String candidate, String fallback) {
        return StringUtils.isNotBlank(candidate) ? candidate : fallback;
    }

    /**
     * Instantiates the class named by {@code serializer} (default: the LogStash serializer),
     * configures it with the {@code serializer.*} sub-properties and stores it as either the
     * index request factory or the event serializer, depending on which interface it implements.
     */
    private void configureSerializer(Context context) {
        String serializerClazz = nonBlankOrDefault(
                context.getString("serializer"),
                "org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer");
        Context serializerContext = new Context();
        serializerContext.putAll(context.getSubProperties("serializer."));
        try {
            Class<?> clazz = Class.forName(serializerClazz);
            Configurable serializer = (Configurable) clazz.newInstance();
            if (serializer instanceof ElasticSearchIndexRequestBuilderFactory) {
                this.indexRequestFactory = (ElasticSearchIndexRequestBuilderFactory) serializer;
                this.indexRequestFactory.configure(serializerContext);
            } else if (serializer instanceof ElasticSearchEventSerializer) {
                this.eventSerializer = (ElasticSearchEventSerializer) serializer;
                this.eventSerializer.configure(serializerContext);
            } else {
                throw new IllegalArgumentException(serializerClazz + " is not an ElasticSearchEventSerializer");
            }
        } catch (Exception e) {
            logger.error("Could not instantiate event serializer.", e);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Instantiates the class named by {@code indexNameBuilder} (default: the time based
     * builder) and configures it with the {@code indexNameBuilder.*} sub-properties plus the
     * sink's {@code indexName}.
     */
    private void configureIndexNameBuilder(Context context) {
        String indexNameBuilderClass = nonBlankOrDefault(
                context.getString("indexNameBuilder"),
                "org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder");
        Context indexNameBuilderContext = new Context();
        // The builder's own sub-properties belong in the builder's context (they were
        // previously copied into the serializer context and never reached the builder).
        indexNameBuilderContext.putAll(context.getSubProperties("indexNameBuilder."));
        try {
            Class<?> clazz = Class.forName(indexNameBuilderClass);
            this.indexNameBuilder = (IndexNameBuilder) clazz.newInstance();
            // Added after the sub-properties so the sink-level index name takes precedence.
            indexNameBuilderContext.put("indexName", this.indexName);
            this.indexNameBuilder.configure(indexNameBuilderContext);
        } catch (Exception e) {
            logger.error("Could not instantiate index name builder.", e);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Starts the sink counter and creates the client through {@link ElasticSearchClientFactory}.
     * A failure while creating or configuring the client is logged and counted, any client
     * that was already created is closed, and the sink is still started via
     * {@code super.start()}.
     */
    public void start() {
        ElasticSearchClientFactory clientFactory = new ElasticSearchClientFactory();
        logger.info("ElasticSearch sink {} started", this.getName());
        this.sinkCounter.start();
        try {
            if (this.isLocal) {
                this.client = clientFactory.getLocalClient(
                        this.clientType, this.eventSerializer, this.indexRequestFactory);
            } else {
                this.client = clientFactory.getClient(
                        this.clientType, this.serverAddresses, this.clusterName,
                        this.eventSerializer, this.indexRequestFactory);
                this.client.configure(this.elasticSearchClientContext);
            }
            this.sinkCounter.incrementConnectionCreatedCount();
        } catch (Exception ex) {
            logger.error("Failed to create ElasticSearch client for sink " + this.getName(), ex);
            this.sinkCounter.incrementConnectionFailedCount();
            if (this.client != null) {
                this.client.close();
                this.sinkCounter.incrementConnectionClosedCount();
            }
        }
        super.start();
    }

    /** Closes the client if one exists, stops the sink counter and stops the sink. */
    public void stop() {
        logger.info("ElasticSearch sink {} stopping", this.getName());
        if (this.client != null) {
            this.client.close();
        }
        this.sinkCounter.incrementConnectionClosedCount();
        this.sinkCounter.stop();
        super.stop();
    }

    /**
     * Converts a TTL string such as {@code 500ms}, {@code 10s}, {@code 5m}, {@code 2h},
     * {@code 7d} or {@code 1w} to milliseconds. A number without a qualifier is treated as
     * days. Qualifiers are matched case-sensitively.
     *
     * @param ttl the configured TTL text
     * @return the TTL in milliseconds (saturating at {@code Long.MAX_VALUE}), or 0 when
     *         {@code ttl} does not start with digits or carries an unknown qualifier
     * @throws NumberFormatException when the numeric part does not fit in a {@code long}
     */
    private long parseTTL(String ttl) {
        Matcher matcher = TTL_PATTERN.matcher(ttl);
        if (!matcher.find()) {
            logger.info("TTL not provided. Skipping the TTL config by returning 0.");
            return 0L;
        }

        String qualifier = matcher.group(2);
        TimeUnit unit;
        long multiplier = 1L;
        if ("ms".equals(qualifier)) {
            unit = TimeUnit.MILLISECONDS;
        } else if ("s".equals(qualifier)) {
            unit = TimeUnit.SECONDS;
        } else if ("m".equals(qualifier)) {
            unit = TimeUnit.MINUTES;
        } else if ("h".equals(qualifier)) {
            unit = TimeUnit.HOURS;
        } else if ("d".equals(qualifier)) {
            unit = TimeUnit.DAYS;
        } else if ("w".equals(qualifier)) {
            unit = TimeUnit.DAYS;
            multiplier = 7L;
        } else if (qualifier.isEmpty()) {
            logger.info("TTL qualifier is empty. Defaulting to day qualifier.");
            unit = TimeUnit.DAYS;
        } else {
            logger.debug("Unknown TTL qualifier provided. Setting TTL to 0.");
            return 0L;
        }

        // Parsed as long so large values neither fail int parsing nor overflow when
        // weeks are converted to days.
        long amount = Long.parseLong(matcher.group(1));
        if (amount > Long.MAX_VALUE / multiplier) {
            return Long.MAX_VALUE;
        }
        return unit.toMillis(amount * multiplier);
    }
}

