Browse Source

[Implement][MasterServer]TaskProcessor code optimization (#7754)

* task processor optimization

* fix test

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
4b22ad6cf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  2. 70
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  3. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java
  4. 43
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  5. 35
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java
  6. 30
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  7. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java
  8. 32
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
  9. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java
  10. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
  11. 35
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java
  12. 27
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
  13. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java
  14. 27
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
  15. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
  16. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
  17. 12
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
  18. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java

20
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -54,7 +54,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -333,7 +332,8 @@ public class WorkflowExecuteThread {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
taskProcessor.dispatch(taskInstance, processInstance);
taskProcessor.init(taskInstance, processInstance);
taskProcessor.action(TaskAction.DISPATCH);
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
return true;
}
@ -343,7 +343,8 @@ public class WorkflowExecuteThread {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ProcessInstance processInstance = this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
taskProcessor.dispatch(taskInstance, processInstance);
taskProcessor.init(taskInstance, processInstance);
taskProcessor.action(TaskAction.DISPATCH);
return true;
}
}
@ -406,7 +407,7 @@ public class WorkflowExecuteThread {
}
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
iTaskProcessor.run();
iTaskProcessor.action(TaskAction.RUN);
if (iTaskProcessor.taskState().typeIsFinished()) {
task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
@ -800,15 +801,18 @@ public class WorkflowExecuteThread {
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
try {
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
taskProcessor.init(taskInstance, processInstance);
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval(), masterConfig.isTaskLogger());
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(), processInstance.getName(),
@ -818,7 +822,7 @@ public class WorkflowExecuteThread {
validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
taskProcessor.run();
taskProcessor.action(TaskAction.RUN);
stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance);
stateWheelExecuteThread.addTask4RetryCheck(taskInstance);

70
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
@ -80,7 +81,30 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessInstance processInstance;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);;
protected int maxRetryTimes;
protected int commitInterval;
protected boolean isTaskLogger;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
@Override
public void init(TaskInstance taskInstance, ProcessInstance processInstance) {
if (processService == null) {
processService = SpringApplicationContext.getBean(ProcessService.class);
}
if (masterConfig == null) {
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
}
this.taskInstance = taskInstance;
this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
this.commitInterval = masterConfig.getTaskCommitInterval();
this.isTaskLogger = masterConfig.isTaskLogger();
}
/**
* pause task, common tasks donot need this.
@ -97,9 +121,21 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
*/
protected abstract boolean taskTimeout();
@Override
public void run() {
}
/**
* submit task
*/
protected abstract boolean submitTask();
/**
* run task
*/
protected abstract boolean runTask();
/**
* dispatch task
* @return
*/
protected abstract boolean dispatchTask();
@Override
public boolean action(TaskAction taskAction) {
@ -111,6 +147,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return pause();
case TIMEOUT:
return timeout();
case SUBMIT:
return submit();
case RUN:
return run();
case DISPATCH:
return dispatch();
default:
logger.error("unknown task action: {}", taskAction);
@ -118,6 +160,18 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return false;
}
protected boolean submit() {
return submitTask();
}
protected boolean run() {
return runTask();
}
protected boolean dispatch() {
return dispatchTask();
}
protected boolean timeout() {
if (timeout) {
return true;
@ -126,9 +180,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return timeout;
}
/**
*
*/
protected boolean pause() {
if (paused) {
return true;
@ -150,9 +201,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return null;
}
@Override
public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) {
public ExecutionStatus taskState() {
return this.taskInstance.getState();
}
/**

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java

@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class CommonTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return Constants.COMMON_TASK_TYPE;
}
@Override
public ITaskProcessor create() {
return new CommonTaskProcessor();
}
}

43
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -19,9 +19,6 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -38,9 +35,12 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date;
import com.google.auto.service.AutoService;
/**
* common task processor
*/
@AutoService(ITaskProcessor.class)
public class CommonTaskProcessor extends BaseTaskProcessor {
private TaskPriorityQueue taskUpdateQueue;
@ -48,42 +48,32 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval);
public boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
setTaskExecutionLogger(isTaskLogger);
int taskGroupId = task.getTaskGroupId();
int taskGroupId = taskInstance.getTaskGroupId();
if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(task.getId(),
task.getName(),
boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
taskInstance.getName(),
taskGroupId,
task.getProcessInstanceId(),
task.getTaskInstancePriority().getCode());
taskInstance.getProcessInstanceId(),
taskInstance.getTaskInstancePriority().getCode());
if (!acquireTaskGroup) {
logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
return true;
}
}
dispatchTask(taskInstance, processInstance);
dispatchTask();
return true;
}
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
}
@Override
public void dispatch(TaskInstance taskInstance, ProcessInstance processInstance) {
this.dispatchTask(taskInstance, processInstance);
}
@Override
public void run() {
public boolean runTask() {
return true;
}
@Override
@ -104,8 +94,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return Constants.COMMON_TASK_TYPE;
}
private boolean dispatchTask(TaskInstance taskInstance, ProcessInstance processInstance) {
@Override
public boolean dispatchTask() {
try {
if (taskUpdateQueue == null) {
this.initQueue();
@ -133,8 +123,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
return true;
} catch (Exception e) {
logger.error("submit task Exception: ", e);
logger.error("task error : {}", JSONUtils.toJsonString(taskInstance));
logger.error("submit task error", e);
return false;
}
}

35
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java

@ -1,35 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class ConditionTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.CONDITIONS.getDesc();
}
@Override
public ITaskProcessor create() {
return new ConditionTaskProcessor();
}
}

