Browse Source

Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479)

* Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe

(cherry picked from commit ad2646ff1f)
3.0.0/version-upgrade
Wenjun Ruan 3 years ago
parent
commit
4ceb420873
  1. 29
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
  2. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  3. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  4. 105
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
  5. 22
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
  6. 176
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  7. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  8. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  9. 27
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
  10. 4
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
  11. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
  12. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
  13. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  14. 35
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
  15. 9
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
  16. 4
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  17. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
  18. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  19. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  20. 24
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

29
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

@ -17,14 +17,25 @@
package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE;
import static org.apache.dolphinscheduler.common.Constants.UTF_8;
import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import static org.apache.dolphinscheduler.common.Constants.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* file utils
@ -112,7 +123,15 @@ public class FileUtils {
File execLocalPathFile = new File(execLocalPath);
if (execLocalPathFile.exists()) {
org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
try {
org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
} catch (Exception ex) {
if (ex instanceof NoSuchFileException || ex.getCause() instanceof NoSuchFileException) {
// this file is already be deleted.
} else {
throw ex;
}
}
}
//create work dir

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -266,13 +266,11 @@ public class ProcessInstance {
*/
public ProcessInstance(ProcessDefinition processDefinition) {
this.processDefinition = processDefinition;
this.name = processDefinition.getName()
+ "-"
+
processDefinition.getVersion()
+ "-"
+
DateUtils.getCurrentTimeStamp();
// todo: the name is not unique
this.name = String.join("-",
processDefinition.getName(),
String.valueOf(processDefinition.getVersion()),
DateUtils.getCurrentTimeStamp());
}
public String getVarPool() {

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -180,16 +180,18 @@ public class TaskPriorityQueueConsumer extends Thread {
return true;
}
}
result = dispatcher.dispatch(executionContext);
if (result) {
logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
addDispatchEvent(context, executionContext);
} else {
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
}
} catch (RuntimeException e) {
logger.error("dispatch error: ", e);
logger.error("Master dispatch task to worker error: ", e);
} catch (ExecuteException e) {
logger.error("dispatch error: {}", e.getMessage());
logger.error("Master dispatch task to worker error: {}", e);
}
return result;
}

105
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

