From 24864512212a94f53da32eb20dc56673f960547a Mon Sep 17 00:00:00 2001 From: Tq <36755957+Tianqi-Dotes@users.noreply.github.com> Date: Sun, 20 Dec 2020 21:52:52 +0800 Subject: [PATCH] remove AbstractTask.getCurTaskParamsClass()and replace with TaskParametersUtils.getParameters() (#4262) * remove getCurTaskParamsClass() in AbstractTask.java and replace it with TaskParametersUtils.getParameters() * remove unused imports in AbstractTask.java * reformat --- .../server/worker/task/AbstractTask.java | 110 +++++------------- 1 file changed, 31 insertions(+), 79 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index fe60f4a82c..de7d35f404 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.server.worker.task; +import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -23,27 +25,17 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; -import org.apache.dolphinscheduler.common.task.datax.DataxParameters; -import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; -import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; -import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; -import org.apache.dolphinscheduler.common.task.python.PythonParameters; -import org.apache.dolphinscheduler.common.task.shell.ShellParameters; -import org.apache.dolphinscheduler.common.task.spark.SparkParameters; -import org.apache.dolphinscheduler.common.task.sql.SqlParameters; -import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; -import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.TaskRecordDao; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; + import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; import java.util.List; import java.util.Map; -import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; +import org.slf4j.Logger; /** * executive task @@ -61,13 +53,13 @@ public abstract class AbstractTask { TaskExecutionContext taskExecutionContext; /** - * log record + * log record */ protected Logger logger; /** - * SHELL process pid + * SHELL process pid */ protected int processId; @@ -83,14 +75,15 @@ public abstract class AbstractTask { protected volatile boolean cancel = false; /** - * exit code + * exit code */ protected volatile int exitStatusCode = -1; /** * constructor + * * @param taskExecutionContext taskExecutionContext - * @param logger logger + * @param logger logger */ protected AbstractTask(TaskExecutionContext taskExecutionContext, Logger logger) { this.taskExecutionContext = taskExecutionContext; @@ -99,6 +92,7 @@ public abstract class AbstractTask { /** * init task + * * @throws Exception exception */ public void init() throws Exception { @@ -106,6 +100,7 @@ public abstract class AbstractTask { /** * task handle + * * @throws Exception exception */ public abstract void handle() throws Exception; @@ -113,6 +108,7 @@ public abstract class AbstractTask { /** * cancel application + * * @param status status * @throws Exception exception */ @@ -122,6 +118,7 @@ public abstract class AbstractTask { /** * log handle + * * @param logs log list */ public void logHandle(List logs) { @@ -136,13 +133,15 @@ public abstract class AbstractTask { public void setVarPool(String varPool) { this.varPool = varPool; } + public String getVarPool() { return varPool; } /** * get exit status code - * @return exit status code + * + * @return exit status code */ public int getExitStatusCode() { return exitStatusCode; @@ -170,21 +169,21 @@ public abstract class AbstractTask { /** * get task parameters + * * @return AbstractParameters */ public abstract AbstractParameters getParameters(); - /** * result processing */ - public void after(){ - if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ + public void after() { + if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) { // task recor flat : if true , start up qianfan if (TaskRecordDao.getTaskRecordFlag() - && TaskType.typeIsNormalTask(taskExecutionContext.getTaskType())){ - AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskExecutionContext.getTaskParams(), getCurTaskParamsClass()); + && TaskType.typeIsNormalTask(taskExecutionContext.getTaskType())) { + AbstractParameters params = TaskParametersUtils.getParameters(taskExecutionContext.getTaskType(), taskExecutionContext.getTaskParams()); // replace placeholder Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), @@ -193,81 +192,34 @@ public abstract class AbstractTask { CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); if (paramsMap != null && !paramsMap.isEmpty() - && paramsMap.containsKey("v_proc_date")){ + && paramsMap.containsKey("v_proc_date")) { String vProcDate = paramsMap.get("v_proc_date").getValue(); - if (!StringUtils.isEmpty(vProcDate)){ + if (!StringUtils.isEmpty(vProcDate)) { TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskExecutionContext.getTaskName(), vProcDate); - logger.info("task record status : {}",taskRecordState); - if (taskRecordState == TaskRecordStatus.FAILURE){ + logger.info("task record status : {}", taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE) { setExitStatusCode(Constants.EXIT_CODE_FAILURE); } } } } - }else if (getExitStatusCode() == Constants.EXIT_CODE_KILL){ + } else if (getExitStatusCode() == Constants.EXIT_CODE_KILL) { setExitStatusCode(Constants.EXIT_CODE_KILL); - }else { + } else { setExitStatusCode(Constants.EXIT_CODE_FAILURE); } } - - - /** - * get current task parameter class - * @return Task Params Class - */ - private Class getCurTaskParamsClass(){ - Class paramsClass = null; - // get task type - TaskType taskType = TaskType.valueOf(taskExecutionContext.getTaskType()); - switch (taskType){ - case SHELL: - paramsClass = ShellParameters.class; - break; - case SQL: - paramsClass = SqlParameters.class; - break; - case PROCEDURE: - paramsClass = ProcedureParameters.class; - break; - case MR: - paramsClass = MapreduceParameters.class; - break; - case SPARK: - paramsClass = SparkParameters.class; - break; - case FLINK: - paramsClass = FlinkParameters.class; - break; - case PYTHON: - paramsClass = PythonParameters.class; - break; - case DATAX: - paramsClass = DataxParameters.class; - break; - case SQOOP: - paramsClass = SqoopParameters.class; - break; - case CONDITIONS: - paramsClass = ConditionsParameters.class; - break; - default: - logger.error("not support this task type: {}", taskType); - throw new IllegalArgumentException("not support this task type"); - } - return paramsClass; - } - /** * get exit status according to exitCode + * * @return exit status */ - public ExecutionStatus getExitStatus(){ + public ExecutionStatus getExitStatus() { ExecutionStatus status; - switch (getExitStatusCode()){ + switch (getExitStatusCode()) { case Constants.EXIT_CODE_SUCCESS: status = ExecutionStatus.SUCCESS; break;