/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ots.internal.streamclient.core.task;

import com.alicloud.openservices.tablestore.SyncClientInterface;
import com.alicloud.openservices.tablestore.model.Stream;
import com.alicloud.openservices.tablestore.model.StreamShard;
import com.aliyun.openservices.ots.internal.streamclient.DependencyException;
import com.aliyun.openservices.ots.internal.streamclient.StreamClientException;
import com.aliyun.openservices.ots.internal.streamclient.StreamConfig;
import com.aliyun.openservices.ots.internal.streamclient.core.task.ITask;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskResult;
import com.aliyun.openservices.ots.internal.streamclient.core.task.TaskType;
import com.aliyun.openservices.ots.internal.streamclient.lease.ShardLease;
import com.aliyun.openservices.ots.internal.streamclient.lease.interfaces.ILeaseManager;
import com.aliyun.openservices.ots.internal.streamclient.utils.OTSHelper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShardSyncTask
implements ITask {
    private static final Logger LOG = LoggerFactory.getLogger(ShardSyncTask.class);
    private final SyncClientInterface ots;
    private final String tableName;
    private final ILeaseManager<ShardLease> leaseManager;
    private String streamId;

    public ShardSyncTask(StreamConfig streamConfig, ILeaseManager<ShardLease> leaseManager) {
        this.ots = streamConfig.getOTSClient();
        this.tableName = streamConfig.getDataTableName();
        this.leaseManager = leaseManager;
    }

    @Override
    public TaskResult call() {
        LOG.debug("Start shard sync task.");
        try {
            this.checkAndCreateLeasesForNewShards();
            LOG.debug("Shard Sync task completed.");
            return new TaskResult(true);
        }
        catch (Exception e) {
            LOG.warn("Exception encountered in shard sync task: {}", (Throwable)e);
            return new TaskResult(e);
        }
    }

    @Override
    public TaskType getTaskType() {
        return TaskType.SHARDSYNC;
    }

    void checkAndCreateLeasesForNewShards() throws StreamClientException, DependencyException {
        List<StreamShard> shardList = OTSHelper.listShard(this.ots, this.getStreamId());
        List<ShardLease> currentLeases = this.leaseManager.listLeases();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Current leases, count: {}." + currentLeases.size());
            for (ShardLease lease : currentLeases) {
                LOG.debug("ShardLease: {}", (Object)lease);
            }
        }
        List<ShardLease> newLeasesToCreate = this.determineNewLeasesToCreate(shardList, currentLeases);
        for (ShardLease lease : newLeasesToCreate) {
            this.leaseManager.createLease(lease);
            LOG.info("New lease created, Lease: {}.", (Object)lease);
        }
        this.cleanupGarbageLeases(shardList, currentLeases);
    }

    List<ShardLease> determineNewLeasesToCreate(List<StreamShard> shardList, List<ShardLease> currentLeases) throws DependencyException {
        HashSet<String> leaseKeySet = new HashSet<String>();
        for (ShardLease lease : currentLeases) {
            leaseKeySet.add(lease.getLeaseKey());
        }
        HashMap<String, ShardLease> shardIdToNewShard = new HashMap<String, ShardLease>();
        for (StreamShard shard : shardList) {
            if (leaseKeySet.contains(shard.getShardId())) continue;
            ShardLease newShard = this.createNewShardLease(this.getStreamId(), shard);
            shardIdToNewShard.put(newShard.getLeaseKey(), newShard);
        }
        LinkedHashMap<String, ShardLease> sortedLeasesToCreate = new LinkedHashMap<String, ShardLease>();
        for (ShardLease lease : shardIdToNewShard.values()) {
            this.sortLeaseInInherit(lease, shardIdToNewShard, sortedLeasesToCreate);
        }
        return new ArrayList<ShardLease>(sortedLeasesToCreate.values());
    }

    private void sortLeaseInInherit(ShardLease lease, Map<String, ShardLease> shardIdToNewShard, LinkedHashMap<String, ShardLease> sortedLeasesToCreate) {
        for (String shardId : lease.getParentShardIds()) {
            ShardLease parentLease = shardIdToNewShard.get(shardId);
            if (parentLease == null || sortedLeasesToCreate.containsKey(parentLease.getLeaseKey())) continue;
            this.sortLeaseInInherit(parentLease, shardIdToNewShard, sortedLeasesToCreate);
        }
        if (!sortedLeasesToCreate.containsKey(lease.getLeaseKey())) {
            sortedLeasesToCreate.put(lease.getLeaseKey(), lease);
        }
    }

    ShardLease createNewShardLease(String streamId, StreamShard shard) {
        ShardLease shardLease = new ShardLease(shard.getShardId());
        shardLease.setStreamId(streamId);
        HashSet<String> parentShardIds = new HashSet<String>();
        if (shard.getParentId() != null) {
            parentShardIds.add(shard.getParentId());
        }
        if (shard.getParentSiblingId() != null) {
            parentShardIds.add(shard.getParentSiblingId());
        }
        shardLease.setParentShardIds(parentShardIds);
        shardLease.setCheckpoint("TRIM_HORIZON");
        return shardLease;
    }

    String getStreamId() throws DependencyException {
        if (this.streamId != null) {
            return this.streamId;
        }
        List<Stream> streams = OTSHelper.listStream(this.ots, this.tableName);
        if (streams.isEmpty()) {
            throw new DependencyException("Can't get streamId. Please check whether to enable Stream.");
        }
        if (streams.size() != 1) {
            LOG.error("Expect there is only one stream, tableName: {}.", (Object)this.tableName);
            for (Stream stream : streams) {
                LOG.error("Stream: {}", (Object)stream);
            }
            throw new DependencyException("Expect there is only one stream.");
        }
        this.streamId = streams.get(0).getStreamId();
        return this.streamId;
    }

    void cleanupGarbageLeases(List<StreamShard> shardList, List<ShardLease> currentLeases) throws StreamClientException, DependencyException {
        HashSet<String> leaseKeySet = new HashSet<String>();
        for (ShardLease lease : currentLeases) {
            leaseKeySet.add(lease.getLeaseKey());
        }
        for (StreamShard shard : shardList) {
            if (!leaseKeySet.contains(shard.getShardId())) continue;
            leaseKeySet.remove(shard.getShardId());
        }
        for (String leaseKey : leaseKeySet) {
            this.leaseManager.deleteLease(leaseKey);
            LOG.info("Delete expired lease, LeaseKey: {}.", (Object)leaseKey);
        }
    }
}

