/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tephra.txprune;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
import org.apache.tephra.txprune.TransactionPruningPlugin;
import org.apache.tephra.txprune.TransactionPruningRunnable;
import org.apache.twill.internal.utils.Instances;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service that schedules a {@link TransactionPruningRunnable} at a fixed rate on a single
 * daemon thread, and allows an additional run to be triggered on demand via {@link #pruneNow()}.
 *
 * <p>Configuration read by this class:
 * <ul>
 *   <li>{@code data.tx.prune.enable} - whether pruning is enabled (defaults to {@code false});
 *       when disabled, start-up, shut-down and {@link #pruneNow()} only log.</li>
 *   <li>{@code data.tx.prune.interval} - initial delay and period of the scheduled runs, in seconds.</li>
 *   <li>{@code data.tx.max.lifetime} and {@code data.tx.grace.period} - read in seconds, converted
 *       to milliseconds and handed to the pruning runnable.</li>
 *   <li>{@code data.tx.prune.plugins} - names of the plugins to load; for every name other than
 *       {@code data.tx.prune.plugin.default} the class is read from {@code <name>.class}.</li>
 * </ul>
 */
public class TransactionPruningService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionPruningService.class);
    private final Configuration conf;
    private final TransactionManager txManager;
    private final long scheduleInterval;
    private final boolean pruneEnabled;
    private TransactionPruningRunnable pruneRunnable;
    private ScheduledExecutorService scheduledExecutorService;

    /**
     * Creates the service; nothing is scheduled until the service is started.
     *
     * @param conf configuration from which the pruning settings and plugin classes are read
     * @param txManager transaction manager passed on to the pruning runnable
     */
    public TransactionPruningService(Configuration conf, TransactionManager txManager) {
        this.conf = conf;
        this.txManager = txManager;
        this.pruneEnabled = conf.getBoolean("data.tx.prune.enable", false);
        this.scheduleInterval = conf.getLong("data.tx.prune.interval", TxConstants.TransactionPruning.DEFAULT_PRUNE_INTERVAL);
    }

    /**
     * Initializes the configured plugins and schedules the pruning runnable at a fixed rate.
     * Does nothing except log when pruning is disabled.
     *
     * @throws Exception if a plugin cannot be loaded or initialized, or if the run cannot be scheduled
     */
    protected void startUp() throws Exception {
        if (!this.pruneEnabled) {
            LOG.info("Transaction pruning is not enabled");
            return;
        }
        LOG.info("Starting {}...", getClass().getSimpleName());

        // Do everything that can fail before the executor is created and published, so that a
        // failed start-up never leaves a half-initialized service behind.
        Map<String, TransactionPruningPlugin> plugins = this.initializePlugins();
        long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(this.conf.getInt("data.tx.max.lifetime", TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
        long txPruneBufferMillis = TimeUnit.SECONDS.toMillis(this.conf.getLong("data.tx.grace.period", TxConstants.TransactionPruning.DEFAULT_PRUNE_GRACE_PERIOD));
        TransactionPruningRunnable runnable = this.getTxPruneRunnable(this.txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("tephra-pruning-thread").setDaemon(true).build());
        try {
            executor.scheduleAtFixedRate(runnable, this.scheduleInterval, this.scheduleInterval, TimeUnit.SECONDS);
        } catch (RuntimeException e) {
            // For example a non-positive interval; release the executor before propagating.
            executor.shutdownNow();
            throw e;
        }
        this.pruneRunnable = runnable;
        this.scheduledExecutorService = executor;
        LOG.info("Scheduled {} plugins with interval {} seconds", plugins.size(), this.scheduleInterval);
    }

    /**
     * Stops the pruning executor, waiting up to 30 seconds for a run in progress to finish.
     * If the run does not finish in time, or the wait is interrupted, the executor is shut
     * down forcibly. Does nothing when pruning is disabled.
     *
     * @throws Exception declared by the overridden service method
     */
    protected void shutDown() throws Exception {
        if (!this.pruneEnabled) {
            return;
        }
        String serviceName = getClass().getSimpleName();
        LOG.info("Stopping {}...", serviceName);
        ScheduledExecutorService executor = this.scheduledExecutorService;
        if (executor != null) {
            try {
                executor.shutdown();
                if (!executor.awaitTermination(30L, TimeUnit.SECONDS)) {
                    // The wait timed out: cancel the run instead of leaving it going after "Stopped".
                    LOG.warn("Pruning thread did not terminate within 30 seconds, forcing shutdown");
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                // Preserve the interrupt for whoever is driving the shutdown.
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("Stopped {}", serviceName);
    }

    /**
     * Requests an immediate pruning run on the pruning thread, in addition to the scheduled runs.
     * The request is only logged, not executed, when pruning is disabled or when the service
     * has not been started or has already been stopped.
     */
    public void pruneNow() {
        if (!this.pruneEnabled) {
            LOG.info("Request to trigger transaction pruning received but pruning is not enabled.");
            return;
        }
        ScheduledExecutorService executor = this.scheduledExecutorService;
        TransactionPruningRunnable runnable = this.pruneRunnable;
        if (executor == null || runnable == null || executor.isShutdown()) {
            LOG.warn("Request to trigger transaction pruning received but the pruning service is not running.");
            return;
        }
        executor.execute(runnable);
        LOG.info("Triggered invalid transaction pruning due to request received.");
    }

    /**
     * Creates the runnable that performs a pruning run; package-visible so tests can substitute it.
     *
     * @param txManager transaction manager handed to the runnable
     * @param plugins initialized plugins keyed by plugin name
     * @param txMaxLifetimeMillis value of {@code data.tx.max.lifetime} converted to milliseconds
     * @param txPruneBufferMillis value of {@code data.tx.grace.period} converted to milliseconds
     * @return the runnable to schedule
     */
    @VisibleForTesting
    TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins, long txMaxLifetimeMillis, long txPruneBufferMillis) {
        return new TransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);
    }

    /**
     * Instantiates and initializes every plugin named in {@code data.tx.prune.plugins}.
     *
     * @return the initialized plugins keyed by plugin name
     * @throws IllegalStateException if no usable plugin class can be determined for a plugin name
     * @throws ClassNotFoundException if the default plugin class is not on the classpath
     */
    private Map<String, TransactionPruningPlugin> initializePlugins() throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, IOException {
        HashMap<String, TransactionPruningPlugin> initializedPlugins = new HashMap<>();
        // A set, so that a plugin listed more than once is only initialized once.
        HashSet<String> plugins = new HashSet<>(Arrays.asList(this.conf.getTrimmedStrings("data.tx.prune.plugins", new String[]{"data.tx.prune.plugin.default"})));
        LOG.info("Initializing invalid list prune plugins {}", plugins);
        for (String plugin : plugins) {
            Class<? extends TransactionPruningPlugin> clazz;
            if ("data.tx.prune.plugin.default".equals(plugin)) {
                // The default plugin is looked up by name rather than read from the configuration.
                Class<?> defaultClass = Class.forName("org.apache.tephra.hbase.txprune.HBaseTransactionPruningPlugin");
                if (!TransactionPruningPlugin.class.isAssignableFrom(defaultClass)) {
                    throw new IllegalStateException("Default pruning plugin class " + defaultClass.getName() + " is not a " + TransactionPruningPlugin.class.getName());
                }
                clazz = defaultClass.asSubclass(TransactionPruningPlugin.class);
            } else {
                clazz = this.conf.getClass(plugin + ".class", null, TransactionPruningPlugin.class);
            }
            if (clazz == null) {
                throw new IllegalStateException("No class specified in configuration for invalid pruning plugin " + plugin);
            }
            LOG.debug("Got class {} for plugin {}", clazz.getName(), plugin);
            TransactionPruningPlugin instance = Instances.newInstance(clazz);
            instance.initialize(this.conf);
            LOG.debug("Plugin {} initialized", plugin);
            initializedPlugins.put(plugin, instance);
        }
        return initializedPlugins;
    }
}