@ -26,10 +26,12 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
import io.netty.channel.Channel;
import lombok.Data;
/**
* task event
*/
@Data
public class TaskEvent {
/**
@ -135,107 +137,4 @@ public class TaskEvent {
return event;
}
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getWorkerAddress() {
return workerAddress;
}
public void setWorkerAddress(String workerAddress) {
this.workerAddress = workerAddress;
}
public ExecutionStatus getState() {
return state;
}
public void setState(ExecutionStatus state) {
this.state = state;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public Event getEvent() {
return event;
}
public void setEvent(Event event) {
this.event = event;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
}

22
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java

@ -79,7 +79,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
public void submitTaskEvent(TaskEvent taskEvent) {
if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
return;
}
if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
@ -114,20 +114,24 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex);
if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
logger.error("persist event failed processInstanceId: {}", processInstanceId, ex);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
@Override
public void onSuccess(Object result) {
logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId());
if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
logger.info("persist events succeeded, processInstanceId: {}", processInstanceId);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}

176
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -75,8 +75,8 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.ArrayList;
import java.util.Arrays;
@ -87,6 +87,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -238,12 +239,12 @@ public class WorkflowExecuteThread {
* @param masterConfig masterConfig
* @param stateWheelExecuteThread stateWheelExecuteThread
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
, StateWheelExecuteThread stateWheelExecuteThread) {
public WorkflowExecuteThread(ProcessInstance processInstance,
ProcessService processService,
NettyExecutorManager nettyExecutorManager,
ProcessAlertManager processAlertManager,
MasterConfig masterConfig,
StateWheelExecuteThread stateWheelExecuteThread) {
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
@ -279,15 +280,14 @@ public class WorkflowExecuteThread {
}
public String getKey() {
if (StringUtils.isNotEmpty(key)
|| this.processDefinition == null) {
if (StringUtils.isNotEmpty(key) || this.processDefinition == null) {
return key;
}
key = String.format("%d_%d_%d",
this.processDefinition.getCode(),
this.processDefinition.getVersion(),
this.processInstance.getId());
this.processDefinition.getCode(),
this.processDefinition.getVersion(),
this.processInstance.getId());
return key;
}
@ -436,10 +436,10 @@ public class WorkflowExecuteThread {
private void taskFinished(TaskInstance taskInstance) {
logger.info("work flow {} task id:{} code:{} state:{} ",
processInstance.getId(),
taskInstance.getId(),
taskInstance.getTaskCode(),
taskInstance.getState());
processInstance.getId(),
taskInstance.getId(),
taskInstance.getTaskCode(),
taskInstance.getState());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
@ -460,7 +460,7 @@ public class WorkflowExecuteThread {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE
if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
&& processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
&& processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
@ -492,8 +492,9 @@ public class WorkflowExecuteThread {
this.stateEvents.add(nextEvent);
} else {
ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
this.processService.sendStartTask2Master(processInstance,
nextTaskInstance.getId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
}
@ -515,7 +516,8 @@ public class WorkflowExecuteThread {
}
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) {
logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
logger.info(
"failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
processInstance.getId(),
newTaskInstance.getTaskCode(),
newTaskInstance.getState(),
@ -552,7 +554,7 @@ public class WorkflowExecuteThread {
logger.info("process instance update: {}", processInstanceId);
processInstance = processService.findProcessInstanceById(processInstanceId);
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
}
@ -580,9 +582,7 @@ public class WorkflowExecuteThread {
*/
public boolean checkProcessInstance(StateEvent stateEvent) {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.error("mismatch process instance id: {}, state event:{}",
this.processInstance.getId(),
stateEvent);
logger.error("mismatch process instance id: {}, state event:{}", this.processInstance.getId(), stateEvent);
return false;
}
return true;
@ -742,9 +742,9 @@ public class WorkflowExecuteThread {
return true;
}
logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}",
processInstance.getId(),
processInstance.getScheduleTime(),
complementListDate.toString());
processInstance.getId(),
processInstance.getScheduleTime(),
complementListDate.toString());
scheduleDate = complementListDate.get(index + 1);
}
//the next process complement
@ -783,8 +783,7 @@ public class WorkflowExecuteThread {
}
private boolean needComplementProcess() {
if (processInstance.isComplementData()
&& Flag.NO == processInstance.getIsSubProcess()) {
if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
return true;
}
return false;
@ -863,7 +862,7 @@ public class WorkflowExecuteThread {
return;
}
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
@ -883,7 +882,9 @@ public class WorkflowExecuteThread {
List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(taskNodeList,
startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
startNodeNameList,
recoveryNodeCodeList,
processInstance.getTaskDependType());
if (processDag == null) {
logger.error("processDag is null");
return;
@ -955,14 +956,16 @@ public class WorkflowExecuteThread {
if (complementListDate.size() == 0 && needComplementProcess()) {
complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
logger.info(" process definition code:{} complement data: {}",
processInstance.getProcessDefinitionCode(), complementListDate.toString());
processInstance.getProcessDefinitionCode(),
complementListDate.toString());
if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0));
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA,
processInstance.getScheduleTime(),
cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
processService.updateProcessInstance(processInstance);
}
}
@ -976,7 +979,7 @@ public class WorkflowExecuteThread {
* @param taskInstance task instance
* @return TaskInstance
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
try {
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
@ -984,17 +987,17 @@ public class WorkflowExecuteThread {
ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
taskProcessor.init(taskInstance, processInstance);
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION && taskProcessor.getType()
.equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
return null;
processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
return Optional.empty();
}
// in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid
@ -1033,10 +1036,10 @@ public class WorkflowExecuteThread {
taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(taskStateChangeEvent);
}
return taskInstance;
return Optional.of(taskInstance);
} catch (Exception e) {
logger.error("submit standby task error", e);
return null;
return Optional.empty();
}
}
@ -1333,6 +1336,7 @@ public class WorkflowExecuteThread {
for (TaskInstance task : taskInstances) {
if (readyToSubmitTaskQueue.contains(task)) {
logger.warn("Task is already at submit queue, taskInstanceId: {}", task.getId());
continue;
}
@ -1659,11 +1663,12 @@ public class WorkflowExecuteThread {
private void updateProcessInstanceState() {
ExecutionStatus state = getProcessInstanceState(processInstance);
if (processInstance.getState() != state) {
logger.info(
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(), processInstance.getName(),
processInstance.getState(), state,
processInstance.getCommandType());
logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(),
processInstance.getName(),
processInstance.getState(),
state,
processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
@ -1687,11 +1692,12 @@ public class WorkflowExecuteThread {
private void updateProcessInstanceState(StateEvent stateEvent) {
ExecutionStatus state = stateEvent.getExecutionStatus();
if (processInstance.getState() != state) {
logger.info(
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(), processInstance.getName(),
processInstance.getState(), state,
processInstance.getCommandType());
logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(),
processInstance.getName(),
processInstance.getState(),
state,
processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
@ -1723,7 +1729,9 @@ public class WorkflowExecuteThread {
return;
}
logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
taskInstance.getName(),
taskInstance.getId(),
taskInstance.getTaskCode());
readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
@ -1736,15 +1744,14 @@ public class WorkflowExecuteThread {
* @param taskInstance task instance
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
logger.info("remove task from stand by list, id: {} name:{}",
taskInstance.getId(),
taskInstance.getName());
logger.info("remove task from stand by list, id: {} name:{}", taskInstance.getId(), taskInstance.getName());
try {
readyToSubmitTaskQueue.remove(taskInstance);
} catch (Exception e) {
logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}",
taskInstance.getId(),
taskInstance.getName(), e);
taskInstance.getId(),
taskInstance.getName(),
e);
}
}
@ -1766,8 +1773,9 @@ public class WorkflowExecuteThread {
* close the on going tasks
*/
private void killAllTasks() {
logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
activeTaskProcessorMaps.size());
logger.info("kill called on process instance id: {}, num: {}",
processInstance.getId(),
activeTaskProcessorMaps.size());
if (readyToSubmitTaskQueue.size() > 0) {
readyToSubmitTaskQueue.clear();
@ -1831,14 +1839,16 @@ public class WorkflowExecuteThread {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
TaskInstance taskInstance = submitTaskExec(task);
if (taskInstance == null) {
Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
if (!taskInstanceOptional.isPresent()) {
this.taskFailedSubmit = true;
// Remove and add to complete map and error map
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
errorTaskMap.put(task.getTaskCode(), task.getId());
logger.error("process {}, task {}, code:{} submit task failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode());
logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}",
task.getProcessInstanceId(),
task.getId());
} else {
removeTaskFromStandbyList(task);
}
@ -1846,11 +1856,15 @@ public class WorkflowExecuteThread {
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTaskMap.put(task.getTaskCode(), task.getId());
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}",
task.getId(),
dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
// for some reasons(depend task pause/stop) this task would not be submit
removeTaskFromStandbyList(task);
logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}",
task.getId(),
dependResult);
}
}
} catch (Exception e) {
@ -2009,4 +2023,30 @@ public class WorkflowExecuteThread {
}
}
}
}
private void measureTaskState(StateEvent taskStateEvent) {
if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
// the event is broken
logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
return;
}
if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
TaskMetrics.incTaskFinish();
}
switch (taskStateEvent.getExecutionStatus()) {
case STOP:
TaskMetrics.incTaskStop();
break;
case SUCCESS:
TaskMetrics.incTaskSuccess();
break;
case FAILURE:
TaskMetrics.incTaskFailure();
break;
default:
break;
}
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -100,7 +100,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* execute workflow
*/
public void executeEvent(WorkflowExecuteThread workflowExecuteThread) {
public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -110,7 +110,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
logger.info("task ready to submit: {}", taskInstance);
logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId());
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
@ -125,7 +125,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId());
return true;
} catch (Exception e) {
logger.error("submit task error", e);

27
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java

@ -21,36 +21,47 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* the factory to create task processor
*/
public class TaskProcessorFactory {
public static final Map<String, ITaskProcessor> PROCESS_MAP = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
static {
for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) {
PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor);
try {
PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("The task processor should has a no args constructor");
}
}
}
public static ITaskProcessor getTaskProcessor(String type) throws InstantiationException, IllegalAccessException {
public static ITaskProcessor getTaskProcessor(String type) throws InvocationTargetException, InstantiationException, IllegalAccessException {
if (StringUtils.isEmpty(type)) {
type = DEFAULT_PROCESSOR;
}
ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type);
if (Objects.isNull(iTaskProcessor)) {
iTaskProcessor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
Constructor<ITaskProcessor> iTaskProcessorConstructor = PROCESS_MAP.get(type);
if (iTaskProcessorConstructor == null) {
logger.warn("ITaskProcessor could not found for taskType: {}", type);
iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
}
return iTaskProcessor.getClass().newInstance();
return iTaskProcessorConstructor.newInstance();
}
/**

4
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.lang.reflect.InvocationTargetException;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@ -27,7 +29,7 @@ import org.junit.Test;
public class TaskProcessorFactoryTest {
@Test
public void testFactory() throws InstantiationException, IllegalAccessException {
public void testFactory() throws InvocationTargetException, InstantiationException, IllegalAccessException {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java

@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/**
@ -28,6 +31,8 @@ import io.netty.channel.Channel;
*/
public class ChannelUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelUtils.class);
private ChannelUtils() {
throw new IllegalStateException(ChannelUtils.class.getName());
}
@ -49,7 +54,7 @@ public class ChannelUtils {
* @return remote address
*/
public static String getRemoteAddress(Channel channel) {
return NetUtils.getHost(((InetSocketAddress) channel.remoteAddress()).getAddress());
return toAddress(channel).getAddress();
}
/**
@ -60,6 +65,11 @@ public class ChannelUtils {
*/
public static Host toAddress(Channel channel) {
InetSocketAddress socketAddress = ((InetSocketAddress) channel.remoteAddress());
if (socketAddress == null) {
// the remote channel already closed
LOGGER.warn("The channel is already closed");
return Host.EMPTY;
}
return new Host(NetUtils.getHost(socketAddress.getAddress()), socketAddress.getPort());
}

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java

@ -27,6 +27,8 @@ import java.util.Objects;
*/
public class Host implements Serializable {
public static final Host EMPTY = new Host();
/**
* address
*/

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -748,7 +748,8 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
// the new process instance restart time is null.
processInstance.setRestartTime(null);
processInstance.setRunTimes(1);
processInstance.setMaxTryTimes(0);
processInstance.setCommandParam(command.getCommandParam());
@ -1266,7 +1267,7 @@ public class ProcessServiceImpl implements ProcessService {
@Override
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("start submit task : {}, instance id:{}, state: {}",
logger.info("start submit task : {}, processInstance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);

35
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* Task instances priority queue implementation
@ -40,12 +42,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
/**
* queue
*/
private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
/**
* Lock used for all public operations
*/
private final ReentrantLock lock = new ReentrantLock(true);
private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
private final Set<Integer> taskInstanceIdSet = Collections.synchronizedSet(new HashSet<>());
/**
* put task instance to priority queue
@ -56,6 +54,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
@Override
public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
queue.add(taskInstance);
taskInstanceIdSet.add(taskInstance.getId());
}
/**
@ -66,7 +65,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/
@Override
public TaskInstance take() throws TaskPriorityQueueException {
return queue.poll();
TaskInstance taskInstance = queue.poll();
if (taskInstance != null) {
taskInstanceIdSet.remove(taskInstance.getId());
}
return taskInstance;
}
/**
@ -111,6 +114,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/
public void clear() {
queue.clear();
taskInstanceIdSet.clear();
}
/**
@ -120,20 +124,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
* @return true is contains
*/
public boolean contains(TaskInstance taskInstance) {
return this.contains(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
return this.contains(taskInstance.getId());
}
public boolean contains(long taskCode, int taskVersion) {
Iterator<TaskInstance> iterator = this.queue.iterator();
while (iterator.hasNext()) {
TaskInstance taskInstance = iterator.next();
if (taskCode == taskInstance.getTaskCode()
&& taskVersion == taskInstance.getTaskDefinitionVersion()) {
return true;
}
}
return false;
public boolean contains(int taskInstanceId) {
return taskInstanceIdSet.contains(taskInstanceId);
}
/**

9
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

@ -46,14 +46,11 @@ public class PeerTaskInstancePriorityQueueTest {
Assert.assertTrue(queue.size() < peekBeforeLength);
}
@Test
@Test(expected = TaskPriorityQueueException.class)
public void poll() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
try {
queue.poll(1000, TimeUnit.MILLISECONDS);
} catch (TaskPriorityQueueException e) {
e.printStackTrace();
}
queue.poll(1000, TimeUnit.MILLISECONDS);
}
@Test

4
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -109,7 +109,7 @@ master:
# master prepare execute thread number to limit handle commands in parallel
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
exec-threads: 100
exec-threads: 10
# master dispatch task number per batch
dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
@ -134,7 +134,7 @@ worker:
# worker listener port
listen-port: 1234
# worker execute thread number to limit task instances in parallel
exec-threads: 100
exec-threads: 10
# worker heartbeat interval, the unit is second
heartbeat-interval: 10
# worker host weight to dispatch tasks, default value 100

7
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import java.io.File;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@ -147,7 +148,11 @@ public class ShellTask extends AbstractTaskExecutor {
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
Files.createFile(path, attr);
try {
Files.createFile(path, attr);
} catch (FileAlreadyExistsException ex) {
// this is expected
}
}
Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -21,9 +21,9 @@ import java.util.Set;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Configuration;
@Component
@Configuration
@EnableConfigurationProperties
@ConfigurationProperties("worker")
public class WorkerConfig {

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -162,6 +162,8 @@ public class TaskCallbackService {
}
}
});
} else {
logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command);
}
}

24
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -40,10 +43,17 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -233,7 +243,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
logger.info("exec local path: {} cleared.", execLocalPath);
} catch (IOException e) {
logger.error("delete exec dir failed : {}", e.getMessage(), e);
if (e instanceof NoSuchFileException) {
// this is expected
} else {
logger.error("Delete exec dir failed.", e);
}
}
}
}
@ -264,7 +278,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
task.cancelApplication(true);
ProcessUtils.killYarnJob(taskExecutionContext);
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("Kill task failed", e);
}
}
}

Loading…
Cancel
Save