From ad2646ff1f7baa5d76d29023ced2c28a89b52f6b Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 16 Jun 2022 21:46:18 +0800 Subject: [PATCH] Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479) * Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe --- .../common/utils/FileUtils.java | 29 ++++- .../dao/entity/ProcessInstance.java | 12 +- .../consumer/TaskPriorityQueueConsumer.java | 8 +- .../master/dispatch/ExecutorDispatcher.java | 2 - .../master/processor/queue/TaskEvent.java | 106 +----------------- .../queue/TaskExecuteThreadPool.java | 22 ++-- .../runner/WorkflowExecuteRunnable.java | 66 ++++++----- .../runner/WorkflowExecuteThreadPool.java | 2 +- .../runner/task/CommonTaskProcessor.java | 4 +- .../runner/task/TaskProcessorFactory.java | 23 ++-- .../runner/task/TaskProcessorFactoryTest.java | 4 +- .../remote/utils/ChannelUtils.java | 12 +- .../dolphinscheduler/remote/utils/Host.java | 2 + .../service/process/ProcessServiceImpl.java | 5 +- .../queue/PeerTaskInstancePriorityQueue.java | 35 +++--- .../PeerTaskInstancePriorityQueueTest.java | 9 +- .../src/main/resources/application.yaml | 4 +- .../plugin/task/shell/ShellTask.java | 7 +- .../server/worker/config/WorkerConfig.java | 4 +- .../worker/processor/TaskCallbackService.java | 2 + .../worker/runner/TaskExecuteThread.java | 28 ++++- 21 files changed, 173 insertions(+), 213 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java index bdcf62f76a..23e4b74b75 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java @@ -17,14 +17,25 @@ package org.apache.dolphinscheduler.common.utils; +import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH; +import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE; +import static org.apache.dolphinscheduler.common.Constants.UTF_8; +import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS; + import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; -import static org.apache.dolphinscheduler.common.Constants.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * file utils @@ -112,7 +123,15 @@ public class FileUtils { File execLocalPathFile = new File(execLocalPath); if (execLocalPathFile.exists()) { - org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile); + try { + org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile); + } catch (Exception ex) { + if (ex instanceof NoSuchFileException || ex.getCause() instanceof NoSuchFileException) { + // this file is already be deleted. + } else { + throw ex; + } + } } //create work dir diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 29971f94db..054df92e4f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -265,13 +265,11 @@ public class ProcessInstance { */ public ProcessInstance(ProcessDefinition processDefinition) { this.processDefinition = processDefinition; - this.name = processDefinition.getName() - + "-" - + - processDefinition.getVersion() - + "-" - + - DateUtils.getCurrentTimeStamp(); + // todo: the name is not unique + this.name = String.join("-", + processDefinition.getName(), + String.valueOf(processDefinition.getVersion()), + DateUtils.getCurrentTimeStamp()); } public String getVarPool() { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index c1b54df62f..d75595de1a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -190,16 +190,18 @@ public class TaskPriorityQueueConsumer extends Thread { return true; } } - result = dispatcher.dispatch(executionContext); if (result) { + logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId()); addDispatchEvent(context, executionContext); + } else { + logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId()); } } catch (RuntimeException e) { - logger.error("dispatch error: ", e); + logger.error("Master dispatch task to worker error: ", e); } catch (ExecuteException e) { - logger.error("dispatch error: {}", e.getMessage()); + logger.error("Master dispatch task to worker error: {}", e); } return result; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index 25db4eb8a6..d439c28267 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.master.dispatch; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index 3ed41329fe..842bcaf333 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -27,10 +27,12 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import java.util.Date; import io.netty.channel.Channel; +import lombok.Data; /** * task event */ +@Data public class TaskEvent { /** @@ -144,108 +146,4 @@ public class TaskEvent { event.setEvent(Event.WORKER_REJECT); return event; } - - public String getVarPool() { - return varPool; - } - - public void setVarPool(String varPool) { - this.varPool = varPool; - } - - public int getTaskInstanceId() { - return taskInstanceId; - } - - public void setTaskInstanceId(int taskInstanceId) { - this.taskInstanceId = taskInstanceId; - } - - public String getWorkerAddress() { - return workerAddress; - } - - public void setWorkerAddress(String workerAddress) { - this.workerAddress = workerAddress; - } - - public ExecutionStatus getState() { - return state; - } - - public void setState(ExecutionStatus state) { - this.state = state; - } - - public Date getStartTime() { - return startTime; - } - - public void setStartTime(Date startTime) { - this.startTime = startTime; - } - - public Date getEndTime() { - return endTime; - } - - public void setEndTime(Date endTime) { - this.endTime = endTime; - } - - public String getExecutePath() { - return executePath; - } - - public void setExecutePath(String executePath) { - this.executePath = executePath; - } - - public String getLogPath() { - return logPath; - } - - public void setLogPath(String logPath) { - this.logPath = logPath; - } - - public int getProcessId() { - return processId; - } - - public void setProcessId(int processId) { - this.processId = processId; - } - - public String getAppIds() { - return appIds; - } - - public void setAppIds(String appIds) { - this.appIds = appIds; - } - - public Event getEvent() { - return event; - } - - public void setEvent(Event event) { - this.event = event; - } - - public Channel getChannel() { - return channel; - } - - public void setChannel(Channel channel) { - this.channel = channel; - } - - public int getProcessInstanceId() { - return processInstanceId; - } - - public void setProcessInstanceId(int processInstanceId) { - this.processInstanceId = processInstanceId; - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java index 6bf3f862ae..323ea86411 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java @@ -79,7 +79,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { public void submitTaskEvent(TaskEvent taskEvent) { if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) { - logger.warn("workflowExecuteThread is null, event: {}", taskEvent); + logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent); return; } if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) { @@ -114,20 +114,24 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { - logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex); - if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) { - taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId()); - logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId()); + Integer processInstanceId = taskExecuteThread.getProcessInstanceId(); + logger.error("persist event failed processInstanceId: {}", processInstanceId, ex); + if (!processInstanceExecCacheManager.contains(processInstanceId)) { + taskExecuteThreadMap.remove(processInstanceId); + logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}", + processInstanceId); } multiThreadFilterMap.remove(taskExecuteThread.getKey()); } @Override public void onSuccess(Object result) { - logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId()); - if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) { - taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId()); - logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId()); + Integer processInstanceId = taskExecuteThread.getProcessInstanceId(); + logger.info("persist events succeeded, processInstanceId: {}", processInstanceId); + if (!processInstanceExecCacheManager.contains(processInstanceId)) { + taskExecuteThreadMap.remove(processInstanceId); + logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}", + processInstanceId); } multiThreadFilterMap.remove(taskExecuteThread.getKey()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 3365e67c35..ae627c44f5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -17,10 +17,19 @@ package org.apache.dolphinscheduler.server.master.runner; -import com.google.common.collect.Lists; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; +import static org.apache.dolphinscheduler.common.Constants.COMMA; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; +import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; +import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; @@ -69,8 +78,10 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import java.util.ArrayList; import java.util.Arrays; @@ -88,18 +99,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; -import static org.apache.dolphinscheduler.common.Constants.COMMA; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; -import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; -import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; /** * Workflow execute task, used to execute a workflow instance. @@ -1001,7 +1004,7 @@ public class WorkflowExecuteRunnable implements Runnable { * @param taskInstance task instance * @return TaskInstance */ - private TaskInstance submitTaskExec(TaskInstance taskInstance) { + private Optional submitTaskExec(TaskInstance taskInstance) { try { // package task instance before submit processService.packageTaskInstance(taskInstance, processInstance); @@ -1019,7 +1022,7 @@ public class WorkflowExecuteRunnable implements Runnable { logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); - return null; + return Optional.empty(); } // in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid @@ -1058,10 +1061,10 @@ public class WorkflowExecuteRunnable implements Runnable { taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE); this.stateEvents.add(taskStateChangeEvent); } - return taskInstance; + return Optional.of(taskInstance); } catch (Exception e) { logger.error("submit standby task error", e); - return null; + return Optional.empty(); } } @@ -1360,6 +1363,7 @@ public class WorkflowExecuteRunnable implements Runnable { for (TaskInstance task : taskInstances) { if (readyToSubmitTaskQueue.contains(task)) { + logger.warn("Task is already at submit queue, taskInstanceId: {}", task.getId()); continue; } @@ -1764,9 +1768,6 @@ public class WorkflowExecuteRunnable implements Runnable { * @param taskInstance task instance */ private void removeTaskFromStandbyList(TaskInstance taskInstance) { - logger.info("remove task from stand by list, id: {} name:{}", - taskInstance.getId(), - taskInstance.getName()); try { readyToSubmitTaskQueue.remove(taskInstance); } catch (Exception e) { @@ -1859,14 +1860,14 @@ public class WorkflowExecuteRunnable implements Runnable { } DependResult dependResult = getDependResultForTask(task); if (DependResult.SUCCESS == dependResult) { - TaskInstance taskInstance = submitTaskExec(task); - if (taskInstance == null) { + Optional taskInstanceOptional = submitTaskExec(task); + if (!taskInstanceOptional.isPresent()) { this.taskFailedSubmit = true; // Remove and add to complete map and error map removeTaskFromStandbyList(task); completeTaskMap.put(task.getTaskCode(), task.getId()); errorTaskMap.put(task.getTaskCode(), task.getId()); - logger.error("process {}, task {}, code:{} submit task failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode()); + logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId()); } else { removeTaskFromStandbyList(task); } @@ -1874,11 +1875,11 @@ public class WorkflowExecuteRunnable implements Runnable { // if the dependency fails, the current node is not submitted and the state changes to failure. dependFailedTaskMap.put(task.getTaskCode(), task.getId()); removeTaskFromStandbyList(task); - logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult); + logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult); } else if (DependResult.NON_EXEC == dependResult) { // for some reasons(depend task pause/stop) this task would not be submit removeTaskFromStandbyList(task); - logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult); + logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult); } } } catch (Exception e) { @@ -2067,6 +2068,11 @@ public class WorkflowExecuteRunnable implements Runnable { } private void measureTaskState(StateEvent taskStateEvent) { + if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) { + // the event is broken + logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent); + return; + } if (taskStateEvent.getExecutionStatus().typeIsFinished()) { TaskMetrics.incTaskFinish(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 849c3a23b3..b658f669f7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -105,7 +105,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { /** * Handle the events belong to the given workflow. */ - public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) { + public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) { if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) { return; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index fd803ac371..fcfe7c67bf 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -119,7 +119,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName()); return true; } - logger.debug("task ready to submit: {}", taskInstance.getName()); + logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId()); TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(), processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), @@ -134,7 +134,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { taskPriority.setTaskExecutionContext(taskExecutionContext); taskUpdateQueue.put(taskPriority); - logger.info("master submit success, task : {}", taskInstance.getName()); + logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId()); return true; } catch (Exception e) { logger.error("submit task error", e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java index 542697a92f..41c2bd56d3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java @@ -21,8 +21,9 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; import org.apache.commons.lang3.StringUtils; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.Map; -import java.util.Objects; import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; @@ -36,27 +37,31 @@ public final class TaskProcessorFactory { private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class); - public static final Map PROCESS_MAP = new ConcurrentHashMap<>(); + public static final Map> PROCESS_MAP = new ConcurrentHashMap<>(); private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE; static { for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) { - PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor); + try { + PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor) iTaskProcessor.getClass().getConstructor()); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("The task processor should has a no args constructor"); + } } } - public static ITaskProcessor getTaskProcessor(String type) throws InstantiationException, IllegalAccessException { + public static ITaskProcessor getTaskProcessor(String type) throws InvocationTargetException, InstantiationException, IllegalAccessException { if (StringUtils.isEmpty(type)) { type = DEFAULT_PROCESSOR; } - ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type); - if (Objects.isNull(iTaskProcessor)) { - logger.warn("task processor not found for type: {}", type); - return PROCESS_MAP.get(DEFAULT_PROCESSOR); + Constructor iTaskProcessorConstructor = PROCESS_MAP.get(type); + if (iTaskProcessorConstructor == null) { + logger.warn("ITaskProcessor could not found for taskType: {}", type); + iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR); } - return iTaskProcessor.getClass().newInstance(); + return iTaskProcessorConstructor.newInstance(); } /** diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java index d0371809cc..b974a40b31 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner.task; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import java.lang.reflect.InvocationTargetException; + import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -27,7 +29,7 @@ import org.junit.Test; public class TaskProcessorFactoryTest { @Test - public void testFactory() throws InstantiationException, IllegalAccessException { + public void testFactory() throws InvocationTargetException, InstantiationException, IllegalAccessException { TaskInstance taskInstance = new TaskInstance(); taskInstance.setTaskType("shell"); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java index 239a3993c0..b4177ec25d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java @@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import java.net.InetSocketAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.netty.channel.Channel; /** @@ -28,6 +31,8 @@ import io.netty.channel.Channel; */ public class ChannelUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(ChannelUtils.class); + private ChannelUtils() { throw new IllegalStateException(ChannelUtils.class.getName()); } @@ -49,7 +54,7 @@ public class ChannelUtils { * @return remote address */ public static String getRemoteAddress(Channel channel) { - return NetUtils.getHost(((InetSocketAddress) channel.remoteAddress()).getAddress()); + return toAddress(channel).getAddress(); } /** @@ -60,6 +65,11 @@ public class ChannelUtils { */ public static Host toAddress(Channel channel) { InetSocketAddress socketAddress = ((InetSocketAddress) channel.remoteAddress()); + if (socketAddress == null) { + // the remote channel already closed + LOGGER.warn("The channel is already closed"); + return Host.EMPTY; + } return new Host(NetUtils.getHost(socketAddress.getAddress()), socketAddress.getPort()); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java index 2163e9c7d8..dc8e1f0d36 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java @@ -27,6 +27,8 @@ import java.util.Objects; */ public class Host implements Serializable { + public static final Host EMPTY = new Host(); + /** * address */ diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 3760bde2b6..b42e033b20 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -766,7 +766,8 @@ public class ProcessServiceImpl implements ProcessService { processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); processInstance.setRecovery(Flag.NO); processInstance.setStartTime(new Date()); - processInstance.setRestartTime(processInstance.getStartTime()); + // the new process instance restart time is null. + processInstance.setRestartTime(null); processInstance.setRunTimes(1); processInstance.setMaxTryTimes(0); processInstance.setCommandParam(command.getCommandParam()); @@ -1285,7 +1286,7 @@ public class ProcessServiceImpl implements ProcessService { @Override @Transactional(rollbackFor = Exception.class) public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) { - logger.info("start submit task : {}, instance id:{}, state: {}", + logger.info("start submit task : {}, processInstance id:{}, state: {}", taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); //submit to db TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 231fd2a20f..2e939ee332 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; +import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.PriorityQueue; +import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; /** * Task instances priority queue implementation @@ -40,12 +42,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); - - /** - * Lock used for all public operations - */ - private final ReentrantLock lock = new ReentrantLock(true); + private final PriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + private final Set taskInstanceIdSet = Collections.synchronizedSet(new HashSet<>()); /** * put task instance to priority queue @@ -56,6 +54,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue iterator = this.queue.iterator(); - while (iterator.hasNext()) { - TaskInstance taskInstance = iterator.next(); - if (taskCode == taskInstance.getTaskCode() - && taskVersion == taskInstance.getTaskDefinitionVersion()) { - return true; - } - } - return false; - + public boolean contains(int taskInstanceId) { + return taskInstanceIdSet.contains(taskInstanceId); } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 8da3a6c194..67e40d1189 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -46,14 +46,11 @@ public class PeerTaskInstancePriorityQueueTest { Assert.assertTrue(queue.size() < peekBeforeLength); } - @Test + + @Test(expected = TaskPriorityQueueException.class) public void poll() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); - try { - queue.poll(1000, TimeUnit.MILLISECONDS); - } catch (TaskPriorityQueueException e) { - e.printStackTrace(); - } + queue.poll(1000, TimeUnit.MILLISECONDS); } @Test diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index d968090e46..945cae26bb 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -109,7 +109,7 @@ master: # master prepare execute thread number to limit handle commands in parallel pre-exec-threads: 10 # master execute thread number to limit process instances in parallel - exec-threads: 100 + exec-threads: 10 # master dispatch task number per batch dispatch-task-number: 3 # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight @@ -134,7 +134,7 @@ worker: # worker listener port listen-port: 1234 # worker execute thread number to limit task instances in parallel - exec-threads: 100 + exec-threads: 10 # worker heartbeat interval, the unit is second heartbeat-interval: 10 # worker host weight to dispatch tasks, default value 100 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java index 94a3ffe43a..441deb495b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java @@ -34,6 +34,7 @@ import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.SystemUtils; import java.io.File; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -147,7 +148,11 @@ public class ShellTask extends AbstractTaskExecutor { if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } - Files.createFile(path, attr); + try { + Files.createFile(path, attr); + } catch (FileAlreadyExistsException ex) { + // this is expected + } } Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 3eb112bb07..059aa64ce6 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -21,9 +21,9 @@ import java.util.Set; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.stereotype.Component; +import org.springframework.context.annotation.Configuration; -@Component +@Configuration @EnableConfigurationProperties @ConfigurationProperties("worker") public class WorkerConfig { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 448f62c8c2..8264ea52f0 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -163,6 +163,8 @@ public class TaskCallbackService { } } }); + } else { + logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 48c590cbd7..df75caa3bb 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; -import org.apache.commons.lang3.tuple.Pair; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException; import org.apache.dolphinscheduler.common.storage.StorageOperate; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -39,17 +42,26 @@ import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.tuple.Pair; import java.io.File; import java.io.IOException; -import java.util.*; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import com.google.common.base.Strings; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Strings; + /** * task scheduler thread */ @@ -232,7 +244,11 @@ public class TaskExecuteThread implements Runnable, Delayed { org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath)); logger.info("exec local path: {} cleared.", execLocalPath); } catch (IOException e) { - logger.error("delete exec dir failed : {}", e.getMessage(), e); + if (e instanceof NoSuchFileException) { + // this is expected + } else { + logger.error("Delete exec dir failed.", e); + } } } } @@ -263,7 +279,7 @@ public class TaskExecuteThread implements Runnable, Delayed { task.cancelApplication(true); ProcessUtils.killYarnJob(taskExecutionContext); } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.error("Kill task failed", e); } } }