From 4b22ad6cf6c584a35b01bc9d3d548419dda676a2 Mon Sep 17 00:00:00 2001 From: wind Date: Tue, 4 Jan 2022 13:38:31 +0800 Subject: [PATCH] [Implement][MasterServer]TaskProcessor code optimization (#7754) * task processor optimization * fix test Co-authored-by: caishunfeng <534328519@qq.com> --- .../master/runner/WorkflowExecuteThread.java | 20 +++--- .../master/runner/task/BaseTaskProcessor.java | 70 ++++++++++++++++--- .../runner/task/CommonTaskProcessFactory.java | 36 ---------- .../runner/task/CommonTaskProcessor.java | 43 +++++------- .../task/ConditionTaskProcessFactory.java | 35 ---------- .../runner/task/ConditionTaskProcessor.java | 30 ++++---- .../task/DependentTaskProcessFactory.java | 36 ---------- .../runner/task/DependentTaskProcessor.java | 32 ++++----- .../runner/task/ITaskProcessFactory.java | 25 ------- .../master/runner/task/ITaskProcessor.java | 6 +- .../runner/task/SubTaskProcessFactory.java | 35 ---------- .../master/runner/task/SubTaskProcessor.java | 27 +++---- .../runner/task/SwitchTaskProcessFactory.java | 36 ---------- .../runner/task/SwitchTaskProcessor.java | 27 ++++--- .../server/master/runner/task/TaskAction.java | 7 +- .../runner/task/TaskProcessorFactory.java | 16 ++--- .../master/WorkflowExecuteThreadTest.java | 12 +++- .../runner/task/TaskProcessorFactoryTest.java | 2 +- 18 files changed, 164 insertions(+), 331 deletions(-) delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index c23da356c2..5ca705db78 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -54,7 +54,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -333,7 +332,8 @@ public class WorkflowExecuteThread { ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId()); - taskProcessor.dispatch(taskInstance, processInstance); + taskProcessor.init(taskInstance, processInstance); + taskProcessor.action(TaskAction.DISPATCH); this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); return true; } @@ -343,7 +343,8 @@ public class WorkflowExecuteThread { ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId()); - taskProcessor.dispatch(taskInstance, processInstance); + taskProcessor.init(taskInstance, processInstance); + taskProcessor.action(TaskAction.DISPATCH); return true; } } @@ -406,7 +407,7 @@ public class WorkflowExecuteThread { } } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) { ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); - iTaskProcessor.run(); + iTaskProcessor.action(TaskAction.RUN); if (iTaskProcessor.taskState().typeIsFinished()) { task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); @@ -800,15 +801,18 @@ public class WorkflowExecuteThread { */ private TaskInstance submitTaskExec(TaskInstance taskInstance) { try { + // package task instance before submit + processService.packageTaskInstance(taskInstance, processInstance); + ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); + taskProcessor.init(taskInstance, processInstance); + if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); } - // package task instance before submit - processService.packageTaskInstance(taskInstance, processInstance); - boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval(), masterConfig.isTaskLogger()); + boolean submit = taskProcessor.action(TaskAction.SUBMIT); if (!submit) { logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", processInstance.getId(), processInstance.getName(), @@ -818,7 +822,7 @@ public class WorkflowExecuteThread { validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId()); taskInstanceMap.put(taskInstance.getId(), taskInstance); activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor); - taskProcessor.run(); + taskProcessor.action(TaskAction.RUN); stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance); stateWheelExecuteThread.addTask4RetryCheck(taskInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 0b3d96bfc5..8d62f1fda0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; @@ -80,7 +81,30 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected ProcessInstance processInstance; - protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);; + protected int maxRetryTimes; + + protected int commitInterval; + + protected boolean isTaskLogger; + + protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + + protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + + @Override + public void init(TaskInstance taskInstance, ProcessInstance processInstance) { + if (processService == null) { + processService = SpringApplicationContext.getBean(ProcessService.class); + } + if (masterConfig == null) { + masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + } + this.taskInstance = taskInstance; + this.processInstance = processInstance; + this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); + this.commitInterval = masterConfig.getTaskCommitInterval(); + this.isTaskLogger = masterConfig.isTaskLogger(); + } /** * pause task, common tasks donot need this. @@ -97,9 +121,21 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { */ protected abstract boolean taskTimeout(); - @Override - public void run() { - } + /** + * submit task + */ + protected abstract boolean submitTask(); + + /** + * run task + */ + protected abstract boolean runTask(); + + /** + * dispatch task + * @return + */ + protected abstract boolean dispatchTask(); @Override public boolean action(TaskAction taskAction) { @@ -111,6 +147,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return pause(); case TIMEOUT: return timeout(); + case SUBMIT: + return submit(); + case RUN: + return run(); + case DISPATCH: + return dispatch(); default: logger.error("unknown task action: {}", taskAction); @@ -118,6 +160,18 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return false; } + protected boolean submit() { + return submitTask(); + } + + protected boolean run() { + return runTask(); + } + + protected boolean dispatch() { + return dispatchTask(); + } + protected boolean timeout() { if (timeout) { return true; @@ -126,9 +180,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return timeout; } - /** - * - */ protected boolean pause() { if (paused) { return true; @@ -150,9 +201,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return null; } - @Override - public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) { - + public ExecutionStatus taskState() { + return this.taskInstance.getState(); } /** diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java deleted file mode 100644 index 488465063e..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.Constants; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class CommonTaskProcessFactory implements ITaskProcessFactory { - @Override - public String type() { - return Constants.COMMON_TASK_TYPE; - - } - - @Override - public ITaskProcessor create() { - return new CommonTaskProcessor(); - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index f35127af6d..db6e7e202e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -19,9 +19,6 @@ package org.apache.dolphinscheduler.server.master.runner.task; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; @@ -38,9 +35,12 @@ import org.apache.commons.lang.StringUtils; import java.util.Date; +import com.google.auto.service.AutoService; + /** * common task processor */ +@AutoService(ITaskProcessor.class) public class CommonTaskProcessor extends BaseTaskProcessor { private TaskPriorityQueue taskUpdateQueue; @@ -48,42 +48,32 @@ public class CommonTaskProcessor extends BaseTaskProcessor { private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) { - this.processInstance = processInstance; - this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval); + public boolean submitTask() { + this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; } setTaskExecutionLogger(isTaskLogger); - int taskGroupId = task.getTaskGroupId(); + int taskGroupId = taskInstance.getTaskGroupId(); if (taskGroupId > 0) { - boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(), - task.getName(), + boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(), + taskInstance.getName(), taskGroupId, - task.getProcessInstanceId(), - task.getTaskInstancePriority().getCode()); + taskInstance.getProcessInstanceId(), + taskInstance.getTaskInstancePriority().getCode()); if (!acquireTaskGroup) { logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName()); return true; } } - dispatchTask(taskInstance, processInstance); + dispatchTask(); return true; } @Override - public ExecutionStatus taskState() { - return this.taskInstance.getState(); - } - - @Override - public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) { - this.dispatchTask(taskInstance, processInstance); - } - - @Override - public void run() { + public boolean runTask() { + return true; } @Override @@ -104,8 +94,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor { return Constants.COMMON_TASK_TYPE; } - private boolean dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) { - + @Override + public boolean dispatchTask() { try { if (taskUpdateQueue == null) { this.initQueue(); @@ -133,8 +123,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { logger.info(String.format("master submit success, task : %s", taskInstance.getName())); return true; } catch (Exception e) { - logger.error("submit task Exception: ", e); - logger.error("task error : {}", JSONUtils.toJsonString(taskInstance)); + logger.error("submit task error", e); return false; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java deleted file mode 100644 index 3028c56535..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.enums.TaskType; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class ConditionTaskProcessFactory implements ITaskProcessFactory { - @Override - public String type() { - return TaskType.CONDITIONS.getDesc(); - } - - @Override - public ITaskProcessor create() { - return new ConditionTaskProcessor(); - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 7a7fe29dbb..3d96d2e14d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -27,12 +27,8 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.LogUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import java.util.Date; @@ -40,9 +36,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.google.auto.service.AutoService; + /** * condition task processor */ +@AutoService(ITaskProcessor.class) public class ConditionTaskProcessor extends BaseTaskProcessor { /** @@ -60,21 +59,13 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { */ private Map completeTaskList = new ConcurrentHashMap<>(); - private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);; - - private TaskDefinition taskDefinition; - @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) { - this.processInstance = processInstance; - this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); + public boolean submitTask() { + this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; } - taskDefinition = processService.findTaskDefinition( - taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() - ); setTaskExecutionLogger(isTaskLogger); String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); @@ -90,13 +81,19 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { } @Override - public void run() { + public boolean runTask() { if (conditionResult.equals(DependResult.WAITING)) { setConditionResult(); endTask(); } else { endTask(); } + return true; + } + + @Override + protected boolean dispatchTask() { + return true; } @Override @@ -109,8 +106,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { @Override protected boolean taskTimeout() { - TaskTimeoutStrategy taskTimeoutStrategy = - taskDefinition.getTimeoutNotifyStrategy(); + TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); if (taskTimeoutStrategy == TaskTimeoutStrategy.WARN) { return true; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java deleted file mode 100644 index 3f885ed256..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.enums.TaskType; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class DependentTaskProcessFactory implements ITaskProcessFactory { - - @Override - public String type() { - return TaskType.DEPENDENT.getDesc(); - } - - @Override - public ITaskProcessor create() { - return new DependentTaskProcessor(); - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 1c1c9696b5..4b20b681de 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -27,13 +27,8 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.LogUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import java.util.Date; @@ -42,10 +37,12 @@ import java.util.List; import java.util.Map; import com.fasterxml.jackson.annotation.JsonFormat; +import com.google.auto.service.AutoService; /** * dependent task processor */ +@AutoService(ITaskProcessor.class) public class DependentTaskProcessor extends BaseTaskProcessor { private DependentParameters dependentParameters; @@ -69,24 +66,16 @@ public class DependentTaskProcessor extends BaseTaskProcessor { DependResult result; - TaskDefinition taskDefinition; - - private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);; - boolean allDependentItemFinished; @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) { - this.processInstance = processInstance; - this.taskInstance = task; - this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); + public boolean submitTask() { + this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; } - taskDefinition = processService.findTaskDefinition( - taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() - ); + taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), @@ -107,7 +96,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { } @Override - public void run() { + public boolean runTask() { if (!allDependentItemFinished) { allDependentItemFinished = allDependentTaskFinish(); } @@ -115,12 +104,17 @@ public class DependentTaskProcessor extends BaseTaskProcessor { getTaskDependResult(); endTask(); } + return true; + } + + @Override + protected boolean dispatchTask() { + return true; } @Override protected boolean taskTimeout() { - TaskTimeoutStrategy taskTimeoutStrategy = - taskDefinition.getTimeoutNotifyStrategy(); + TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy && TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) { return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java deleted file mode 100644 index ffbbafb4ba..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -public interface ITaskProcessFactory { - - String type(); - - ITaskProcessor create(); -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java index 66e49f99cc..d1f3c4c727 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java @@ -26,16 +26,12 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; */ public interface ITaskProcessor { - void run(); + void init(TaskInstance taskInstance, ProcessInstance processInstance); boolean action(TaskAction taskAction); String getType(); - boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger); - ExecutionStatus taskState(); - void dispatch(TaskInstance taskInstance, ProcessInstance processInstance); - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java deleted file mode 100644 index 439d8e1ee9..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.enums.TaskType; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class SubTaskProcessFactory implements ITaskProcessFactory { - @Override - public String type() { - return TaskType.SUB_PROCESS.getDesc(); - } - - @Override - public ITaskProcessor create() { - return new SubTaskProcessor(); - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 50ddaeb649..85bce57a61 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -21,8 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -31,28 +29,26 @@ import java.util.Date; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.google.auto.service.AutoService; + /** * subtask processor */ +@AutoService(ITaskProcessor.class) public class SubTaskProcessor extends BaseTaskProcessor { private ProcessInstance subProcessInstance = null; - private TaskDefinition taskDefinition; /** * run lock */ private final Lock runLock = new ReentrantLock(); - private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);; + private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); @Override - public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) { - this.processInstance = processInstance; - taskDefinition = processService.findTaskDefinition( - task.getTaskCode(), task.getTaskDefinitionVersion() - ); - this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); + public boolean submitTask() { + this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; @@ -67,7 +63,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { } @Override - public void run() { + public boolean runTask() { try { this.runLock.lock(); if (setSubWorkFlow()) { @@ -81,12 +77,17 @@ public class SubTaskProcessor extends BaseTaskProcessor { } finally { this.runLock.unlock(); } + return true; + } + + @Override + protected boolean dispatchTask() { + return true; } @Override protected boolean taskTimeout() { - TaskTimeoutStrategy taskTimeoutStrategy = - taskDefinition.getTimeoutNotifyStrategy(); + TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy && TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) { return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java deleted file mode 100644 index d536e65bb8..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.enums.TaskType; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class SwitchTaskProcessFactory implements ITaskProcessFactory { - - @Override - public String type() { - return TaskType.SWITCH.getDesc(); - } - - @Override - public ITaskProcessor create() { - return new SwitchTaskProcessor(); - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 1afd211504..ce2706f02f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -25,13 +25,9 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.StringUtils; @@ -44,33 +40,28 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import com.google.auto.service.AutoService; + /** * switch task processor */ +@AutoService(ITaskProcessor.class) public class SwitchTaskProcessor extends BaseTaskProcessor { protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; - TaskDefinition taskDefinition; - - private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);; - /** * switch result */ private DependResult conditionResult; @Override - public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) { - this.processInstance = processInstance; - this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval); + public boolean submitTask() { + this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; } - taskDefinition = processService.findTaskDefinition( - taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() - ); taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), @@ -84,7 +75,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { } @Override - public void run() { + public boolean runTask() { try { if (!this.taskState().typeIsFinished() && setSwitchResult()) { endTaskState(); @@ -95,6 +86,12 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { this.taskInstance.getId(), e); } + return true; + } + + @Override + protected boolean dispatchTask() { + return true; } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java index 42c88463b2..9044945258 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java @@ -20,8 +20,11 @@ package org.apache.dolphinscheduler.server.master.runner.task; /** * task action */ -public enum TaskAction { +public enum TaskAction { PAUSE, STOP, - TIMEOUT + TIMEOUT, + SUBMIT, + RUN, + DISPATCH } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java index 4b20848e22..2b9e9d644a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java @@ -31,25 +31,25 @@ import java.util.concurrent.ConcurrentHashMap; */ public class TaskProcessorFactory { - public static final Map PROCESS_FACTORY_MAP = new ConcurrentHashMap<>(); + public static final Map PROCESS_MAP = new ConcurrentHashMap<>(); private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE; static { - for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) { - PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor); + for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) { + PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor); } } - public static ITaskProcessor getTaskProcessor(String type) { + public static ITaskProcessor getTaskProcessor(String type) throws InstantiationException, IllegalAccessException { if (StringUtils.isEmpty(type)) { type = DEFAULT_PROCESSOR; } - ITaskProcessFactory taskProcessFactory = PROCESS_FACTORY_MAP.get(type); - if (Objects.isNull(taskProcessFactory)) { - taskProcessFactory = PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR); + ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type); + if (Objects.isNull(iTaskProcessor)) { + iTaskProcessor = PROCESS_MAP.get(DEFAULT_PROCESSOR); } - return taskProcessFactory.create(); + return iTaskProcessor.getClass().newInstance(); } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 8f2572d404..1e3893824d 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import java.lang.reflect.Field; @@ -89,13 +90,18 @@ public class WorkflowExecuteThreadTest { @Before public void init() throws Exception { - processService = mock(ProcessService.class); - taskProcessorFactory = mock(TaskProcessorFactory.class); - applicationContext = mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + config = new MasterConfig(); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + processService = mock(ProcessService.class); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + + taskProcessorFactory = mock(TaskProcessorFactory.class); + processInstance = mock(ProcessInstance.class); Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS); Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString()); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java index 4114a7a0fe..d0371809cc 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java @@ -27,7 +27,7 @@ import org.junit.Test; public class TaskProcessorFactoryTest { @Test - public void testFactory() { + public void testFactory() throws InstantiationException, IllegalAccessException { TaskInstance taskInstance = new TaskInstance(); taskInstance.setTaskType("shell");