30
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -27,12 +27,8 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@ -40,9 +36,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.google.auto.service.AutoService;
/**
* condition task processor
*/
@AutoService(ITaskProcessor.class)
public class ConditionTaskProcessor extends BaseTaskProcessor {
/**
@ -60,21 +59,13 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
*/
private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
private TaskDefinition taskDefinition;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
public boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
setTaskExecutionLogger(isTaskLogger);
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
@ -90,13 +81,19 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
}
@Override
public void run() {
public boolean runTask() {
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
return true;
}
@Override
protected boolean dispatchTask() {
return true;
}
@Override
@ -109,8 +106,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean taskTimeout() {
TaskTimeoutStrategy taskTimeoutStrategy =
taskDefinition.getTimeoutNotifyStrategy();
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (taskTimeoutStrategy == TaskTimeoutStrategy.WARN) {
return true;
}

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java

@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class DependentTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.DEPENDENT.getDesc();
}
@Override
public ITaskProcessor create() {
return new DependentTaskProcessor();
}
}

32
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java

@ -27,13 +27,8 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
@ -42,10 +37,12 @@ import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.google.auto.service.AutoService;
/**
* dependent task processor
*/
@AutoService(ITaskProcessor.class)
public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters;
@ -69,24 +66,16 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
DependResult result;
TaskDefinition taskDefinition;
private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
boolean allDependentItemFinished;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
this.taskInstance = task;
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
public boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
@ -107,7 +96,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
}
@Override
public void run() {
public boolean runTask() {
if (!allDependentItemFinished) {
allDependentItemFinished = allDependentTaskFinish();
}
@ -115,12 +104,17 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
getTaskDependResult();
endTask();
}
return true;
}
@Override
protected boolean dispatchTask() {
return true;
}
@Override
protected boolean taskTimeout() {
TaskTimeoutStrategy taskTimeoutStrategy =
taskDefinition.getTimeoutNotifyStrategy();
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy
&& TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) {
return true;

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java

@ -1,25 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
public interface ITaskProcessFactory {
String type();
ITaskProcessor create();
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java

@ -26,16 +26,12 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
*/
public interface ITaskProcessor {
void run();
void init(TaskInstance taskInstance, ProcessInstance processInstance);
boolean action(TaskAction taskAction);
String getType();
boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger);
ExecutionStatus taskState();
void dispatch(TaskInstance taskInstance, ProcessInstance processInstance);
}

35
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java

@ -1,35 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class SubTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.SUB_PROCESS.getDesc();
}
@Override
public ITaskProcessor create() {
return new SubTaskProcessor();
}
}

