diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index f7a8dddd03..76d4ae1525 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -18,6 +18,8 @@ package org.apache.dolphinscheduler.server.master.config; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -27,15 +29,47 @@ import org.springframework.stereotype.Component; @EnableConfigurationProperties @ConfigurationProperties("master") public class MasterConfig { + /** + * The master RPC server listen port. + */ private int listenPort; + /** + * The max batch size used to fetch command from database. + */ private int fetchCommandNum; + /** + * The thread number used to prepare processInstance. This number shouldn't be bigger than fetchCommandNum. + */ private int preExecThreads; + /** + * todo: We may need to split the process/task into different thread size. + * The thread number used to handle processInstance and task event. + * Will create two thread pools to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}. + */ private int execThreads; + /** + * The task dispatch thread pool size. + */ private int dispatchTaskNumber; + /** + * Worker select strategy. + */ private HostSelector hostSelector; + /** + * Master heart beat task execute interval. 
+ */ private int heartbeatInterval; + /** + * task submit max retry times. + */ private int taskCommitRetryTimes; + /** + * task submit retry interval/ms. + */ private int taskCommitInterval; + /** + * state wheel check interval/ms, if this value is bigger, may increase the delay of task/processInstance. + */ private int stateWheelInterval; private double maxCpuLoadAvg; private double reservedMemory; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 0dbfb73a2c..d75595de1a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -39,6 +39,8 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.commons.collections.CollectionUtils; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -54,11 +56,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import io.micrometer.core.annotation.Counted; -import io.micrometer.core.annotation.Timed; - -import org.apache.commons.lang3.time.StopWatch; - /** * TaskUpdateQueue consumer */ @@ -124,7 +121,7 @@ public class TaskPriorityQueueConsumer extends Thread { try { List failedDispatchTasks = this.batchDispatch(fetchTaskNum); - if (!failedDispatchTasks.isEmpty()) { + if (CollectionUtils.isNotEmpty(failedDispatchTasks)) { TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size()); for (TaskPriority dispatchFailedTask : failedDispatchTasks) { 
taskPriorityQueue.put(dispatchFailedTask); @@ -157,11 +154,15 @@ public class TaskPriorityQueueConsumer extends Thread { } consumerThreadPoolExecutor.submit(() -> { - boolean dispatchResult = this.dispatchTask(taskPriority); - if (!dispatchResult) { - failedDispatchTasks.add(taskPriority); + try { + boolean dispatchResult = this.dispatchTask(taskPriority); + if (!dispatchResult) { + failedDispatchTasks.add(taskPriority); + } + } finally { + // make sure the latch countDown + latch.countDown(); } - latch.countDown(); }); } @@ -171,10 +172,10 @@ public class TaskPriorityQueueConsumer extends Thread { } /** - * dispatch task + * Dispatch task to worker. * * @param taskPriority taskPriority - * @return result + * @return dispatch result, return true if dispatch success, return false if dispatch failed. */ protected boolean dispatchTask(TaskPriority taskPriority) { TaskMetrics.incTaskDispatch(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index ae9a461d10..23577a90db 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -49,7 +49,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * master scheduler thread + * Master scheduler thread, this thread will consume the commands from database and trigger processInstance 
executed. */ @Service public class MasterSchedulerService extends Thread { @@ -163,11 +163,8 @@ public class MasterSchedulerService extends Thread { MasterServerMetrics.incMasterConsumeCommand(commands.size()); for (ProcessInstance processInstance : processInstances) { - if (processInstance == null) { - continue; - } - WorkflowExecuteRunnable workflowExecuteThread = new WorkflowExecuteRunnable( + WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( processInstance , processService , nettyExecutorManager @@ -175,11 +172,11 @@ public class MasterSchedulerService extends Thread { , masterConfig , stateWheelExecuteThread); - this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread); + this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable); if (processInstance.getTimeout() > 0) { stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); } - workflowExecuteThreadPool.startWorkflow(workflowExecuteThread); + workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable); } } @@ -203,7 +200,7 @@ public class MasterSchedulerService extends Thread { logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId()); } } catch (Exception e) { - logger.error("handle command error ", e); + logger.error("handle command {} error ", command.getId(), e); processService.moveToErrorCommand(command, e.toString()); } finally { latch.countDown(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 7635209ebb..e85ddf08bd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -31,8 +31,9 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; -import org.apache.hadoop.util.ThreadUtil; +import org.apache.commons.lang3.ThreadUtils; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; @@ -84,6 +85,7 @@ public class StateWheelExecuteThread extends Thread { @Override public void run() { + Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); while (Stopper.isRunning()) { try { checkTask4Timeout(); @@ -93,7 +95,11 @@ public class StateWheelExecuteThread extends Thread { } catch (Exception e) { logger.error("state wheel thread check error:", e); } - ThreadUtil.sleepAtLeastIgnoreInterrupts((long) masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); + try { + ThreadUtils.sleep(checkInterval); + } catch (InterruptedException e) { + logger.error("state wheel thread sleep error", e); + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 82f942e03b..014abd30bc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -805,11 +805,11 @@ public class WorkflowExecuteRunnable implements Runnable { */ @Override public void run() { - if (this.taskInstanceMap.size() > 0) { + if (this.taskInstanceMap.size() > 0 || isStart) { + logger.warn("The 
workflow has already been started"); return; } try { - isStart = false; buildFlowDag(); initTaskQueue(); submitPostNode(null); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 36edf1371b..d8e2fbbeda 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -48,6 +48,9 @@ import org.springframework.util.concurrent.ListenableFutureCallback; import com.google.common.base.Strings; +/** + * Thread pool used to execute {@link WorkflowExecuteRunnable}. + */ @Component public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @@ -71,7 +74,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { /** * multi-thread filter, avoid handling workflow at the same time */ - private ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap(); + private ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap<>(); @PostConstruct private void init() { @@ -94,7 +97,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { } /** - * start workflow + * Start the given workflow. */ public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) { ProcessInstanceMetrics.incProcessInstanceSubmit(); @@ -102,13 +105,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { } /** - * execute workflow + * Handle the events belonging to the given workflow. 
*/ public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) { if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) { return; } if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) { + logger.warn("The workflow:{} has been executed by another thread", workflowExecuteThread.getKey()); return; } multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); @@ -123,8 +127,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @Override public void onSuccess(Object result) { - // if an exception occurs, first, the error message cannot be printed in the log; - // secondly, the `multiThreadFilterMap` cannot remove the `workflowExecuteThread`, resulting in the state of process instance cannot be changed and memory leak try { if (workflowExecuteThread.workFlowFinish()) { stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance()); @@ -134,8 +136,10 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { } } catch (Exception e) { logger.error("handle events {} success, but notify changed error", processInstanceId, e); + } finally { + // make sure the process has been removed from multiThreadFilterMap + multiThreadFilterMap.remove(workflowExecuteThread.getKey()); } - multiThreadFilterMap.remove(workflowExecuteThread.getKey()); } }); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 56033730bc..842cb86fd9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -303,6 +303,8 @@ public class NettyRemotingClient { logger.error(msg, future.cause()); throw new RemotingException(msg); } + } catch 
(RemotingException remotingException) { + throw remotingException; } catch (Exception e) { logger.error("Send command {} to address {} encounter error.", command, host.getAddress()); throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e); @@ -384,10 +386,10 @@ public class NettyRemotingClient { if (this.responseFutureExecutor != null) { this.responseFutureExecutor.shutdownNow(); } + logger.info("netty client closed"); } catch (Exception ex) { logger.error("netty client close exception", ex); } - logger.info("netty client closed"); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index a976c05c99..4bc9de32f2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.service.process; -import io.micrometer.core.annotation.Counted; import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; @@ -161,6 +160,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; +import io.micrometer.core.annotation.Counted; + /** * process relative dao that some mappers in this. 
*/ @@ -1252,8 +1253,9 @@ public class ProcessServiceImpl implements ProcessService { Thread.sleep(commitInterval); } catch (Exception e) { logger.error("task commit to db failed", e); + } finally { + retryTimes += 1; } - retryTimes += 1; } return task; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index 8d630beeb0..fef5f8ff79 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -29,15 +29,12 @@ import org.springframework.stereotype.Service; */ @Service public class TaskPriorityQueueImpl implements TaskPriorityQueue { - /** - * queue size - */ - private static final Integer QUEUE_MAX_SIZE = 3000; /** - * queue + * Task queue. This queue is unbounded (3000 is only its initial capacity), so it may cause OutOfMemoryError. + * The master will stop generating tasks if memory usage is too high. */ - private PriorityBlockingQueue queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE); + private final PriorityBlockingQueue queue = new PriorityBlockingQueue<>(3000); /** * put task takePriorityInfo