/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.retry.task.support.dispatch;

import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.RetryTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.server.common.pekko.ActorGenerator;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.server.retry.task.dto.RetryTaskPrepareDTO;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.handler.RateLimiterHandler;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Retry;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Component(value="ScanRetryActor")
@Scope(value="prototype")
public class ScanRetryActor
extends AbstractActor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ScanRetryActor.class);
    public static final ConcurrentSkipListSet<String> REPEATED_PULL = new ConcurrentSkipListSet(new ArrayList());
    private final SystemProperties systemProperties;
    private final AccessTemplate accessTemplate;
    private final RetryMapper retryMapper;
    private final GroupConfigMapper groupConfigMapper;
    private final RateLimiterHandler rateLimiterHandler;

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(ScanTask.class, config -> {
            try {
                this.doScan((ScanTask)config);
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("Data scanner processing exception. [{}]", new Object[]{config, e});
            }
            finally {
                REPEATED_PULL.remove(config.getBucketStr());
            }
        }).build();
    }

    private void doScan(ScanTask scanTask) {
        PartitionTaskUtils.process(startId -> this.listAvailableTasks(startId, scanTask.getBuckets()), this::processRetryPartitionTasks, this::stopCondition, (long)0L);
    }

    private boolean stopCondition(List<? extends PartitionTask> partitionTasks) {
        if (CollectionUtils.isEmpty(partitionTasks)) {
            return true;
        }
        if (!this.rateLimiterHandler.tryAcquire(partitionTasks.size())) {
            log.warn("Current node triggers current limit");
            return true;
        }
        return false;
    }

    private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks) {
        if (CollUtil.isEmpty(partitionTasks)) {
            return;
        }
        Map<Long, RetrySceneConfig> sceneConfigMap = this.getSceneConfigMap(partitionTasks);
        ArrayList<Retry> waitUpdateRetries = new ArrayList<Retry>();
        ArrayList<RetryTaskPrepareDTO> waitExecRetries = new ArrayList<RetryTaskPrepareDTO>();
        for (PartitionTask partitionTask : partitionTasks) {
            RetryPartitionTask retryPartitionTask = (RetryPartitionTask)partitionTask;
            RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneId());
            if (Objects.isNull(retrySceneConfig)) continue;
            this.processRetry(retryPartitionTask, retrySceneConfig, waitExecRetries, waitUpdateRetries);
        }
        if (CollUtil.isEmpty(waitUpdateRetries)) {
            return;
        }
        this.retryMapper.updateBatchNextTriggerAtById(waitUpdateRetries);
        for (RetryTaskPrepareDTO retryTaskPrepareDTO : waitExecRetries) {
            ActorRef actorRef = ActorGenerator.retryTaskPrepareActor();
            actorRef.tell((Object)retryTaskPrepareDTO, actorRef);
        }
    }

    private Map<Long, RetrySceneConfig> getSceneConfigMap(List<? extends PartitionTask> partitionTasks) {
        Set sceneIdSet = StreamUtils.toSet(partitionTasks, partitionTask -> ((RetryPartitionTask)((Object)partitionTask)).getSceneId());
        List retrySceneConfigs = this.accessTemplate.getSceneConfigAccess().list((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getBlockStrategy, RetrySceneConfig::getSceneName, RetrySceneConfig::getCbTriggerType, RetrySceneConfig::getCbTriggerInterval, RetrySceneConfig::getExecutorTimeout, RetrySceneConfig::getId}).eq(RetrySceneConfig::getSceneStatus, (Object)StatusEnum.YES.getStatus())).in(RetrySceneConfig::getId, (Collection)sceneIdSet));
        return StreamUtils.toIdentityMap((Collection)retrySceneConfigs, RetrySceneConfig::getId);
    }

    private void processRetry(RetryPartitionTask partitionTask, RetrySceneConfig retrySceneConfig, List<RetryTaskPrepareDTO> waitExecRetries, List<Retry> waitUpdateRetries) {
        Retry retry = new Retry();
        retry.setNextTriggerAt(this.calculateNextTriggerTime(partitionTask, retrySceneConfig));
        retry.setId(partitionTask.getId());
        waitUpdateRetries.add(retry);
        RetryTaskPrepareDTO retryTaskPrepareDTO = RetryTaskConverter.INSTANCE.toRetryTaskPrepareDTO(partitionTask);
        retryTaskPrepareDTO.setBlockStrategy(retrySceneConfig.getBlockStrategy());
        retryTaskPrepareDTO.setExecutorTimeout(retrySceneConfig.getExecutorTimeout());
        retryTaskPrepareDTO.setRetryTaskExecutorScene(RetryTaskExecutorSceneEnum.AUTO_RETRY.getScene());
        waitExecRetries.add(retryTaskPrepareDTO);
    }

    protected Long calculateNextTriggerTime(RetryPartitionTask partitionTask, RetrySceneConfig retrySceneConfig) {
        WaitStrategy waitStrategy;
        WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
        long now = DateUtils.toNowMilli();
        long nextTriggerAt = partitionTask.getNextTriggerAt();
        if (nextTriggerAt + DateUtils.toEpochMilli((long)SystemConstants.SCHEDULE_PERIOD) < now) {
            nextTriggerAt = now;
            partitionTask.setNextTriggerAt(nextTriggerAt);
        }
        waitStrategyContext.setNextTriggerAt(nextTriggerAt);
        waitStrategyContext.setDelayLevel(Integer.valueOf(partitionTask.getRetryCount() + 1));
        if (SyetemTaskTypeEnum.CALLBACK.getType().equals(partitionTask.getTaskType())) {
            waitStrategyContext.setTriggerInterval(retrySceneConfig.getCbTriggerInterval());
            waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy((int)retrySceneConfig.getCbTriggerType());
        } else {
            waitStrategyContext.setTriggerInterval(retrySceneConfig.getTriggerInterval());
            waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy((int)retrySceneConfig.getBackOff());
        }
        return waitStrategy.computeTriggerTime(waitStrategyContext);
    }

    public List<RetryPartitionTask> listAvailableTasks(Long startId, Set<Integer> buckets) {
        List retries = this.accessTemplate.getRetryAccess().listPage(new PageDTO(0L, (long)this.systemProperties.getRetryPullPageSize(), Boolean.FALSE.booleanValue()), (LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{Retry::getId, Retry::getNextTriggerAt, Retry::getGroupName, Retry::getRetryCount, Retry::getSceneName, Retry::getNamespaceId, Retry::getTaskType, Retry::getSceneId, Retry::getGroupId}).eq(Retry::getRetryStatus, (Object)RetryStatusEnum.RUNNING.getStatus())).in(Retry::getBucketIndex, buckets)).le(Retry::getNextTriggerAt, (Object)(DateUtils.toNowMilli() + DateUtils.toEpochMilli((long)SystemConstants.SCHEDULE_PERIOD)))).gt(Retry::getId, (Object)startId)).orderByAsc(Retry::getId)).getRecords();
        if (CollUtil.isNotEmpty((Collection)retries)) {
            List groupConfigs = StreamUtils.toList((Collection)this.groupConfigMapper.selectList((Wrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{GroupConfig::getId}).eq(GroupConfig::getGroupStatus, (Object)StatusEnum.YES.getStatus())).in(GroupConfig::getId, (Collection)StreamUtils.toSet((Collection)retries, Retry::getGroupId))), GroupConfig::getId);
            retries = retries.stream().filter(retry -> groupConfigs.contains(retry.getGroupId())).collect(Collectors.toList());
        }
        return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retries);
    }

    @Generated
    public ScanRetryActor(SystemProperties systemProperties, AccessTemplate accessTemplate, RetryMapper retryMapper, GroupConfigMapper groupConfigMapper, RateLimiterHandler rateLimiterHandler) {
        this.systemProperties = systemProperties;
        this.accessTemplate = accessTemplate;
        this.retryMapper = retryMapper;
        this.groupConfigMapper = groupConfigMapper;
        this.rateLimiterHandler = rateLimiterHandler;
    }
}