27
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@ -21,8 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -31,28 +29,26 @@ import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.google.auto.service.AutoService;
/**
* subtask processor
*/
@AutoService(ITaskProcessor.class)
public class SubTaskProcessor extends BaseTaskProcessor {
private ProcessInstance subProcessInstance = null;
private TaskDefinition taskDefinition;
/**
* run lock
*/
private final Lock runLock = new ReentrantLock();
private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);;
private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
taskDefinition = processService.findTaskDefinition(
task.getTaskCode(), task.getTaskDefinitionVersion()
);
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
public boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
@ -67,7 +63,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
@Override
public void run() {
public boolean runTask() {
try {
this.runLock.lock();
if (setSubWorkFlow()) {
@ -81,12 +77,17 @@ public class SubTaskProcessor extends BaseTaskProcessor {
} finally {
this.runLock.unlock();
}
return true;
}
@Override
protected boolean dispatchTask() {
return true;
}
@Override
protected boolean taskTimeout() {
TaskTimeoutStrategy taskTimeoutStrategy =
taskDefinition.getTimeoutNotifyStrategy();
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy
&& TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) {
return true;

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java

@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import com.google.auto.service.AutoService;
@AutoService(ITaskProcessFactory.class)
public class SwitchTaskProcessFactory implements ITaskProcessFactory {
@Override
public String type() {
return TaskType.SWITCH.getDesc();
}
@Override
public ITaskProcessor create() {
return new SwitchTaskProcessor();
}
}

27
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -25,13 +25,9 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@ -44,33 +40,28 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.auto.service.AutoService;
/**
* switch task processor
*/
@AutoService(ITaskProcessor.class)
public class SwitchTaskProcessor extends BaseTaskProcessor {
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
TaskDefinition taskDefinition;
private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
/**
* switch result
*/
private DependResult conditionResult;
@Override
public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
public boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
@ -84,7 +75,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
}
@Override
public void run() {
public boolean runTask() {
try {
if (!this.taskState().typeIsFinished() && setSwitchResult()) {
endTaskState();
@ -95,6 +86,12 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
this.taskInstance.getId(),
e);
}
return true;
}
@Override
protected boolean dispatchTask() {
return true;
}
@Override

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java

@ -20,8 +20,11 @@ package org.apache.dolphinscheduler.server.master.runner.task;
/**
* task action
*/
public enum TaskAction {
public enum TaskAction {
PAUSE,
STOP,
TIMEOUT
TIMEOUT,
SUBMIT,
RUN,
DISPATCH
}

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java

@ -31,25 +31,25 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class TaskProcessorFactory {
public static final Map<String, ITaskProcessFactory> PROCESS_FACTORY_MAP = new ConcurrentHashMap<>();
public static final Map<String, ITaskProcessor> PROCESS_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
static {
for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) {
PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor);
for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) {
PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor);
}
}
public static ITaskProcessor getTaskProcessor(String type) {
public static ITaskProcessor getTaskProcessor(String type) throws InstantiationException, IllegalAccessException {
if (StringUtils.isEmpty(type)) {
type = DEFAULT_PROCESSOR;
}
ITaskProcessFactory taskProcessFactory = PROCESS_FACTORY_MAP.get(type);
if (Objects.isNull(taskProcessFactory)) {
taskProcessFactory = PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR);
ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type);
if (Objects.isNull(iTaskProcessor)) {
iTaskProcessor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
}
return taskProcessFactory.create();
return iTaskProcessor.getClass().newInstance();
}
}

12
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.lang.reflect.Field;
@ -89,13 +90,18 @@ public class WorkflowExecuteThreadTest {
@Before
public void init() throws Exception {
processService = mock(ProcessService.class);
taskProcessorFactory = mock(TaskProcessorFactory.class);
applicationContext = mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
processService = mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskProcessorFactory = mock(TaskProcessorFactory.class);
processInstance = mock(ProcessInstance.class);
Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS);
Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString());

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java

@ -27,7 +27,7 @@ import org.junit.Test;
public class TaskProcessorFactoryTest {
@Test
public void testFactory() {
public void testFactory() throws InstantiationException, IllegalAccessException {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");

Loading…
Cancel
Save