/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.client.rocketmq.impl;

import com.aliyun.openservices.ons.api.OffsetStore;
import com.aliyun.openservices.ons.api.TopicPartition;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Optional;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ExecutorServices;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.ThreadFactoryImpl;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Base {@link OffsetStore} that keeps offsets in an in-memory table and hands that table to
 * {@link #persistOffset(Map)} on a fixed delay.
 *
 * <p>Subclasses supply the actual storage by implementing {@link #loadOffset()} and
 * {@link #persistOffset(Map)}. The in-memory table is a {@link ConcurrentHashMap}, and the
 * periodic persistence runs on a dedicated single-threaded scheduler.
 */
public abstract class AbstractOffsetStore implements OffsetStore {
    private static final Logger log = LoggerFactory.getLogger(AbstractOffsetStore.class);

    /** Used both as the initial delay and as the delay between two persist runs, in seconds. */
    private final long persistPeriodSeconds;
    /** Latest known offset per partition; written by {@link #updateOffset} and {@link #start()}. */
    private final ConcurrentMap<TopicPartition, Long> offsetTable;
    /** Single-threaded scheduler that drives the periodic persist task. */
    private final ScheduledExecutorService offsetPersistScheduler;

    /**
     * Creates the store. No offsets are loaded and nothing is scheduled until {@link #start()}.
     *
     * @param persistPeriodSeconds delay, in seconds, before the first persist run and between
     *                             subsequent runs
     */
    public AbstractOffsetStore(long persistPeriodSeconds) {
        this.persistPeriodSeconds = persistPeriodSeconds;
        this.offsetTable = new ConcurrentHashMap<TopicPartition, Long>();
        this.offsetPersistScheduler =
            new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("OffsetPersistScheduler"));
    }

    /**
     * Seeds the in-memory table from {@link #loadOffset()} (a {@code null} result is ignored)
     * and schedules the periodic persist task.
     */
    @Override
    public void start() {
        Map<TopicPartition, Long> loaded = this.loadOffset();
        if (loaded != null) {
            this.offsetTable.putAll(loaded);
        }
        this.offsetPersistScheduler.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                AbstractOffsetStore.this.persistQuietly();
            }
        }, this.persistPeriodSeconds, this.persistPeriodSeconds, TimeUnit.SECONDS);
    }

    /**
     * Stops the persist scheduler via {@code ExecutorServices.awaitTerminated}. Failures are
     * logged rather than propagated.
     *
     * <p>If the wait is interrupted, the calling thread's interrupt status is restored so that
     * callers further up the stack can still observe the interruption.
     *
     * <p>NOTE(review): this method does not invoke {@link #persistOffset(Map)} itself, so offsets
     * updated after the last periodic run may not be persisted - confirm whether a final flush
     * is wanted here.
     */
    @Override
    public void shutdown() {
        try {
            if (!ExecutorServices.awaitTerminated(this.offsetPersistScheduler)) {
                log.error("[Bug] Timeout to shutdown the offset persist scheduler.");
            }
        }
        catch (Throwable t) {
            // Catching Throwable would otherwise silently clear an interruption.
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            log.error("Failed to shutdown the offset persist scheduler.", t);
        }
    }

    /**
     * Loads previously stored offsets; called once from {@link #start()}.
     *
     * @return the stored offsets, or {@code null} if there is nothing to load
     */
    public abstract Map<TopicPartition, Long> loadOffset();

    /**
     * Persists the given offsets; called periodically from the scheduler thread.
     *
     * @param var1 the store's live offset table (not a snapshot), which may be updated
     *             concurrently while this method runs
     */
    public abstract void persistOffset(Map<TopicPartition, Long> var1);

    /**
     * Records {@code offset} as the current offset of {@code partition}, replacing any
     * previous value.
     *
     * @param partition the partition whose offset is updated
     * @param offset    the new offset
     */
    @Override
    public void updateOffset(TopicPartition partition, long offset) {
        this.offsetTable.put(partition, offset);
    }

    /**
     * Looks up the current in-memory offset of {@code partition}.
     *
     * @param partition the partition to look up
     * @return the recorded offset, or {@link Optional#absent()} if none is recorded
     */
    @Override
    public Optional<Long> readOffset(TopicPartition partition) {
        return Optional.fromNullable(this.offsetTable.get(partition));
    }

    /** Runs one persist pass, logging (never propagating) any failure. */
    private void persistQuietly() {
        try {
            this.persistOffset(this.offsetTable);
        }
        catch (Throwable t) {
            // An exception escaping a scheduled task suppresses all later executions,
            // so everything is caught here to keep the periodic persistence alive.
            log.error("Exception occurs while trying to persist offset", t);
        }
    }
}

