package org.apache.dolphinscheduler.plugin.task.spark;

import io.fabric8.kubernetes.client.Config;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/plugin/task/spark/SparkTask.class */
public class SparkTask extends AbstractYarnTask {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SparkTask.class);
    private SparkParameters sparkParameters;
    private final TaskExecutionContext taskExecutionContext;

    public SparkTask(TaskExecutionContext taskExecutionContext) {
        super(taskExecutionContext);
        this.taskExecutionContext = taskExecutionContext;
    }

    public void init() {
        this.sparkParameters = (SparkParameters) JSONUtils.parseObject(this.taskExecutionContext.getTaskParams(), SparkParameters.class);
        if (null == this.sparkParameters) {
            log.error("Spark params is null");
        } else {
            if (!this.sparkParameters.checkParameters()) {
                throw new RuntimeException("spark task params is not valid");
            }
            log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(this.sparkParameters));
        }
    }

    protected String getScript() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.sparkParameters.getProgramType() == ProgramType.SQL ? SparkConstants.SPARK_SQL_COMMAND : SparkConstants.SPARK_SUBMIT_COMMAND);
        arrayList.addAll(populateSparkOptions());
        return (String) arrayList.stream().collect(Collectors.joining(" "));
    }

    protected Map<String, String> getProperties() {
        return ParameterUtils.convert(this.taskExecutionContext.getPrepareParamsMap());
    }

    private List<String> populateSparkOptions() {
        String rawScript;
        ArrayList arrayList = new ArrayList();
        arrayList.add(SparkConstants.MASTER);
        String deployMode = StringUtils.isNotEmpty(this.sparkParameters.getDeployMode()) ? this.sparkParameters.getDeployMode() : SparkConstants.DEPLOY_MODE_LOCAL;
        boolean isNotEmpty = StringUtils.isNotEmpty(this.sparkParameters.getNamespace());
        String str = isNotEmpty ? SparkConstants.SPARK_ON_K8S_MASTER_PREFIX + Config.fromKubeconfig(this.taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()).getMasterUrl() : SparkConstants.SPARK_ON_YARN;
        if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
            arrayList.add(str);
            arrayList.add(SparkConstants.DEPLOY_MODE);
        }
        arrayList.add(deployMode);
        ProgramType programType = this.sparkParameters.getProgramType();
        String mainClass = this.sparkParameters.getMainClass();
        if (programType != ProgramType.PYTHON && programType != ProgramType.SQL && StringUtils.isNotEmpty(mainClass)) {
            arrayList.add(SparkConstants.MAIN_CLASS);
            arrayList.add(mainClass);
        }
        populateSparkResourceDefinitions(arrayList);
        String appName = this.sparkParameters.getAppName();
        if (StringUtils.isNotEmpty(appName)) {
            arrayList.add(SparkConstants.SPARK_NAME);
            arrayList.add(ArgsUtils.escape(appName));
        }
        String others = this.sparkParameters.getOthers();
        if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_YARN_QUEUE))) {
            String yarnQueue = this.sparkParameters.getYarnQueue();
            if (StringUtils.isNotEmpty(yarnQueue)) {
                arrayList.add(SparkConstants.SPARK_YARN_QUEUE);
                arrayList.add(yarnQueue);
            }
        }
        if (StringUtils.isNotEmpty(others)) {
            arrayList.add(others);
        }
        if (isNotEmpty) {
            arrayList.add(String.format(SparkConstants.DRIVER_LABEL_CONF, "dolphinscheduler-label", this.taskExecutionContext.getTaskAppId()));
            arrayList.add(String.format(SparkConstants.SPARK_KUBERNETES_NAMESPACE, JSONUtils.toMap(this.sparkParameters.getNamespace()).get("name")));
        }
        ResourceInfo mainJar = this.sparkParameters.getMainJar();
        if (programType != ProgramType.SQL) {
            arrayList.add(this.taskExecutionContext.getResourceContext().getResourceItem(mainJar.getResourceName()).getResourceAbsolutePathInLocal());
        }
        String mainArgs = this.sparkParameters.getMainArgs();
        if (programType != ProgramType.SQL && StringUtils.isNotEmpty(mainArgs)) {
            arrayList.add(mainArgs);
        }
        if (ProgramType.SQL == programType) {
            String str2 = "";
            arrayList.add(SparkConstants.SQL_FROM_FILE);
            if (SparkConstants.TYPE_FILE.equals(this.sparkParameters.getSqlExecutionType())) {
                List<ResourceInfo> resourceList = this.sparkParameters.getResourceList();
                if (resourceList.size() > 1) {
                    log.warn("more than 1 files detected, use the first one by default");
                }
                try {
                    str2 = resourceList.get(0).getResourceName();
                    rawScript = FileUtils.readFileToString(new File(this.taskExecutionContext.getResourceContext().getResourceItem(str2).getResourceAbsolutePathInLocal()), StandardCharsets.UTF_8);
                } catch (IOException e) {
                    log.error("read sql content from file {} error ", str2, e);
                    throw new TaskException("read sql content error", e);
                }
            } else {
                rawScript = this.sparkParameters.getRawScript();
            }
            arrayList.add(generateScriptFile(rawScript));
        }
        return arrayList;
    }

    private void populateSparkResourceDefinitions(List<String> list) {
        int driverCores = this.sparkParameters.getDriverCores();
        if (driverCores > 0) {
            list.add(String.format(SparkConstants.DRIVER_CORES, Integer.valueOf(driverCores)));
        }
        String driverMemory = this.sparkParameters.getDriverMemory();
        if (StringUtils.isNotEmpty(driverMemory)) {
            list.add(String.format(SparkConstants.DRIVER_MEMORY, driverMemory));
        }
        int numExecutors = this.sparkParameters.getNumExecutors();
        if (numExecutors > 0) {
            list.add(String.format(SparkConstants.NUM_EXECUTORS, Integer.valueOf(numExecutors)));
        }
        int executorCores = this.sparkParameters.getExecutorCores();
        if (executorCores > 0) {
            list.add(String.format(SparkConstants.EXECUTOR_CORES, Integer.valueOf(executorCores)));
        }
        String executorMemory = this.sparkParameters.getExecutorMemory();
        if (StringUtils.isNotEmpty(executorMemory)) {
            list.add(String.format(SparkConstants.EXECUTOR_MEMORY, executorMemory));
        }
    }

    private String generateScriptFile(String str) {
        String format = String.format("%s/%s_node.sql", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskAppId());
        File file = new File(format);
        Path path = file.toPath();
        if (!Files.exists(path, new LinkOption[0])) {
            String replaceParam = replaceParam(str);
            log.info("raw script : {}", replaceParam);
            log.info("task execute path : {}", this.taskExecutionContext.getExecutePath());
            FileAttribute<Set<PosixFilePermission>> asFileAttribute = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxr-xr-x"));
            try {
                if (SystemUtils.IS_OS_WINDOWS) {
                    Files.createFile(path, new FileAttribute[0]);
                } else {
                    if (!file.getParentFile().exists()) {
                        file.getParentFile().mkdirs();
                    }
                    Files.createFile(path, asFileAttribute);
                }
                Files.write(path, replaceParam.getBytes(), StandardOpenOption.APPEND);
            } catch (IOException e) {
                throw new RuntimeException("generate spark sql script error", e);
            }
        }
        return format;
    }

    private String replaceParam(String str) {
        return ParameterUtils.convertParameterPlaceholders(str.replaceAll("\\r\\n", System.lineSeparator()), ParameterUtils.convert(this.taskExecutionContext.getPrepareParamsMap()));
    }

    public AbstractParameters getParameters() {
        return this.sparkParameters;
    }
}
