/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import joptsimple.OptionSpec;
import kafka.common.StreamEndException;
import kafka.consumer.BaseConsumer;
import kafka.consumer.BaseConsumerRecord;
import kafka.consumer.NewShinyConsumer;
import kafka.consumer.OldConsumer;
import kafka.tools.ConsoleConsumer;
import kafka.tools.MessageFormatter;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Logging$class;
import kafka.utils.ZkUtils$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
public final class ConsoleConsumer$
implements Logging {
    public static final ConsoleConsumer$ MODULE$;
    private int messageCount;
    private final CountDownLatch kafka$tools$ConsoleConsumer$$shutdownLatch;
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    private volatile boolean bitmap$0;

    static {
        new ConsoleConsumer$();
    }

    @Override
    public String loggerName() {
        return this.loggerName;
    }

    private Logger logger$lzycompute() {
        ConsoleConsumer$ consoleConsumer$ = this;
        synchronized (consoleConsumer$) {
            if (!this.bitmap$0) {
                this.logger = Logging$class.logger(this);
                this.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    @Override
    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    @Override
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    @Override
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    @Override
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging$class.trace(this, msg);
    }

    @Override
    public Object trace(Function0<Throwable> e) {
        return Logging$class.trace(this, e);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.trace(this, msg, e);
    }

    @Override
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging$class.swallowTrace(this, action);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging$class.debug(this, msg);
    }

    @Override
    public Object debug(Function0<Throwable> e) {
        return Logging$class.debug(this, e);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.debug(this, msg, e);
    }

    @Override
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging$class.swallowDebug(this, action);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging$class.info(this, msg);
    }

    @Override
    public Object info(Function0<Throwable> e) {
        return Logging$class.info(this, e);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.info(this, msg, e);
    }

    @Override
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging$class.swallowInfo(this, action);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging$class.warn(this, msg);
    }

    @Override
    public Object warn(Function0<Throwable> e) {
        return Logging$class.warn(this, e);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.warn(this, msg, e);
    }

    @Override
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging$class.swallowWarn(this, action);
    }

    @Override
    public void swallow(Function0<BoxedUnit> action) {
        Logging$class.swallow(this, action);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging$class.error(this, msg);
    }

    @Override
    public Object error(Function0<Throwable> e) {
        return Logging$class.error(this, e);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.error(this, msg, e);
    }

    @Override
    public void swallowError(Function0<BoxedUnit> action) {
        Logging$class.swallowError(this, action);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging$class.fatal(this, msg);
    }

    @Override
    public Object fatal(Function0<Throwable> e) {
        return Logging$class.fatal(this, e);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.fatal(this, msg, e);
    }

    public int messageCount() {
        return this.messageCount;
    }

    public void messageCount_$eq(int x$1) {
        this.messageCount = x$1;
    }

    public CountDownLatch kafka$tools$ConsoleConsumer$$shutdownLatch() {
        return this.kafka$tools$ConsoleConsumer$$shutdownLatch;
    }

    public void main(String[] args) {
        ConsoleConsumer.ConsumerConfig conf = new ConsoleConsumer.ConsumerConfig(args);
        try {
            this.run(conf);
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Unknown error when running consumer: ";
                }
            }, (Function0<Throwable>)new Serializable(throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable e$1;

                public final Throwable apply() {
                    return this.e$1;
                }
                {
                    this.e$1 = e$1;
                }
            });
            System.exit(1);
        }
    }

    /*
     * WARNING - void declaration
     */
    public void run(ConsoleConsumer.ConsumerConfig conf) {
        BaseConsumer baseConsumer;
        if (conf.useNewConsumer()) {
            long timeoutMs = conf.timeoutMs() >= 0 ? (long)conf.timeoutMs() : Long.MAX_VALUE;
            baseConsumer = new NewShinyConsumer(conf.topicArg(), this.getNewConsumerProps(conf), timeoutMs);
        } else {
            this.checkZk(conf);
            baseConsumer = new OldConsumer(conf.filterSpec(), this.getOldConsumerProps(conf));
        }
        BaseConsumer consumer = baseConsumer;
        this.addShutdownHook(consumer, conf);
        try {
            this.process(Predef$.MODULE$.int2Integer(conf.maxMessages()), conf.formatter(), consumer, conf.skipMessageOnError());
            consumer.cleanup();
            this.reportRecordCount();
            if (!conf.groupIdPassed()) {
                ZkUtils$.MODULE$.maybeDeletePath((String)conf.options().valueOf(conf.zkConnectOpt()), new StringBuilder().append((Object)"/consumers/").append(conf.consumerProps().get("group.id")).toString());
            }
            this.kafka$tools$ConsoleConsumer$$shutdownLatch().countDown();
            return;
        }
        catch (Throwable throwable) {
            void var2_3;
            var2_3.cleanup();
            this.reportRecordCount();
            if (!conf.groupIdPassed()) {
                ZkUtils$.MODULE$.maybeDeletePath((String)conf.options().valueOf(conf.zkConnectOpt()), new StringBuilder().append((Object)"/consumers/").append(conf.consumerProps().get("group.id")).toString());
            }
            this.kafka$tools$ConsoleConsumer$$shutdownLatch().countDown();
            throw throwable;
        }
    }

    public void checkZk(ConsoleConsumer.ConsumerConfig config) {
        if (!this.checkZkPathExists((String)config.options().valueOf(config.zkConnectOpt()), "/brokers/ids")) {
            System.err.println("No brokers found in ZK.");
            System.exit(1);
        }
        if (!config.options().has((OptionSpec)config.deleteConsumerOffsetsOpt()) && config.options().has((OptionSpec)config.resetBeginningOpt()) && this.checkZkPathExists((String)config.options().valueOf(config.zkConnectOpt()), new StringBuilder().append((Object)"/consumers/").append((Object)config.consumerProps().getProperty("group.id")).append((Object)"/offsets").toString())) {
            System.err.println(new StringBuilder().append((Object)"Found previous offset information for this group ").append((Object)config.consumerProps().getProperty("group.id")).append((Object)". Please use --delete-consumer-offsets to delete previous offsets metadata").toString());
            System.exit(1);
        }
    }

    public void addShutdownHook(BaseConsumer consumer, ConsoleConsumer.ConsumerConfig conf) {
        Runtime.getRuntime().addShutdownHook(new Thread(consumer){
            private final BaseConsumer consumer$1;

            public void run() {
                this.consumer$1.stop();
                ConsoleConsumer$.MODULE$.kafka$tools$ConsoleConsumer$$shutdownLatch().await();
            }
            {
                this.consumer$1 = consumer$1;
            }
        });
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void process(Integer maxMessages, MessageFormatter formatter, BaseConsumer consumer, boolean skipMessageOnError) {
        while (this.messageCount() < Predef$.MODULE$.Integer2int(maxMessages) || BoxesRunTime.equalsNumObject((Number)maxMessages, (Object)BoxesRunTime.boxToInteger((int)-1))) {
            BaseConsumerRecord baseConsumerRecord;
            try {
                baseConsumerRecord = consumer.receive();
            }
            catch (Throwable throwable) {
                this.error((Function0<String>)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Error processing message, terminating consumer process: ";
                    }
                }, (Function0<Throwable>)new Serializable(throwable){
                    public static final long serialVersionUID = 0L;
                    private final Throwable e$2;

                    public final Throwable apply() {
                        return this.e$2;
                    }
                    {
                        this.e$2 = e$2;
                    }
                });
                return;
            }
            catch (WakeupException wakeupException) {
                this.trace((Function0<String>)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Caught WakeupException because consumer is shutdown, ignore and terminate.";
                    }
                });
                return;
            }
            catch (StreamEndException streamEndException) {
                this.trace((Function0<String>)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Caught StreamEndException because consumer is shutdown, ignore and terminate.";
                    }
                });
                return;
            }
            BaseConsumerRecord msg = baseConsumerRecord;
            this.messageCount_$eq(this.messageCount() + 1);
            try {
                formatter.writeTo(msg.key(), msg.value(), System.out);
            }
            catch (Throwable throwable) {
                if (!skipMessageOnError) throw throwable;
                this.error((Function0<String>)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Error processing message, skipping this message: ";
                    }
                }, (Function0<Throwable>)new Serializable(throwable){
                    public static final long serialVersionUID = 0L;
                    private final Throwable e$3;

                    public final Throwable apply() {
                        return this.e$3;
                    }
                    {
                        this.e$3 = e$3;
                    }
                });
            }
            this.checkErr(formatter);
        }
    }

    public void reportRecordCount() {
        System.err.println(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Processed a total of ", " messages"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.messageCount())})));
    }

    public void checkErr(MessageFormatter formatter) {
        if (System.out.checkError()) {
            System.err.println("Unable to write to standard out, closing consumer.");
            formatter.close();
            System.exit(1);
        }
    }

    /*
     * WARNING - void declaration
     */
    public Properties getOldConsumerProps(ConsoleConsumer.ConsumerConfig config) {
        void var2_2;
        Properties props = new Properties();
        props.putAll((Map<?, ?>)config.consumerProps());
        props.put("auto.offset.reset", config.fromBeginning() ? "smallest" : "largest");
        props.put("zookeeper.connect", config.zkConnectionStr());
        if (!config.options().has((OptionSpec)config.deleteConsumerOffsetsOpt()) && config.options().has((OptionSpec)config.resetBeginningOpt()) && this.checkZkPathExists((String)config.options().valueOf(config.zkConnectOpt()), new StringBuilder().append((Object)"/consumers/").append((Object)props.getProperty("group.id")).append((Object)"/offsets").toString())) {
            System.err.println(new StringBuilder().append((Object)"Found previous offset information for this group ").append((Object)props.getProperty("group.id")).append((Object)". Please use --delete-consumer-offsets to delete previous offsets metadata").toString());
            System.exit(1);
        }
        if (config.options().has((OptionSpec)config.deleteConsumerOffsetsOpt())) {
            ZkUtils$.MODULE$.maybeDeletePath((String)config.options().valueOf(config.zkConnectOpt()), new StringBuilder().append((Object)"/consumers/").append((Object)config.consumerProps().getProperty("group.id")).toString());
        }
        Object object = config.timeoutMs() >= 0 ? props.put("consumer.timeout.ms", ((Object)BoxesRunTime.boxToInteger((int)config.timeoutMs())).toString()) : BoxedUnit.UNIT;
        return var2_2;
    }

    /*
     * WARNING - void declaration
     */
    public Properties getNewConsumerProps(ConsoleConsumer.ConsumerConfig config) {
        void var2_2;
        Properties props = new Properties();
        props.putAll((Map<?, ?>)config.consumerProps());
        props.put("auto.offset.reset", config.options().has((OptionSpec)config.resetBeginningOpt()) ? "earliest" : "latest");
        props.put("bootstrap.servers", config.bootstrapServer());
        props.put("key.deserializer", config.keyDeserializer() == null ? "org.apache.kafka.common.serialization.ByteArrayDeserializer" : config.keyDeserializer());
        props.put("value.deserializer", config.valueDeserializer() == null ? "org.apache.kafka.common.serialization.ByteArrayDeserializer" : config.valueDeserializer());
        return var2_2;
    }

    public boolean checkZkPathExists(String zkUrl, String path) {
        boolean bl;
        try {
            ZkClient zk = ZkUtils$.MODULE$.createZkClient(zkUrl, 30000, 30000);
            bl = zk.exists(path);
        }
        catch (Throwable throwable) {
            bl = false;
        }
        return bl;
    }

    private ConsoleConsumer$() {
        MODULE$ = this;
        Logging$class.$init$(this);
        this.messageCount = 0;
        this.kafka$tools$ConsoleConsumer$$shutdownLatch = new CountDownLatch(1);
    }
}

