From 46a5d17bf673427c8ecc1bfce9864dbc7625b269 Mon Sep 17 00:00:00 2001 From: wind Date: Tue, 11 Jan 2022 22:51:00 +0800 Subject: [PATCH] [cherry-pick-2.0.3][Improvement][TaskLog] Unified task log #7831 (#7926) * [cherry-pick][Improvement][TaskLog] Unified task log #7831 * [cherry-pick][Improvement][TaskLog] Unified task log #7831 * fix thread name Co-authored-by: caishunfeng <534328519@qq.com> --- .../master/runner/WorkflowExecuteThread.java | 7 +- .../master/runner/task/BaseTaskProcessor.java | 82 +++++++++++++++---- .../runner/task/CommonTaskProcessor.java | 20 ++--- .../runner/task/ConditionTaskProcessor.java | 29 ++----- .../runner/task/DependentTaskProcessor.java | 17 ++-- .../master/runner/task/ITaskProcessor.java | 4 +- .../master/runner/task/SubTaskProcessor.java | 20 +++-- .../runner/task/SwitchTaskProcessor.java | 19 ++--- .../server/master/runner/task/TaskAction.java | 4 +- 9 files changed, 108 insertions(+), 94 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 5988839a5c..79076868be 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -359,7 +359,7 @@ public class WorkflowExecuteThread implements Runnable { taskFinished(task); } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) { ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); - iTaskProcessor.run(); + iTaskProcessor.action(TaskAction.RUN); if (iTaskProcessor.taskState().typeIsFinished()) { task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); @@ -630,11 +630,12 @@ public class WorkflowExecuteThread implements Runnable { && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); } - boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getMasterTaskCommitRetryTimes(), masterConfig.getMasterTaskCommitInterval()); + taskProcessor.init(taskInstance, processInstance); + boolean submit = taskProcessor.action(TaskAction.SUBMIT); if (submit) { this.taskInstanceHashMap.put(taskInstance.getId(), taskInstance.getTaskCode(), taskInstance); activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor); - taskProcessor.run(); + taskProcessor.action(TaskAction.RUN); addTimeoutCheck(taskInstance); addRetryCheck(taskInstance); TaskDefinition taskDefinition = processService.findTaskDefinition( diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 4446485b6c..508feb5384 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -39,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; @@ -68,7 +70,7 @@ import com.google.common.base.Strings; public abstract class BaseTaskProcessor implements ITaskProcessor { - protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME, getClass())); + protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); protected boolean killed = false; @@ -78,10 +80,32 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected TaskInstance taskInstance = null; - protected ProcessInstance processInstance; + protected ProcessInstance processInstance = null; + + protected int maxRetryTimes; + + protected int commitInterval; protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + + protected String threadLoggerInfoName; + + @Override + public void init(TaskInstance taskInstance, ProcessInstance processInstance) { + if (processService == null) { + processService = SpringApplicationContext.getBean(ProcessService.class); + } + if (masterConfig == null) { + masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + } + this.taskInstance = taskInstance; + this.processInstance = processInstance; + this.maxRetryTimes = masterConfig.getMasterTaskCommitRetryTimes(); + this.commitInterval = masterConfig.getMasterTaskCommitInterval(); + } + /** * persist task * @@ -91,21 +115,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { /** * pause task, common tasks donot need this. - * - * @return */ protected abstract boolean pauseTask(); /** * kill task, all tasks need to realize this function - * - * @return */ protected abstract boolean killTask(); /** * task timeout process - * @return */ protected abstract boolean taskTimeout(); @@ -119,12 +138,22 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return persistTask(taskAction); } - @Override - public void run() { - } + /* + * submit task + */ + protected abstract boolean submitTask(); + + /** + * run task + */ + protected abstract boolean runTask(); @Override public boolean action(TaskAction taskAction) { + String threadName = Thread.currentThread().getName(); + if (StringUtils.isNotEmpty(threadLoggerInfoName)) { + Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT, threadLoggerInfoName)); + } switch (taskAction) { case STOP: @@ -133,13 +162,27 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return pause(); case TIMEOUT: return timeout(); + case SUBMIT: + return submit(); + case RUN: + return run(); default: logger.error("unknown task action: {}", taskAction.toString()); - } + + // reset thread name + Thread.currentThread().setName(threadName); return false; } + protected boolean submit() { + return submitTask(); + } + + protected boolean run() { + return runTask(); + } + protected boolean timeout() { if (timeout) { return true; @@ -148,9 +191,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return timeout; } - /** - * @return - */ protected boolean pause() { if (paused) { return true; @@ -172,6 +212,18 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return null; } + /** + * set master task running logger. + */ + public void setTaskExecutionLogger() { + threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion(), + taskInstance.getProcessInstanceId(), + taskInstance.getId()); + Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT, threadLoggerInfoName)); + } + /** * get TaskExecutionContext * @@ -321,7 +373,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { // whether udf type boolean udfTypeFlag = Enums.getIfPresent(UdfType.class, Strings.nullToEmpty(sqlParameters.getType())).isPresent() - && !StringUtils.isEmpty(sqlParameters.getUdfs()); + && !StringUtils.isEmpty(sqlParameters.getUdfs()); if (udfTypeFlag) { String[] udfFunIds = sqlParameters.getUdfs().split(","); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 14bb3afd56..ca78ea3863 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; @@ -39,8 +38,6 @@ import org.apache.commons.lang.StringUtils; import java.util.Date; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; /** @@ -51,25 +48,17 @@ public class CommonTaskProcessor extends BaseTaskProcessor { @Autowired private TaskPriorityQueue taskUpdateQueue; - @Autowired - MasterConfig masterConfig; - @Autowired NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); - /** - * logger of MasterBaseTaskExecThread - */ - protected Logger logger = LoggerFactory.getLogger(getClass()); - @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) { - this.processInstance = processInstance; - this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval); + public boolean submitTask() { + this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; } + setTaskExecutionLogger(); return dispatchTask(taskInstance, processInstance); } @@ -79,7 +68,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor { } @Override - public void run() { + public boolean runTask() { + return true; } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 584e484123..c303a80de4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -26,15 +26,10 @@ import org.apache.dolphinscheduler.common.model.DependentItem; import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.utils.DependentUtils; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.LogUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.spi.task.TaskConstants; import java.util.ArrayList; import java.util.Date; @@ -42,23 +37,16 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * condition task processor */ public class ConditionTaskProcessor extends BaseTaskProcessor { - protected static final Logger logger = LoggerFactory.getLogger(TaskConstants.TASK_LOG_LOGGER_NAME); - /** * dependent parameters */ private DependentParameters dependentParameters; - ProcessInstance processInstance; - /** * condition result */ @@ -69,14 +57,11 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { */ private Map completeTaskList = new ConcurrentHashMap<>(); - MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); - private TaskDefinition taskDefinition; @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { - this.processInstance = processInstance; - this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval); + public boolean submitTask() { + this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; @@ -85,12 +70,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() ); - String threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion(), - taskInstance.getProcessInstanceId(), - taskInstance.getId()); - Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT,threadLoggerInfoName)); + setTaskExecutionLogger(); initTaskParameters(); logger.info("dependent task start"); return true; @@ -102,13 +82,14 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { } @Override - public void run() { + public boolean runTask() { if (conditionResult.equals(DependResult.WAITING)) { setConditionResult(); endTask(); } else { endTask(); } + return true; } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 0f84a5f663..100c75f2e6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -27,13 +27,9 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.LogUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import java.util.Date; @@ -69,18 +65,13 @@ public class DependentTaskProcessor extends BaseTaskProcessor { DependResult result; - ProcessInstance processInstance; TaskDefinition taskDefinition; - MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); - boolean allDependentItemFinished; @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { - this.processInstance = processInstance; - this.taskInstance = task; - this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval); + public boolean submitTask() { + this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; @@ -88,6 +79,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { taskDefinition = processService.findTaskDefinition( taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() ); + setTaskExecutionLogger(); taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), @@ -106,7 +98,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { } @Override - public void run() { + public boolean runTask() { if (!allDependentItemFinished) { allDependentItemFinished = allDependentTaskFinish(); } @@ -114,6 +106,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { getTaskDependResult(); endTask(); } + return true; } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java index aa1e490a09..41fb0d0acd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java @@ -28,14 +28,12 @@ public interface ITaskProcessor { boolean persist(TaskAction taskAction); - void run(); + void init(TaskInstance taskInstance, ProcessInstance processInstance); boolean action(TaskAction taskAction); String getType(); - boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval); - ExecutionStatus taskState(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 02f08a828c..16b21f0cc0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -22,9 +22,9 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; @@ -36,8 +36,6 @@ import java.util.concurrent.locks.ReentrantLock; */ public class SubTaskProcessor extends BaseTaskProcessor { - private ProcessInstance processInstance; - private ProcessInstance subProcessInstance = null; private TaskDefinition taskDefinition; @@ -49,17 +47,22 @@ public class SubTaskProcessor extends BaseTaskProcessor { private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { - this.processInstance = processInstance; + public boolean submitTask() { taskDefinition = processService.findTaskDefinition( - task.getTaskCode(), task.getTaskDefinitionVersion() + taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() ); - this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval); + this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; } + setTaskExecutionLogger(); + taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion(), + taskInstance.getProcessInstanceId(), + taskInstance.getId())); + return true; } @@ -69,7 +72,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { } @Override - public void run() { + public boolean runTask() { try { this.runLock.lock(); if (setSubWorkFlow()) { @@ -83,6 +86,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { } finally { this.runLock.unlock(); } + return true; } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index c48a711008..7134ff0fa1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -25,13 +25,10 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.StringUtils; @@ -48,27 +45,22 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; - private TaskInstance taskInstance; - - private ProcessInstance processInstance; TaskDefinition taskDefinition; - MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); - /** * switch result */ private DependResult conditionResult; @Override - public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { - - this.processInstance = processInstance; - this.taskInstance = processService.submitTask(taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval); + public boolean submitTask() { + this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; } + setTaskExecutionLogger(); + taskDefinition = processService.findTaskDefinition( taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() ); @@ -84,7 +76,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { } @Override - public void run() { + public boolean runTask() { try { if (!this.taskState().typeIsFinished() && setSwitchResult()) { endTaskState(); @@ -95,6 +87,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { this.taskInstance.getId(), e); } + return true; } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java index 42c88463b2..0c4db767e9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java @@ -23,5 +23,7 @@ package org.apache.dolphinscheduler.server.master.runner.task; public enum TaskAction { PAUSE, STOP, - TIMEOUT + TIMEOUT, + SUBMIT, + RUN }