package org.apache.dolphinscheduler.api.service.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.executor.ExecuteClient;
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.common.enums.ApiTriggerType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IStreamingTaskOperator;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.TriggerRelationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
/* loaded from: input_file:org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.class */
public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorService {

    // Class-wide SLF4J logger (marked @Generated in the decompiled output).
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ExecutorServiceImpl.class);

    // ---- Spring-injected collaborators (field injection via @Autowired) ----

    @Autowired
    private ProjectMapper projectMapper;

    @Autowired
    private ProjectService projectService;

    @Autowired
    private ProcessDefinitionMapper processDefinitionMapper;

    // NOTE(review): second injection point of the same ProcessDefinitionMapper type as
    // processDefinitionMapper above, with package-private visibility; it is not referenced in the
    // visible part of this file - confirm whether it is still needed.
    @Autowired
    ProcessDefinitionMapper processDefineMapper;

    // Used to list master servers from the registry before commands are created.
    @Autowired
    private MonitorService monitorService;

    @Autowired
    private ProcessInstanceMapper processInstanceMapper;

    // Injected lazily (@Lazy).
    @Autowired
    @Lazy
    private ProcessService processService;

    @Autowired
    private ProcessInstanceDao processInstanceDao;

    @Autowired
    private ProcessDefinitionService processDefinitionService;

    // Persists Command rows and checks whether a command needs to be created.
    @Autowired
    private CommandService commandService;

    @Autowired
    private TaskDefinitionLogMapper taskDefinitionLogMapper;

    @Autowired
    private TaskDefinitionMapper taskDefinitionMapper;

    @Autowired
    private ProcessTaskRelationMapper processTaskRelationMapper;

    @Autowired
    private TaskGroupQueueMapper taskGroupQueueMapper;

    @Autowired
    private WorkerGroupService workerGroupService;

    // Records the link between an API trigger code and the command it created.
    @Autowired
    private TriggerRelationService triggerRelationService;

    @Autowired
    private ExecuteClient executeClient;

    @Autowired
    private TenantMapper tenantMapper;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl$1.class */
    /**
     * Decompiler rendition of the compiler-generated switch lookup tables for {@link RunMode} and
     * {@link ExecuteType}. Each table is indexed by the enum constant's {@code ordinal()} and holds
     * the case label number used by the corresponding {@code switch}; entries for constants that
     * are not listed stay {@code 0}.
     *
     * <p>The {@code ExecuteType} table was assigned but never declared in the decompiled output,
     * which made the class uncompilable; it is now declared next to the {@code RunMode} table.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$common$enums$RunMode = new int[RunMode.values().length];
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$api$enums$ExecuteType = new int[ExecuteType.values().length];

        static {
            // Each assignment is guarded separately: a constant missing at runtime raises
            // NoSuchFieldError, which is ignored so the remaining entries are still filled in.
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$RunMode[RunMode.RUN_MODE_SERIAL.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$common$enums$RunMode[RunMode.RUN_MODE_PARALLEL.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$api$enums$ExecuteType[ExecuteType.PAUSE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$api$enums$ExecuteType[ExecuteType.STOP.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$api$enums$ExecuteType[ExecuteType.REPEAT_RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$api$enums$ExecuteType[ExecuteType.START_FAILURE_TASK_PROCESS.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$api$enums$ExecuteType[ExecuteType.RECOVER_SUSPENDED_PROCESS.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
        }
    }

    /**
     * Validates a request to start a workflow and, when it is acceptable, creates the command(s)
     * that trigger it.
     *
     * <p>Checks performed, in order: project authorization, timeout range, expected parallelism,
     * tenant existence, workflow definition existence/validity, start node list, complement
     * schedule size, and presence of a master server.
     *
     * @param user the requesting user; its id becomes the command's executor id
     * @param j project code
     * @param j2 workflow (process) definition code
     * @param str schedule time JSON, used for complement-data commands
     * @param commandType command type; {@code null} is treated as a plain start by the command builder
     * @param failureStrategy failure strategy stored on the command
     * @param str2 comma separated start node (task) codes, may be empty
     * @param taskDependType task depend type stored on the command
     * @param warningType warning type stored on the command
     * @param num warning group id; also written back to the workflow definition on success
     * @param runMode run mode for complement-data commands
     * @param priority workflow instance priority
     * @param str3 worker group
     * @param str4 tenant code
     * @param l environment code
     * @param num2 timeout; must be in the range 1..86400
     * @param map start parameters
     * @param num3 expected parallelism number; when given it must be positive
     * @param i dry-run flag
     * @param i2 test flag
     * @param complementDependentMode complement dependent mode
     * @param num4 workflow definition version; when {@code null} the definition is looked up by code only
     * @param z forwarded unchanged to the complement command creation
     * @param executionOrder execution order of complement dates
     * @return a result map carrying the status; on success {@code "data"} holds the generated trigger code
     * @throws ServiceException when the workflow definition does not exist or any validation fails
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    @Transactional(rollbackFor = {Exception.class})
    public Map<String, Object> execProcessInstance(User user, long j, long j2, String str, CommandType commandType, FailureStrategy failureStrategy, String str2, TaskDependType taskDependType, WarningType warningType, Integer num, RunMode runMode, Priority priority, String str3, String str4, Long l, Integer num2, Map<String, String> map, Integer num3, int i, int i2, ComplementDependentMode complementDependentMode, Integer num4, boolean z, ExecutionOrder executionOrder) {
        this.projectService.checkProjectAndAuth(user, this.projectMapper.queryByCode(j), j, ApiFuncIdentificationConstant.WORKFLOW_START);
        Map<String, Object> result = new HashMap<>();
        // A missing timeout is rejected like an out-of-range one instead of failing on unboxing.
        if (num2 == null || num2.intValue() <= 0 || num2.intValue() > 86400) {
            log.warn("Parameter timeout is invalid, timeout:{}.", num2);
            putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR, new Object[0]);
            return result;
        }
        if (Objects.nonNull(num3) && num3.intValue() <= 0) {
            log.warn("Parameter expectedParallelismNumber is invalid, expectedParallelismNumber:{}.", num3);
            putMsg(result, Status.TASK_PARALLELISM_PARAMS_ERROR, new Object[0]);
            return result;
        }
        checkValidTenant(str4);
        ProcessDefinition workflowDefinition = null != num4 ? this.processService.findProcessDefinition(Long.valueOf(j2), num4.intValue()) : this.processDefinitionMapper.queryByCode(j2);
        // Report an unknown code/version explicitly; previously this surfaced as a NullPointerException
        // when the version was read below.
        if (workflowDefinition == null) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(j2));
        }
        checkProcessDefinitionValid(j, workflowDefinition, j2, Integer.valueOf(workflowDefinition.getVersion()));
        checkStartNodeList(str2, Long.valueOf(j2), workflowDefinition.getVersion());
        checkScheduleTimeNumExceed(commandType, str);
        checkMasterExists();
        long triggerCode = CodeGenerateUtils.getInstance().genCode();
        int commandCount = createCommand(Long.valueOf(triggerCode), commandType, workflowDefinition.getCode(), taskDependType, failureStrategy, str2, str, warningType, user.getId().intValue(), num, runMode, priority, str3, str4, l, map, num3, i, i2, complementDependentMode, z, executionOrder);
        if (commandCount > 0) {
            // Remember the warning group chosen for this run on the definition itself.
            workflowDefinition.setWarningGroupId(num);
            this.processDefinitionMapper.updateById(workflowDefinition);
            log.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", Long.valueOf(workflowDefinition.getCode()), Integer.valueOf(commandCount));
            result.put("data", Long.valueOf(triggerCode));
            putMsg(result, Status.SUCCESS, new Object[0]);
        } else {
            log.error("Start process instance failed because create command error, processDefinitionCode:{}.", Long.valueOf(workflowDefinition.getCode()));
            putMsg(result, Status.START_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return result;
    }

    /**
     * Ensures at least one server is returned by the registry query made with the {@code true}
     * flag (presumably selecting master servers - confirm against {@code MonitorService}).
     *
     * @throws ServiceException with {@link Status#MASTER_NOT_EXISTS} when the list is empty
     */
    private void checkMasterExists() {
        boolean noMasterRegistered = this.monitorService.getServerListFromRegistry(true).isEmpty();
        if (!noMasterRegistered) {
            return;
        }
        throw new ServiceException(Status.MASTER_NOT_EXISTS);
    }

    /**
     * Rejects complement-data requests whose explicit schedule date list has more than 100 entries.
     *
     * <p>Only applies when {@code commandType} is {@link CommandType#COMPLEMENT_DATA} and a schedule
     * JSON is supplied. A schedule without a (non-null) {@code complementScheduleDateList} entry is
     * accepted here.
     *
     * @param commandType the requested command type
     * @param str schedule time JSON, may be {@code null}
     * @throws ServiceException with {@link Status#SCHEDULE_TIME_NUMBER_EXCEED} when the list is too long
     */
    private void checkScheduleTimeNumExceed(CommandType commandType, String str) {
        final int maxScheduleDates = 100;
        if (!CommandType.COMPLEMENT_DATA.equals(commandType) || str == null) {
            return;
        }
        Map<String, String> scheduleParam = JSONUtils.toMap(str);
        // NOTE(review): a null map (unparseable JSON, presumably) is skipped here rather than
        // dereferenced; rejecting malformed schedules is left to the later schedule validation.
        if (scheduleParam == null) {
            return;
        }
        // A key that is absent or mapped to null both yield null, so neither can trigger an NPE.
        String scheduleDateList = scheduleParam.get("complementScheduleDateList");
        if (scheduleDateList == null || scheduleDateList.split(",").length <= maxScheduleDates) {
            return;
        }
        log.warn("Parameter cornTime is bigger than {}.", Integer.valueOf(maxScheduleDates));
        throw new ServiceException(Status.SCHEDULE_TIME_NUMBER_EXCEED);
    }

    /**
     * Verifies that a workflow definition can be executed within the given project.
     *
     * @param j project code the definition is expected to belong to
     * @param processDefinition the definition to check; {@code null} is reported as "not exist"
     * @param j2 the workflow definition code that was requested (used in the "not exist" error)
     * @param num the workflow definition version (not used by the checks)
     * @throws ServiceException {@link Status#PROCESS_DEFINE_NOT_EXIST} when the definition is
     *         {@code null} or belongs to another project, {@link Status#PROCESS_DEFINE_NOT_RELEASE}
     *         when it is not online, {@link Status#SUB_PROCESS_DEFINE_NOT_RELEASE} when
     *         {@link #checkSubProcessDefinitionValid(ProcessDefinition)} fails
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    public void checkProcessDefinitionValid(long j, ProcessDefinition processDefinition, long j2, Integer num) {
        // Guard against lookups that found nothing; previously this dereferenced null.
        if (processDefinition == null) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(j2));
        }
        if (j != processDefinition.getProjectCode()) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, Long.valueOf(processDefinition.getCode()));
        }
        if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, Long.valueOf(processDefinition.getCode()), Integer.valueOf(processDefinition.getVersion()));
        }
        if (!checkSubProcessDefinitionValid(processDefinition)) {
            throw new ServiceException(Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
        }
    }

    /**
     * Checks that no sub-workflow referenced by the given workflow definition is offline.
     *
     * <p>Collects the post-task codes of the definition's downstream relations, loads those task
     * definitions, keeps the ones whose task type is {@code SUB_PROCESS} (case-insensitive) and
     * reads the {@code processDefinitionCode} from their task params.
     *
     * @param processDefinition the parent workflow definition
     * @return {@code true} when there are no relations, no sub-workflow tasks, or none of the
     *         referenced definitions has release state {@link ReleaseState#OFFLINE}
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinition) {
        List downstreamRelations = this.processTaskRelationMapper.queryDownstreamByProcessDefinitionCode(processDefinition.getCode());
        if (downstreamRelations.isEmpty()) {
            return true;
        }
        Set postTaskCodes = (Set) downstreamRelations.stream().map((relation) -> {
            return relation.getPostTaskCode();
        }).collect(Collectors.toSet());
        List taskDefinitions = this.taskDefinitionMapper.queryByCodeList(postTaskCodes);
        // Codes of the workflow definitions that SUB_PROCESS tasks point at.
        Set subWorkflowCodes = new HashSet();
        taskDefinitions.stream().forEach(task -> {
            if ("SUB_PROCESS".equalsIgnoreCase(task.getTaskType())) {
                subWorkflowCodes.add(Long.valueOf(JSONUtils.getNodeString(task.getTaskParams(), "processDefinitionCode")));
            }
        });
        if (subWorkflowCodes.isEmpty()) {
            return true;
        }
        Set offlineSubWorkflows = (Set) this.processDefinitionMapper.queryByCodes(subWorkflowCodes).stream().filter(subWorkflow -> {
            return subWorkflow.getReleaseState().equals(ReleaseState.OFFLINE);
        }).collect(Collectors.toSet());
        return offlineSubWorkflows.isEmpty();
    }

    /**
     * Ensures the tenant code is usable: the literal {@code "default"} is always accepted, any
     * other code must be found by the tenant mapper.
     *
     * @param str tenant code
     * @throws ServiceException with {@link Status#TENANT_NOT_EXIST} when the tenant is unknown
     */
    private void checkValidTenant(String str) {
        if ("default".equals(str)) {
            return;
        }
        if (this.tenantMapper.queryByTenantCode(str) == null) {
            throw new ServiceException(Status.TENANT_NOT_EXIST, str);
        }
    }

    /**
     * Applies an operation (the given {@link ExecuteType}) to an existing workflow instance.
     *
     * @param user the requesting user
     * @param j project code the instance must belong to
     * @param num workflow instance id, must not be {@code null}
     * @param executeType the operation to perform, must not be {@code null}
     * @return a map whose {@code "status"} entry is {@link Status#SUCCESS}
     * @throws ServiceException when the instance does not exist, no master is registered, or a
     *         delegated check fails
     * @throws IllegalStateException when the instance belongs to a different project
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    public Map<String, Object> execute(User user, long j, Integer num, ExecuteType executeType) {
        Preconditions.checkNotNull(num, "workflowInstanceId cannot be null");
        Preconditions.checkNotNull(executeType, "executeType cannot be null");
        this.projectService.checkProjectAndAuthThrowException(user, Long.valueOf(j), ApiFuncIdentificationConstant.map.get(executeType));
        checkMasterExists();

        ProcessInstance workflowInstance = (ProcessInstance) this.processInstanceDao.queryOptionalById(num).orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, num));
        boolean belongsToProject = workflowInstance.getProjectCode().longValue() == j;
        Preconditions.checkState(belongsToProject, "The workflow instance's project code doesn't equals to the given project");

        ExecuteContext executeContext = new ExecuteContext(workflowInstance, this.processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(workflowInstance.getProcessDefinitionCode().longValue(), workflowInstance.getProcessDefinitionVersion()), user, executeType);
        this.executeClient.executeWorkflowInstance(executeContext);

        Map<String, Object> result = new HashMap<>();
        result.put("status", Status.SUCCESS);
        return result;
    }

    /**
     * Applies an operation to a workflow instance, resolving the project code from the instance
     * itself before delegating to {@link #execute(User, long, Integer, ExecuteType)}.
     *
     * @param user the requesting user
     * @param num workflow instance id, must not be {@code null}
     * @param executeType the operation to perform
     * @return the result map of the delegated call
     * @throws ServiceException with {@link Status#PROCESS_INSTANCE_NOT_EXIST} when no instance has
     *         the given id (previously a NullPointerException)
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    public Map<String, Object> execute(User user, Integer num, ExecuteType executeType) {
        Preconditions.checkNotNull(num, "workflowInstanceId cannot be null");
        ProcessInstance processInstance = (ProcessInstance) this.processInstanceMapper.selectById(num);
        if (processInstance == null) {
            throw new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, num);
        }
        return execute(user, processInstance.getProjectCode().longValue(), num, executeType);
    }

    /**
     * Creates an {@link CommandType#EXECUTE_TASK} command that re-runs a task of a finished
     * workflow instance.
     *
     * @param user the requesting user; its id becomes the command's executor id
     * @param j project code
     * @param num workflow instance id
     * @param str the task code to start from; must parse as a {@code long}
     * @param taskDependType task depend type stored on the command
     * @return a response carrying the outcome status (success, instance not finished, task not
     *         defined, command already executing, invalid parameter, or command creation error)
     * @throws ServiceException when the instance or its workflow definition does not exist, or the
     *         definition fails {@link #checkProcessDefinitionValid}
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    public WorkflowExecuteResponse executeTask(User user, long j, Integer num, String str, TaskDependType taskDependType) {
        WorkflowExecuteResponse response = new WorkflowExecuteResponse();
        this.projectService.checkProjectAndAuthThrowException(user, Long.valueOf(j), ApiFuncIdentificationConstant.map.get(ExecuteType.EXECUTE_TASK));
        ProcessInstance processInstance = (ProcessInstance) this.processService.findProcessInstanceDetailById(num.intValue()).orElseThrow(() -> {
            return new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, num);
        });
        if (!processInstance.getState().isFinished()) {
            log.error("Can not execute task for process instance which is not finished, processInstanceId:{}.", num);
            putMsg(response, Status.WORKFLOW_INSTANCE_IS_NOT_FINISHED, new Object[0]);
            return response;
        }
        ProcessDefinition workflowDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
        // The definition version the instance ran with may no longer exist; fail explicitly
        // instead of dereferencing null.
        if (workflowDefinition == null) {
            throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processInstance.getProcessDefinitionCode()));
        }
        // The release state is forced to ONLINE on this in-memory copy before validation, so the
        // online check inside checkProcessDefinitionValid cannot reject it.
        workflowDefinition.setReleaseState(ReleaseState.ONLINE);
        checkProcessDefinitionValid(j, workflowDefinition, processInstance.getProcessDefinitionCode().longValue(), Integer.valueOf(processInstance.getProcessDefinitionVersion()));

        // Only the parse can legitimately raise NumberFormatException, so only it is guarded.
        final long taskCode;
        try {
            taskCode = Long.parseLong(str);
        } catch (NumberFormatException e) {
            log.error("startNodeList is not a number", e);
            putMsg(response, Status.REQUEST_PARAMS_NOT_VALID_ERROR, str);
            return response;
        }
        if (this.taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode) == null) {
            putMsg(response, Status.EXECUTE_NOT_DEFINE_TASK, new Object[0]);
            return response;
        }

        Map<String, Object> commandParam = new HashMap<>();
        commandParam.put("ProcessInstanceId", num);
        commandParam.put("StartNodeList", str);
        Command command = new Command();
        command.setCommandType(CommandType.EXECUTE_TASK);
        command.setProcessDefinitionCode(workflowDefinition.getCode());
        command.setCommandParam(JSONUtils.toJsonString(commandParam));
        command.setExecutorId(user.getId().intValue());
        command.setProcessDefinitionVersion(workflowDefinition.getVersion());
        command.setProcessInstanceId(num.intValue());
        command.setTestFlag(processInstance.getTestFlag());
        command.setTaskDependType(taskDependType);
        if (!this.commandService.verifyIsNeedCreateCommand(command)) {
            log.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", new Object[]{Long.valueOf(workflowDefinition.getCode()), Integer.valueOf(workflowDefinition.getVersion()), num});
            putMsg(response, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(workflowDefinition.getCode()));
            return response;
        }
        log.info("Creating command, commandInfo:{}.", command);
        if (this.commandService.createCommand(command) > 0) {
            log.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.", new Object[]{command.getCommandType().getDescp(), Long.valueOf(command.getProcessDefinitionCode()), Integer.valueOf(workflowDefinition.getVersion())});
            putMsg(response, Status.SUCCESS, new Object[0]);
        } else {
            log.error("Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}， processInstanceId:{}.", new Object[]{command.getCommandType().getDescp(), Long.valueOf(command.getProcessDefinitionCode()), Integer.valueOf(workflowDefinition.getVersion()), num});
            putMsg(response, Status.EXECUTE_PROCESS_INSTANCE_ERROR, new Object[0]);
        }
        return response;
    }

    /**
     * Marks a task group queue entry as force-started.
     *
     * @param user the requesting user (not used by this method)
     * @param i id of the task group queue entry
     * @return a map whose {@code "status"} entry is {@link Status#SUCCESS}
     * @throws ServiceException {@link Status#MASTER_NOT_EXISTS} when no master is registered,
     *         {@link Status#REQUEST_PARAMS_NOT_VALID_ERROR} when no queue entry has the given id
     *         (previously a NullPointerException), {@link Status#TASK_GROUP_QUEUE_ALREADY_START}
     *         when the entry's in-queue flag equals {@code Flag.NO}
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    public Map<String, Object> forceStartTaskInstance(User user, int i) {
        Map<String, Object> result = new HashMap<>();
        TaskGroupQueue taskGroupQueue = (TaskGroupQueue) this.taskGroupQueueMapper.selectById(Integer.valueOf(i));
        checkMasterExists();
        // NOTE(review): REQUEST_PARAMS_NOT_VALID_ERROR is the closest status visible in this file
        // for an unknown id - swap in a dedicated "queue not exist" status if one exists.
        if (taskGroupQueue == null) {
            throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueId");
        }
        if (taskGroupQueue.getInQueue() == Flag.NO.getCode()) {
            throw new ServiceException(Status.TASK_GROUP_QUEUE_ALREADY_START);
        }
        taskGroupQueue.setForceStart(Flag.YES.getCode());
        taskGroupQueue.setUpdateTime(new Date());
        this.taskGroupQueueMapper.updateById(taskGroupQueue);
        result.put("status", Status.SUCCESS);
        return result;
    }

    /**
     * Verifies that every requested start node is a post-task of the given workflow definition
     * version.
     *
     * @param str comma separated task codes; {@code null} or empty skips the check
     * @param l workflow definition code
     * @param i workflow definition version
     * @throws ServiceException with {@link Status#START_NODE_NOT_EXIST_IN_LAST_PROCESS} for the
     *         first code that is not part of the workflow, including codes that are not numeric
     *         (previously those escaped as NumberFormatException)
     */
    public void checkStartNodeList(String str, Long l, int i) {
        if (StringUtils.isEmpty(str)) {
            return;
        }
        // A set gives constant-time membership checks for each requested node.
        Set<Long> postTaskCodes = this.processService.findRelationByCode(l.longValue(), i).stream().map((v0) -> {
            return v0.getPostTaskCode();
        }).collect(Collectors.toSet());
        for (String startNode : str.split(",")) {
            boolean known;
            try {
                known = postTaskCodes.contains(Long.valueOf(startNode));
            } catch (NumberFormatException ignored) {
                // A code that is not a number cannot match any task code of the workflow.
                known = false;
            }
            if (!known) {
                throw new ServiceException(Status.START_NODE_NOT_EXIST_IN_LAST_PROCESS, startNode);
            }
        }
    }

    /**
     * Decides whether the requested operation is allowed for the instance's current state.
     *
     * <ul>
     *   <li>PAUSE requires {@code isRunning()}</li>
     *   <li>STOP requires {@code canStop()}</li>
     *   <li>REPEAT_RUNNING requires {@code isFinished()}</li>
     *   <li>START_FAILURE_TASK_PROCESS requires {@code isFailure()}</li>
     *   <li>RECOVER_SUSPENDED_PROCESS requires {@code isPause()} or {@code isStop()}</li>
     *   <li>any other type is not allowed</li>
     * </ul>
     *
     * @param processInstance the workflow instance being operated on
     * @param executeType the requested operation
     * @return a result map with {@link Status#SUCCESS} when allowed, otherwise
     *         {@link Status#PROCESS_INSTANCE_STATE_OPERATION_ERROR}
     */
    private Map<String, Object> checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) {
        Map<String, Object> result = new HashMap<>();
        WorkflowExecutionStatus currentState = processInstance.getState();
        boolean allowed;
        switch (executeType) {
            case PAUSE:
                allowed = currentState.isRunning();
                break;
            case STOP:
                allowed = currentState.canStop();
                break;
            case REPEAT_RUNNING:
                allowed = currentState.isFinished();
                break;
            case START_FAILURE_TASK_PROCESS:
                allowed = currentState.isFailure();
                break;
            case RECOVER_SUSPENDED_PROCESS:
                allowed = currentState.isPause() || currentState.isStop();
                break;
            default:
                allowed = false;
                break;
        }
        if (!allowed) {
            putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), currentState.toString(), executeType.toString());
            return result;
        }
        putMsg(result, Status.SUCCESS, new Object[0]);
        return result;
    }

    /**
     * Records a new command type and state on the workflow instance, persists it, and on success
     * notifies the host recorded on the instance of the state change via RPC.
     *
     * @param processInstance the instance to update
     * @param commandType the command type to set and append to the instance's command history
     * @param workflowExecutionStatus the state to move the instance to
     * @return a result map with {@link Status#SUCCESS} when the update was persisted, otherwise
     *         {@link Status#EXECUTE_PROCESS_INSTANCE_ERROR}
     */
    private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, WorkflowExecutionStatus workflowExecutionStatus) {
        Map<String, Object> result = new HashMap<>();
        processInstance.setCommandType(commandType);
        processInstance.addHistoryCmd(commandType);
        String stateDescription = commandType.getDescp() + "by ui";
        processInstance.setStateWithDesc(workflowExecutionStatus, stateDescription);

        boolean persisted = this.processInstanceDao.updateById(processInstance);
        if (!persisted) {
            log.error("Process instance state update error, processInstanceName:{}.", processInstance.getName());
            putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR, new Object[0]);
            return result;
        }

        log.info("Process instance state is updated to {} in database, processInstanceName:{}.", workflowExecutionStatus.getDesc(), processInstance.getName());
        // Resolve the RPC proxy for the instance's host first, then build and send the event.
        ITaskInstanceExecutionEventListener listener = (ITaskInstanceExecutionEventListener) SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(processInstance.getHost(), ITaskInstanceExecutionEventListener.class);
        WorkflowInstanceStateChangeEvent stateChangeEvent = new WorkflowInstanceStateChangeEvent(processInstance.getId().intValue(), 0, processInstance.getState(), processInstance.getId().intValue(), 0);
        listener.onWorkflowInstanceInstanceStateChange(stateChangeEvent);
        putMsg(result, Status.SUCCESS, new Object[0]);
        return result;
    }

    /**
     * Pre-start check: the workflow definition must exist and every sub-workflow definition it
     * references must be online.
     *
     * @param j workflow definition code
     * @return a result map with {@link Status#SUCCESS}, {@link Status#REQUEST_PARAMS_NOT_VALID_ERROR}
     *         when the definition is unknown, or {@link Status#PROCESS_DEFINE_NOT_RELEASE} naming
     *         the first sub-workflow that is not online
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    public Map<String, Object> startCheckByProcessDefinedCode(long j) {
        Map<String, Object> result = new HashMap<>();
        ProcessDefinition workflowDefinition = this.processDefinitionMapper.queryByCode(j);
        if (workflowDefinition == null) {
            log.error("Process definition is not be found, processDefinitionCode:{}.", Long.valueOf(j));
            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "processDefinitionCode");
            return result;
        }

        List subWorkflowCodes = this.processService.findAllSubWorkflowDefinitionCode(workflowDefinition.getCode());
        // The mapper is only queried when there is at least one sub-workflow code.
        List<ProcessDefinition> subWorkflows = null;
        if (!subWorkflowCodes.isEmpty()) {
            subWorkflows = this.processDefinitionMapper.queryByCodes(subWorkflowCodes);
        }
        if (subWorkflows != null) {
            for (ProcessDefinition subWorkflow : subWorkflows) {
                if (subWorkflow.getReleaseState() == ReleaseState.ONLINE) {
                    continue;
                }
                log.warn("Subprocess definition {} of process definition {} is not {}.", new Object[]{subWorkflow.getName(), workflowDefinition.getName(), ReleaseState.ONLINE.getDescp()});
                putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subWorkflow.getName());
                return result;
            }
        }

        putMsg(result, Status.SUCCESS, new Object[0]);
        return result;
    }

    /**
     * Builds a {@link Command} from the request parameters and persists it.
     *
     * <p>For every command type except {@link CommandType#COMPLEMENT_DATA} a single command is
     * created and, when that succeeds, linked to the trigger code. For complement-data commands
     * the schedule JSON is validated and creation is delegated to
     * {@code createComplementCommandList}.
     *
     * @param l trigger code linked to the created command
     * @param commandType command type; {@code null} defaults to {@link CommandType#START_PROCESS}
     * @param j workflow definition code
     * @param taskDependType task depend type, applied when non-null
     * @param failureStrategy failure strategy, applied when non-null
     * @param str comma separated start node codes, stored as {@code StartNodeList} when non-empty
     * @param str2 schedule time JSON, required for complement-data commands
     * @param warningType warning type, applied when non-null
     * @param i executor (user) id
     * @param num warning group id
     * @param runMode run mode for complement-data commands
     * @param priority workflow instance priority
     * @param str3 worker group
     * @param str4 tenant code
     * @param l2 environment code
     * @param map start parameters, stored as JSON under {@code StartParams} when non-empty
     * @param num2 expected parallelism number for complement-data commands
     * @param i2 dry-run flag
     * @param i3 test flag
     * @param complementDependentMode complement dependent mode
     * @param z forwarded unchanged to the complement command creation
     * @param executionOrder execution order of complement dates
     * @return the number of commands created; {@code 0} when nothing was created
     */
    private int createCommand(Long l, CommandType commandType, long j, TaskDependType taskDependType, FailureStrategy failureStrategy, String str, String str2, WarningType warningType, int i, Integer num, RunMode runMode, Priority priority, String str3, String str4, Long l2, Map<String, String> map, Integer num2, int i2, int i3, ComplementDependentMode complementDependentMode, boolean z, ExecutionOrder executionOrder) {
        Command command = new Command();
        Map<String, String> commandParam = new HashMap<>();
        command.setCommandType(commandType == null ? CommandType.START_PROCESS : commandType);
        command.setProcessDefinitionCode(j);
        if (taskDependType != null) {
            command.setTaskDependType(taskDependType);
        }
        if (failureStrategy != null) {
            command.setFailureStrategy(failureStrategy);
        }
        if (!StringUtils.isEmpty(str)) {
            commandParam.put("StartNodeList", str);
        }
        if (warningType != null) {
            command.setWarningType(warningType);
        }
        if (map != null && !map.isEmpty()) {
            commandParam.put("StartParams", JSONUtils.toJsonString(map));
        }
        // Set once here; the complement path reads this value back from the command.
        command.setCommandParam(JSONUtils.toJsonString(commandParam));
        command.setExecutorId(i);
        command.setWarningGroupId(num);
        command.setProcessInstancePriority(priority);
        command.setWorkerGroup(str3);
        command.setTenantCode(str4);
        command.setEnvironmentCode(l2);
        command.setDryRun(i2);
        command.setTestFlag(i3);
        ProcessDefinition workflowDefinition = this.processService.findProcessDefinitionByCode(Long.valueOf(j));
        if (workflowDefinition != null) {
            command.setProcessDefinitionVersion(workflowDefinition.getVersion());
        }
        command.setProcessInstanceId(0);

        if (commandType != CommandType.COMPLEMENT_DATA) {
            int createdCount = this.commandService.createCommand(command);
            if (createdCount > 0) {
                this.triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, l, command.getId());
            }
            return createdCount;
        }

        // StringUtils.isEmpty already covers null.
        if (StringUtils.isEmpty(str2)) {
            log.error("Create {} type command error because parameter schedule is invalid.", command.getCommandType().getDescp());
            return 0;
        }
        if (!isValidateScheduleTime(str2)) {
            return 0;
        }
        try {
            log.info("Start to create {} command, processDefinitionCode:{}.", command.getCommandType().getDescp(), Long.valueOf(j));
            return createComplementCommandList(l, str2, runMode, command, num2, complementDependentMode, z, executionOrder);
        } catch (CronParseException e) {
            // Still reported to the caller as "nothing created", but no longer swallowed silently.
            log.error("Create {} type command error because the schedule cron expression cannot be parsed, processDefinitionCode:{}.", command.getCommandType().getDescp(), Long.valueOf(j), e);
            return 0;
        }
    }

    /**
     * Persists one complement-data command covering the given schedule dates and, when required,
     * triggers creation of the dependent complement commands.
     *
     * @param l trigger code linked to the command when it is created successfully
     * @param command the command to persist; its command param is overwritten here
     * @param map command parameters; {@code complementScheduleDateList} is added to it
     * @param list the schedule dates, joined with commas after formatting
     * @param list2 released schedules of the workflow definition
     * @param complementDependentMode dependent commands are skipped for {@code OFF_MODE}
     * @param z forwarded unchanged to {@code createComplementDependentCommand}
     * @return the count returned by the command service for the main command
     */
    private int createComplementCommand(Long l, Command command, Map<String, String> map, List<ZonedDateTime> list, List<Schedule> list2, ComplementDependentMode complementDependentMode, boolean z) {
        String scheduleDates = list.stream().map(date -> DateUtils.dateToString(date)).collect(Collectors.joining(","));
        map.put("complementScheduleDateList", scheduleDates);
        command.setCommandParam(JSONUtils.toJsonString(map));
        log.info("Creating command, commandInfo:{}.", command);

        int createdCount = this.commandService.createCommand(command);
        boolean created = createdCount > 0;
        if (created) {
            log.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), Long.valueOf(command.getProcessDefinitionCode()));
        } else {
            log.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), Long.valueOf(command.getProcessDefinitionCode()));
        }

        // Dependent commands are attempted regardless of whether the main command was created.
        boolean needDependentCommands = !list2.isEmpty() && complementDependentMode != ComplementDependentMode.OFF_MODE;
        if (needDependentCommands) {
            log.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", Long.valueOf(command.getProcessDefinitionCode()));
            createComplementDependentCommand(list2, command, z);
        } else {
            log.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", Long.valueOf(command.getProcessDefinitionCode()));
        }

        if (created) {
            this.triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, l, command.getId());
        }
        return createdCount;
    }

    /**
     * Resolves the schedule dates of a complement request and creates the corresponding complement commands.
     *
     * <p>Dates are read from the JSON in {@code str}. When both {@code complementStartDate} and
     * {@code complementEndDate} are present and non-null, the range is expanded with
     * {@code CronUtils.getSelfFireDateList} using the released schedules of the command's process definition.
     * A non-blank {@code complementScheduleDateList} (comma separated, de-duplicated) replaces that result.
     * The dates are then sorted according to {@code executionOrder} before the commands are created.
     *
     * @param l                       passed unchanged to {@code createComplementCommand}
     *                                (presumably the trigger code; confirm against the caller)
     * @param str                     JSON string holding the complement date parameters
     * @param runMode                 serial or parallel; {@code null} is treated as {@code RUN_MODE_SERIAL}
     * @param command                 template command; its command param and process definition code are read here
     * @param num                     upper bound on the number of parallel queues; {@code null} or a non-positive
     *                                value means one queue per schedule date
     * @param complementDependentMode passed unchanged to {@code createComplementCommand}
     * @param z                       passed unchanged to {@code createComplementCommand}
     * @param executionOrder          sort order of the dates; {@code null} is treated as {@code DESC_ORDER}
     * @return the sum of the values returned by every {@code createComplementCommand} call
     * @throws ServiceException   with {@code TASK_COMPLEMENT_DATA_DATE_ERROR} when no schedule date can be resolved
     * @throws CronParseException declared by the original signature (raised by callees)
     */
    protected int createComplementCommandList(Long l, String str, RunMode runMode, Command command, Integer num, ComplementDependentMode complementDependentMode, boolean z, ExecutionOrder executionOrder) throws CronParseException {
        int createdCount = 0;
        RunMode effectiveRunMode = runMode == null ? RunMode.RUN_MODE_SERIAL : runMode;
        ExecutionOrder effectiveOrder = Objects.isNull(executionOrder) ? ExecutionOrder.DESC_ORDER : executionOrder;
        Map<String, String> commandParams = JSONUtils.toMap(command.getCommandParam());
        Map<String, String> scheduleParams = JSONUtils.toMap(str);
        if (scheduleParams == null) {
            // isValidateScheduleTime treats a null parse result as possible; without this guard the lookups
            // below would fail with a NullPointerException instead of the intended date error.
            scheduleParams = Collections.emptyMap();
        }
        List<Schedule> releasedSchedules = this.processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());

        List<ZonedDateTime> scheduleDates = new ArrayList<>();
        if (scheduleParams.containsKey("complementStartDate") && scheduleParams.containsKey("complementEndDate")) {
            String startDate = scheduleParams.get("complementStartDate");
            String endDate = scheduleParams.get("complementEndDate");
            if (startDate != null && endDate != null) {
                scheduleDates = CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate), DateUtils.stringToZoneDateTime(endDate), releasedSchedules);
            }
        }
        if (scheduleParams.containsKey("complementScheduleDateList")) {
            String dateListText = scheduleParams.get("complementScheduleDateList");
            if (StringUtils.isNotBlank(dateListText)) {
                // An explicit date list takes precedence over the start/end range resolved above.
                scheduleDates = Splitter.on(",").splitToStream(dateListText)
                        .map(dateText -> DateUtils.stringToZoneDateTime(dateText.trim()))
                        .distinct()
                        .collect(Collectors.toList());
            }
        }
        if (CollectionUtils.isEmpty(scheduleDates)) {
            throw new ServiceException(Status.TASK_COMPLEMENT_DATA_DATE_ERROR);
        }

        if (effectiveOrder.equals(ExecutionOrder.DESC_ORDER)) {
            Collections.sort(scheduleDates, Collections.reverseOrder());
        } else {
            Collections.sort(scheduleDates);
        }

        // The selector is the compiler-generated switch map for RunMode; judging by the log messages,
        // case 1 is the serial mode and case 2 the parallel mode.
        switch (AnonymousClass1.$SwitchMap$org$apache$dolphinscheduler$common$enums$RunMode[effectiveRunMode.ordinal()]) {
            case 1:
                log.info("RunMode of {} command is serial run, processDefinitionCode:{}.", command.getCommandType().getDescp(), Long.valueOf(command.getProcessDefinitionCode()));
                createdCount = createComplementCommand(l, command, commandParams, scheduleDates, releasedSchedules, complementDependentMode, z);
                break;
            case 2:
                log.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", command.getCommandType().getDescp(), Long.valueOf(command.getProcessDefinitionCode()));
                // scheduleDates is guaranteed non-empty here, so queueCount is at least 1.
                int queueCount = scheduleDates.size();
                if (num != null && num.intValue() > 0) {
                    // Only a positive parallelism caps the queue count; a negative value used to end up as
                    // a negative array size.
                    queueCount = Math.min(queueCount, num.intValue());
                }
                log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", Integer.valueOf(queueCount));
                List<List<ZonedDateTime>> queues = new ArrayList<>(queueCount);
                for (int queueIndex = 0; queueIndex < queueCount; queueIndex++) {
                    queues.add(new ArrayList<>());
                }
                // Distribute the sorted dates round-robin; queueCount <= size, so no queue stays empty.
                for (int dateIndex = 0; dateIndex < scheduleDates.size(); dateIndex++) {
                    queues.get(dateIndex % queueCount).add(scheduleDates.get(dateIndex));
                }
                for (List<ZonedDateTime> queue : queues) {
                    // Accumulate across queues; plain assignment reported only the last queue's result.
                    createdCount += createComplementCommand(l, command, commandParams, queue, releasedSchedules, complementDependentMode, z);
                }
                break;
            default:
                break;
        }
        // The dependent command count is not tracked in this method, hence the constant 0.
        log.info("Create complement command count:{}, Create dependent complement command count:{}", Integer.valueOf(createdCount), 0);
        return createdCount;
    }

    /**
     * Creates one complement command for every dependent process definition of the given command.
     *
     * <p>The incoming command is cloned once; for each dependent definition the clone is re-targeted
     * (process definition code and version, worker group, {@code StartNodeList} parameter) and handed to
     * {@code commandService.createCommand}. The cycle used to select dependents is derived from the crontab
     * of the first schedule in {@code list}.
     *
     * <p>Any exception raised inside this method is logged and {@code 0} is returned, even when earlier
     * iterations already created commands.
     *
     * @param list    schedules of the source process definition; the first element is read, so it must not be empty
     * @param command source command used as template; it is not modified
     * @param z       passed to {@code getComplementDependentDefinitionList} to request all dependency levels
     * @return the sum of the values returned by {@code commandService.createCommand}, or {@code 0} on error
     */
    public int createComplementDependentCommand(List<Schedule> list, Command command, boolean z) {
        int createdCount = 0;
        try {
            Command dependentCommand = (Command) BeanUtils.cloneBean(command);
            CycleEnum scheduleCycle = CronUtils.getMaxCycle(list.get(0).getCrontab());
            List<DependentProcessDefinition> dependentDefinitions = getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), scheduleCycle, dependentCommand.getWorkerGroup(), z);
            dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
            for (DependentProcessDefinition dependentDefinition : dependentDefinitions) {
                // The same clone is reused for every dependent, so the id is cleared before each create.
                dependentCommand.setId((Integer) null);
                dependentCommand.setProcessDefinitionCode(dependentDefinition.getProcessDefinitionCode());
                dependentCommand.setProcessDefinitionVersion(dependentDefinition.getProcessDefinitionVersion());
                dependentCommand.setWorkerGroup(dependentDefinition.getWorkerGroup());
                Map<String, String> commandParams = JSONUtils.toMap(dependentCommand.getCommandParam());
                commandParams.put("StartNodeList", String.valueOf(dependentDefinition.getTaskDefinitionCode()));
                dependentCommand.setCommandParam(JSONUtils.toJsonString(commandParams));
                // Log the command that is actually created, not the unmodified source command.
                log.info("Creating complement dependent command, commandInfo:{}.", dependentCommand);
                createdCount += this.commandService.createCommand(dependentCommand);
            }
            return createdCount;
        } catch (Exception e) {
            log.error("Copy dependent command error.", e);
            return 0;
        }
    }

    /**
     * Collects the process definitions that depend on the given process definition and pass
     * {@code checkDependentProcessDefinitionValid}.
     *
     * <p>With {@code z == false} only the direct dependents are returned. With {@code z == true} the
     * dependents are expanded level by level and appended to the same list. Each process definition code is
     * expanded at most once: without that guard, workflows that depend on each other in a cycle would make
     * the expansion loop forever and grow the list without bound. A definition that is reached again is still
     * added to the result, only its dependents are not queried a second time.
     *
     * @param j         code of the process definition whose dependents are looked up
     * @param cycleEnum cycle a dependent must match to be kept
     * @param str       worker group handed to {@code checkDependentProcessDefinitionValid} as fallback
     * @param z         whether to follow dependents of dependents
     * @return the direct dependents followed by the dependents of deeper levels, in discovery order
     */
    private List<DependentProcessDefinition> getComplementDependentDefinitionList(long j, CycleEnum cycleEnum, String str, boolean z) {
        List<DependentProcessDefinition> dependents = checkDependentProcessDefinitionValid(this.processService.queryDependentProcessDefinitionByProcessDefinitionCode(j), cycleEnum, str, j);
        if (dependents.isEmpty() || !z) {
            return dependents;
        }
        // Codes whose dependents have already been queried; the root counts as expanded.
        Set<Long> expandedCodes = new HashSet<>();
        expandedCodes.add(j);
        List<DependentProcessDefinition> currentLevel = new ArrayList<>(dependents);
        while (!currentLevel.isEmpty()) {
            List<DependentProcessDefinition> nextLevel = new ArrayList<>();
            for (DependentProcessDefinition dependent : currentLevel) {
                long dependentCode = dependent.getProcessDefinitionCode();
                if (!expandedCodes.add(dependentCode)) {
                    // Already expanded on this or an earlier level: skipping it breaks dependency cycles.
                    continue;
                }
                nextLevel.addAll(checkDependentProcessDefinitionValid(this.processService.queryDependentProcessDefinitionByProcessDefinitionCode(dependentCode), cycleEnum, str, dependentCode));
            }
            dependents.addAll(nextLevel);
            currentLevel = nextLevel;
        }
        return dependents;
    }

    /**
     * Keeps the dependent process definitions whose dependent cycle, relative to the upstream definition
     * {@code j}, equals {@code cycleEnum}.
     *
     * <p>For every kept definition that has no entry in the worker group lookup, the worker group is set to
     * {@code str}. The elements of {@code list} are modified in place; the returned list is a new one.
     *
     * @param list      candidate dependent process definitions
     * @param cycleEnum cycle a candidate must match
     * @param str       worker group assigned when the lookup has none for the candidate
     * @param j         code of the upstream process definition
     * @return a new list holding the matching candidates in their original order
     */
    private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(List<DependentProcessDefinition> list, CycleEnum cycleEnum, String str, long j) {
        List<Long> candidateCodes = list.stream()
                .map(DependentProcessDefinition::getProcessDefinitionCode)
                .collect(Collectors.toList());
        Map<Long, String> workerGroupByCode = this.workerGroupService.queryWorkerGroupByProcessDefinitionCodes(candidateCodes);

        List<DependentProcessDefinition> matching = new ArrayList<>();
        for (DependentProcessDefinition candidate : list) {
            if (candidate.getDependentCycle(j) != cycleEnum) {
                continue;
            }
            boolean hasWorkerGroup = workerGroupByCode.get(candidate.getProcessDefinitionCode()) != null;
            if (!hasWorkerGroup) {
                candidate.setWorkerGroup(str);
            }
            matching.add(candidate);
        }
        return matching;
    }

    /**
     * Checks the complement date parameters contained in the given JSON string.
     *
     * <p>The result is {@code false} when the JSON cannot be turned into a map, when
     * {@code complementScheduleDateList} is present with a null value, when {@code complementStartDate} is
     * present but either bound is null or unparsable, or when the start lies after the end. When no
     * {@code complementStartDate} key exists the parameters are accepted without further checks.
     *
     * @param str JSON string with the schedule parameters
     * @return {@code true} if the parameters pass the checks above
     */
    private boolean isValidateScheduleTime(String str) {
        Map<String, String> scheduleParams = JSONUtils.toMap(str);
        if (scheduleParams == null) {
            return false;
        }
        boolean dateListIsNull = scheduleParams.containsKey("complementScheduleDateList")
                && scheduleParams.get("complementScheduleDateList") == null;
        if (dateListIsNull) {
            return false;
        }
        if (!scheduleParams.containsKey("complementStartDate")) {
            return true;
        }
        String startText = scheduleParams.get("complementStartDate");
        String endText = scheduleParams.get("complementEndDate");
        if (startText == null || endText == null) {
            return false;
        }
        try {
            ZonedDateTime start = DateUtils.stringToZoneDateTime(startText);
            ZonedDateTime end = DateUtils.stringToZoneDateTime(endText);
            if (start == null || end == null) {
                return false;
            }
            if (start.isAfter(end)) {
                log.error("Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", start, end);
                return false;
            }
            return true;
        } catch (Exception e) {
            log.warn("Parse schedule time error, startDate:{}, endDate:{}.", startText, endText);
            return false;
        }
    }

    /**
     * Removes repeated entries from a comma separated string.
     *
     * <p>Each entry is trimmed before comparison; the first occurrence of every entry is kept and the
     * original order is preserved.
     *
     * @param str comma separated entries, may be {@code null}
     * @return the de-duplicated entries joined with {@code ","}, or {@code null} when {@code str} is null or empty
     */
    private String removeDuplicates(String str) {
        if (StringUtils.isEmpty(str)) {
            return null;
        }
        String[] entries = str.split(",");
        return Arrays.stream(entries)
                .map(String::trim)
                .distinct()
                .collect(Collectors.joining(","));
    }

    /**
     * Fetches the executing data of a workflow instance from the host recorded on that instance.
     *
     * @param num id of the process instance; must not be {@code null}
     * @return the data returned by the remote {@code getWorkflowExecutingData} call, or {@code null} when no
     *         process instance with that id is found
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer num) {
        ProcessInstance workflowInstance = (ProcessInstance) this.processService.findProcessInstanceDetailById(num.intValue()).orElse(null);
        if (workflowInstance == null) {
            log.error("Process instance does not exist, processInstanceId:{}.", num);
            return null;
        }
        // The RPC proxy is addressed by the host stored on the instance.
        IWorkflowInstanceService workflowInstanceClient = (IWorkflowInstanceService) SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(workflowInstance.getHost(), IWorkflowInstanceService.class);
        return workflowInstanceClient.getWorkflowExecutingData(num);
    }

    /**
     * Triggers a streaming task on the first server returned by the monitor service.
     *
     * <p>Before the request is sent, the project permission, the tenant and the presence of a master are
     * checked through {@code checkProjectAndAuth}, {@code checkValidTenant} and {@code checkMasterExists}
     * (presumably these throw on failure; their bodies are not visible here).
     *
     * @param user executing user; id and user name are copied into the request
     * @param j    project code
     * @param j2   task definition code
     * @param i    task definition version
     * @param i2   warning group id
     * @param str  worker group
     * @param str2 tenant code
     * @param l    environment code
     * @param map  start parameters
     * @param i3   dry run flag
     * @throws ServiceException with {@code START_TASK_INSTANCE_ERROR} when the trigger response is not successful
     */
    @Override // org.apache.dolphinscheduler.api.service.ExecutorService
    public void execStreamTaskInstance(User user, long j, long j2, int i, int i2, String str, String str2, Long l, Map<String, String> map, int i3) {
        this.projectService.checkProjectAndAuth(user, this.projectMapper.queryByCode(j), j, ApiFuncIdentificationConstant.WORKFLOW_START);
        checkValidTenant(str2);
        checkMasterExists();
        Server targetServer = this.monitorService.getServerListFromRegistry(true).get(0);

        StreamingTaskTriggerRequest triggerRequest = new StreamingTaskTriggerRequest();
        triggerRequest.setExecutorId(user.getId().intValue());
        triggerRequest.setExecutorName(user.getUserName());
        triggerRequest.setProjectCode(j);
        triggerRequest.setTaskDefinitionCode(j2);
        triggerRequest.setTaskDefinitionVersion(i);
        triggerRequest.setWorkerGroup(str);
        triggerRequest.setTenantCode(str2);
        triggerRequest.setWarningGroupId(i2);
        triggerRequest.setEnvironmentCode(l);
        triggerRequest.setStartParams(map);
        triggerRequest.setDryRun(i3);

        String targetAddress = targetServer.getHost() + ":" + targetServer.getPort();
        IStreamingTaskOperator streamingTaskOperator = (IStreamingTaskOperator) SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(targetAddress, IStreamingTaskOperator.class);
        StreamingTaskTriggerResponse triggerResponse = streamingTaskOperator.triggerStreamingTask(triggerRequest);
        if (!triggerResponse.isSuccess()) {
            log.error("Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}, response: {}.", new Object[]{Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i), triggerResponse});
            throw new ServiceException(Status.START_TASK_INSTANCE_ERROR);
        }
        // Log the response itself; the RPC proxy object used to be passed here by mistake.
        log.info("Send task execute start command complete, response is {}.", triggerResponse);
    }
}
