diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java index 405df09d3e..7f4be924dd 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java @@ -20,10 +20,12 @@ package org.apache.dolphinscheduler.common.enums; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import io.netty.channel.Channel; +import lombok.Data; /** * state event */ +@Data public class StateEvent { /** @@ -45,79 +47,4 @@ public class StateEvent { private Channel channel; - public ExecutionStatus getExecutionStatus() { - return executionStatus; - } - - public void setExecutionStatus(ExecutionStatus executionStatus) { - this.executionStatus = executionStatus; - } - - public int getTaskInstanceId() { - return taskInstanceId; - } - - public long getTaskCode() { - return taskCode; - } - - public int getProcessInstanceId() { - return processInstanceId; - } - - public void setProcessInstanceId(int processInstanceId) { - this.processInstanceId = processInstanceId; - } - - public String getContext() { - return context; - } - - public void setContext(String context) { - this.context = context; - } - - public void setTaskInstanceId(int taskInstanceId) { - this.taskInstanceId = taskInstanceId; - } - - public void setTaskCode(long taskCode) { - this.taskCode = taskCode; - } - - public Channel getChannel() { - return channel; - } - - public void setChannel(Channel channel) { - this.channel = channel; - } - - @Override - public String toString() { - return "State Event :" - + "key: " + key - + " type: " + type - + " executeStatus: " + executionStatus - + " task instance id: " + taskInstanceId - + " process instance id: " + processInstanceId - + " context: " + context - ; - } - - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - public void setType(StateEventType type) { - this.type = type; - } - - public StateEventType getType() { - return this.type; - } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java index 723a6639c3..a072d5a232 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java @@ -19,14 +19,14 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.shell.ShellExecutor; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang.SystemUtils; +import org.apache.commons.lang3.StringUtils; import java.io.BufferedReader; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; -import java.io.File; import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; import java.lang.management.RuntimeMXBean; @@ -460,24 +460,23 @@ public class OSUtils { } /** - * check memory and cpu usage + * Check memory and cpu usage is overload the given thredshod. * - * @param maxCpuloadAvg maxCpuloadAvg + * @param maxCpuLoadAvg maxCpuLoadAvg * @param reservedMemory reservedMemory - * @return check memory and cpu usage + * @return True, if the cpu or memory exceed the given thredshod. */ - public static Boolean checkResource(double maxCpuloadAvg, double reservedMemory) { + public static Boolean isOverload(double maxCpuLoadAvg, double reservedMemory) { // system load average double loadAverage = loadAverage(); // system available physical memory double availablePhysicalMemorySize = availablePhysicalMemorySize(); - if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) { - logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", - loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory); - return false; - } else { + if (loadAverage > maxCpuLoadAvg || availablePhysicalMemorySize < reservedMemory) { + logger.warn("Current cpu load average {} is too high or available memory {}G is too low, under max.cpuLoad.avg={} and reserved.memory={}G", + loadAverage, availablePhysicalMemorySize, maxCpuLoadAvg, reservedMemory); return true; } + return false; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java index 2f5f6dc472..59e064105a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.cache; +import lombok.NonNull; + import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import java.util.Collection; @@ -55,7 +57,7 @@ public interface ProcessInstanceExecCacheManager { * @param processInstanceId processInstanceId * @param workflowExecuteThread if it is null, will not be cached */ - void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread); + void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread); /** * get all WorkflowExecuteThread from cache diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java index dc562d37bd..8f00029a3e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java @@ -30,6 +30,8 @@ import org.springframework.stereotype.Component; import com.google.common.collect.ImmutableList; +import lombok.NonNull; + /** * cache of process instance id and WorkflowExecuteThread */ @@ -59,10 +61,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC } @Override - public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) { - if (workflowExecuteThread == null) { - return; - } + public void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread) { processInstanceExecMaps.put(processInstanceId, workflowExecuteThread); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index eab33a07a6..e8cf4b2919 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -131,9 +131,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread { for (TaskPriority dispatchFailedTask : failedDispatchTasks) { taskPriorityQueue.put(dispatchFailedTask); } - // If there are tasks in a cycle that cannot find the worker group, - // sleep for 1 second - if (taskPriorityQueue.size() <= failedDispatchTasks.size()) { + // If the all task dispatch failed, will sleep for 1s to avoid the master cpu higher. + if (fetchTaskNum == failedDispatchTasks.size()) { TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS); } } @@ -218,8 +217,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread { } private Command toCommand(TaskExecutionContext taskExecutionContext) { - TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); - requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext)); + TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(taskExecutionContext); return requestCommand.convert2Command(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java index 1693972dac..ad0e479e5b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java @@ -17,11 +17,13 @@ package org.apache.dolphinscheduler.server.master.metrics; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; public final class ProcessInstanceMetrics { @@ -29,15 +31,25 @@ public final class ProcessInstanceMetrics { throw new UnsupportedOperationException("Utility class"); } + private static final Timer COMMAND_QUERY_TIMETER = + Timer.builder("ds.workflow.command.query.duration") + .description("Command query duration") + .register(Metrics.globalRegistry); + + private static final Timer PROCESS_INSTANCE_GENERATE_TIMER = + Timer.builder("ds.workflow.instance.generate.duration") + .description("Process instance generated duration") + .register(Metrics.globalRegistry); + private static final Counter PROCESS_INSTANCE_SUBMIT_COUNTER = - Counter.builder("ds.workflow.instance.submit.count") - .description("Process instance submit total count") - .register(Metrics.globalRegistry); + Counter.builder("ds.workflow.instance.submit.count") + .description("Process instance submit total count") + .register(Metrics.globalRegistry); private static final Counter PROCESS_INSTANCE_TIMEOUT_COUNTER = - Counter.builder("ds.workflow.instance.timeout.count") - .description("Process instance timeout total count") - .register(Metrics.globalRegistry); + Counter.builder("ds.workflow.instance.timeout.count") + .description("Process instance timeout total count") + .register(Metrics.globalRegistry); private static final Counter PROCESS_INSTANCE_FINISH_COUNTER = Counter.builder("ds.workflow.instance.finish.count") @@ -55,19 +67,27 @@ public final class ProcessInstanceMetrics { .register(Metrics.globalRegistry); private static final Counter PROCESS_INSTANCE_STOP_COUNTER = - Counter.builder("ds.workflow.instance.stop.count") - .description("Process instance stop total count") - .register(Metrics.globalRegistry); + Counter.builder("ds.workflow.instance.stop.count") + .description("Process instance stop total count") + .register(Metrics.globalRegistry); private static final Counter PROCESS_INSTANCE_FAILOVER_COUNTER = - Counter.builder("ds.workflow.instance.failover.count") - .description("Process instance failover total count") - .register(Metrics.globalRegistry); + Counter.builder("ds.workflow.instance.failover.count") + .description("Process instance failover total count") + .register(Metrics.globalRegistry); + + public static void recordCommandQueryTime(long milliseconds) { + COMMAND_QUERY_TIMETER.record(milliseconds, TimeUnit.MILLISECONDS); + } + + public static void recordProcessInstanceGenerateTime(long milliseconds) { + PROCESS_INSTANCE_GENERATE_TIMER.record(milliseconds, TimeUnit.MILLISECONDS); + } public static synchronized void registerProcessInstanceRunningGauge(Supplier function) { Gauge.builder("ds.workflow.instance.running", function) - .description("The current running process instance count") - .register(Metrics.globalRegistry); + .description("The current running process instance count") + .register(Metrics.globalRegistry); } public static void incProcessInstanceSubmit() { @@ -97,5 +117,4 @@ public final class ProcessInstanceMetrics { public static void incProcessInstanceFailover() { PROCESS_INSTANCE_FAILOVER_COUNTER.increment(); } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 3f5a715d7c..bce8753781 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -62,31 +63,19 @@ public class MasterSchedulerService extends BaseDaemonThread { */ private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); - /** - * dolphinscheduler database interface - */ @Autowired private ProcessService processService; - /** - * master config - */ @Autowired private MasterConfig masterConfig; - /** - * alert manager - */ @Autowired private ProcessAlertManager processAlertManager; - /** - * netty remoting client - */ private NettyRemotingClient nettyRemotingClient; @Autowired - NettyExecutorManager nettyExecutorManager; + private NettyExecutorManager nettyExecutorManager; /** * master prepare exec service @@ -108,6 +97,8 @@ public class MasterSchedulerService extends BaseDaemonThread { @Autowired private CuringGlobalParamsService curingGlobalParamsService; + private String masterAddress; + protected MasterSchedulerService() { super("MasterCommandLoopThread"); } @@ -119,6 +110,7 @@ public class MasterSchedulerService extends BaseDaemonThread { this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort()); } @Override @@ -142,13 +134,13 @@ public class MasterSchedulerService extends BaseDaemonThread { public void run() { while (Stopper.isRunning()) { try { - boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()); - if (!runCheckFlag) { + boolean isOverload = OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()); + if (isOverload) { MasterServerMetrics.incMasterOverload(); Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } - scheduleProcess(); + scheduleWorkflow(); } catch (InterruptedException interruptedException) { logger.warn("Master schedule service interrupted, close the loop", interruptedException); Thread.currentThread().interrupt(); @@ -160,13 +152,12 @@ public class MasterSchedulerService extends BaseDaemonThread { } /** - * 1. get command by slot - * 2. donot handle command if slot is empty + * Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool. */ - private void scheduleProcess() throws InterruptedException { + private void scheduleWorkflow() throws InterruptedException { List commands = findCommands(); if (CollectionUtils.isEmpty(commands)) { - //indicate that no command ,sleep for 1s + // indicate that no command ,sleep for 1s Thread.sleep(Constants.SLEEP_TIME_MILLIS); return; } @@ -181,7 +172,7 @@ public class MasterSchedulerService extends BaseDaemonThread { try { LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); logger.info("Master schedule service starting workflow instance"); - WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( + final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( processInstance , processService , nettyExecutorManager @@ -194,9 +185,14 @@ public class MasterSchedulerService extends BaseDaemonThread { if (processInstance.getTimeout() > 0) { stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); } - workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable); + ProcessInstanceMetrics.incProcessInstanceSubmit(); + workflowExecuteThreadPool.submit(workflowExecuteRunnable); logger.info("Master schedule service started workflow instance"); + } catch (Exception ex) { + processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId()); + stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId()); + logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex); } finally { LoggerUtils.removeWorkflowInstanceIdMDC(); } @@ -204,21 +200,21 @@ public class MasterSchedulerService extends BaseDaemonThread { } private List command2ProcessInstance(List commands) throws InterruptedException { + long commandTransformStartTime = System.currentTimeMillis(); logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size()); List processInstances = Collections.synchronizedList(new ArrayList<>(commands.size())); CountDownLatch latch = new CountDownLatch(commands.size()); for (final Command command : commands) { masterPrepareExecService.execute(() -> { try { + // todo: this check is not safe, the slot may change after command transform. // slot check again SlotCheckState slotCheckState = slotCheck(command); if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) { logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState); return; } - ProcessInstance processInstance = processService.handleCommand(logger, - getLocalAddress(), - command); + ProcessInstance processInstance = processService.handleCommand(masterAddress, command); if (processInstance != null) { processInstances.add(processInstance); logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId()); @@ -236,24 +232,26 @@ public class MasterSchedulerService extends BaseDaemonThread { latch.await(); logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}", commands.size(), processInstances.size()); + ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime); return processInstances; } private List findCommands() { + long scheduleStartTime = System.currentTimeMillis(); + int thisMasterSlot = ServerNodeManager.getSlot(); + int masterCount = ServerNodeManager.getMasterSize(); + if (masterCount <= 0) { + logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot); + return Collections.emptyList(); + } int pageNumber = 0; int pageSize = masterConfig.getFetchCommandNum(); - List result = new ArrayList<>(); - if (Stopper.isRunning()) { - int thisMasterSlot = ServerNodeManager.getSlot(); - int masterCount = ServerNodeManager.getMasterSize(); - if (masterCount > 0) { - result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); - if (CollectionUtils.isNotEmpty(result)) { - logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}", - result.size(), thisMasterSlot, masterCount); - } - } + final List result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); + if (CollectionUtils.isNotEmpty(result)) { + logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}", + result.size(), thisMasterSlot, masterCount); } + ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime); return result; } @@ -271,7 +269,4 @@ public class MasterSchedulerService extends BaseDaemonThread { return state; } - private String getLocalAddress() { - return NetUtils.getAddr(masterConfig.getListenPort()); - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 53acb4ce83..65c7db924d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -59,24 +59,24 @@ public class StateWheelExecuteThread extends BaseDaemonThread { private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); /** - * process timeout check list + * ProcessInstance timeout check list, element is the processInstanceId. */ - private ConcurrentLinkedQueue processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>(); /** * task time out check list */ - private ConcurrentLinkedQueue taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>(); /** * task retry check list */ - private ConcurrentLinkedQueue taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>(); /** * task state check list */ - private ConcurrentLinkedQueue taskInstanceStateCheckList = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue taskInstanceStateCheckList = new ConcurrentLinkedQueue<>(); @Autowired private MasterConfig masterConfig; @@ -116,8 +116,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread { logger.info("Success add workflow instance into timeout check list"); } - public void removeProcess4TimeoutCheck(ProcessInstance processInstance) { - boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstance.getId()); + public void removeProcess4TimeoutCheck(int processInstanceId) { + boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId); if (removeFlag) { logger.info("Success remove workflow instance from timeout check list"); } else { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 9fa452c66e..42654665a2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -435,6 +435,7 @@ public class WorkflowExecuteRunnable implements Runnable { if (task.getState().typeIsFinished()) { if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) { + logger.warn("The task instance is already complete, stateEvent: {}", stateEvent); return true; } taskFinished(task); @@ -461,11 +462,9 @@ public class WorkflowExecuteRunnable implements Runnable { } private void taskFinished(TaskInstance taskInstance) { - logger.info("work flow {} task id:{} code:{} state:{} ", - processInstance.getId(), - taskInstance.getId(), - taskInstance.getTaskCode(), - taskInstance.getState()); + logger.info("TaskInstance finished task code:{} state:{} ", + taskInstance.getTaskCode(), + taskInstance.getState()); activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); @@ -481,12 +480,13 @@ public class WorkflowExecuteRunnable implements Runnable { } } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) { // retry task + logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState()); retryTaskInstance(taskInstance); } else if (taskInstance.getState().typeIsFailure()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); // There are child nodes and the failure policy is: CONTINUE - if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag) - && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { + if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE + && DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) { submitPostNode(Long.toString(taskInstance.getTaskCode())); } else { errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); @@ -495,6 +495,7 @@ public class WorkflowExecuteRunnable implements Runnable { } } } else if (taskInstance.getState().typeIsFinished()) { + // todo: when the task instance type is pause, then it should not in completeTaskMap completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); } @@ -536,7 +537,7 @@ public class WorkflowExecuteRunnable implements Runnable { } TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); if (newTaskInstance == null) { - logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId()); + logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId()); return; } waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); @@ -634,7 +635,7 @@ public class WorkflowExecuteRunnable implements Runnable { * check if task instance exist by task code */ public boolean checkTaskInstanceByCode(long taskCode) { - if (taskInstanceMap == null || taskInstanceMap.size() == 0) { + if (taskInstanceMap.isEmpty()) { return false; } for (TaskInstance taskInstance : taskInstanceMap.values()) { @@ -649,7 +650,7 @@ public class WorkflowExecuteRunnable implements Runnable { * check if task instance exist by id */ public boolean checkTaskInstanceById(int taskInstanceId) { - if (taskInstanceMap == null || taskInstanceMap.size() == 0) { + if (taskInstanceMap.isEmpty()) { return false; } return taskInstanceMap.containsKey(taskInstanceId); @@ -697,7 +698,7 @@ public class WorkflowExecuteRunnable implements Runnable { if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) { // serial wait execution type needs to wake up the waiting process - if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()){ + if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) { endProcess(); return true; } @@ -1296,6 +1297,10 @@ public class WorkflowExecuteRunnable implements Runnable { } } + public Collection getAllTaskInstances() { + return taskInstanceMap.values(); + } + private void setVarPoolValue(Map allProperty, Map allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) { //for this taskInstance all the param in this part is IN. thisProperty.setDirect(Direct.IN); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index eb8bcd94bb..4085e99051 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -51,7 +51,7 @@ import com.google.common.base.Strings; import lombok.NonNull; /** - * Used to execute {@link WorkflowExecuteRunnable}, when + * Used to execute {@link WorkflowExecuteRunnable}. */ @Component public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @@ -99,14 +99,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { logger.info("Submit state event success, stateEvent: {}", stateEvent); } - /** - * Start the given workflow. - */ - public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) { - ProcessInstanceMetrics.incProcessInstanceSubmit(); - submit(workflowExecuteThread); - } - /** * Handle the events belong to the given workflow. */ @@ -138,7 +130,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { try { LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); if (workflowExecuteThread.workFlowFinish()) { - stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance()); + stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId()); processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); notifyProcessChanged(workflowExecuteThread.getProcessInstance()); logger.info("Workflow instance is finished."); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 329f72d1b1..16aa9d9957 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -96,6 +96,8 @@ import org.slf4j.LoggerFactory; import com.zaxxer.hikari.HikariDataSource; +import lombok.NonNull; + public abstract class BaseTaskProcessor implements ITaskProcessor { protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); @@ -114,22 +116,19 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected int commitInterval; - protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + protected ProcessService processService; - protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + protected MasterConfig masterConfig; - protected TaskPluginManager taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class); + protected TaskPluginManager taskPluginManager; protected String threadLoggerInfoName; @Override - public void init(TaskInstance taskInstance, ProcessInstance processInstance) { - if (processService == null) { - processService = SpringApplicationContext.getBean(ProcessService.class); - } - if (masterConfig == null) { - masterConfig = SpringApplicationContext.getBean(MasterConfig.class); - } + public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance processInstance) { + processService = SpringApplicationContext.getBean(ProcessService.class); + masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class); this.taskInstance = taskInstance; this.processInstance = processInstance; this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); @@ -245,7 +244,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { @Override public String getType() { - return null; + throw new UnsupportedOperationException("This abstract class doesn's has type"); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java index 5b3d788c8c..d9ca82a45c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java @@ -20,64 +20,22 @@ package org.apache.dolphinscheduler.server.master.runner.task; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import java.util.Objects; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NonNull; /** - * task instance key, processInstanceId + * Used to identify a task instance. */ +@Data +@AllArgsConstructor public class TaskInstanceKey { - private int processInstanceId; - private long taskCode; - private int taskVersion; + private final int processInstanceId; + private final long taskCode; + private final int taskVersion; - public TaskInstanceKey(int processInstanceId, long taskCode, int taskVersion) { - this.processInstanceId = processInstanceId; - this.taskCode = taskCode; - this.taskVersion = taskVersion; - } - - public int getProcessInstanceId() { - return processInstanceId; - } - - public long getTaskCode() { - return taskCode; - } - - public int getTaskVersion() { - return taskVersion; - } - - public static TaskInstanceKey getTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) { - if (processInstance == null || taskInstance == null) { - return null; - } + public static TaskInstanceKey getTaskInstanceKey(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); } - @Override - public String toString() { - return "TaskKey{" - + "processInstanceId=" + processInstanceId - + ", taskCode=" + taskCode - + ", taskVersion=" + taskVersion - + '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TaskInstanceKey taskInstanceKey = (TaskInstanceKey) o; - return processInstanceId == taskInstanceKey.processInstanceId && taskCode == taskInstanceKey.taskCode && taskVersion == taskInstanceKey.taskVersion; - } - - @Override - public int hashCode() { - return Objects.hash(processInstanceId, taskCode, taskVersion); - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java index 32f6562ca0..b377ceedbf 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java @@ -17,19 +17,19 @@ package org.apache.dolphinscheduler.server.master.service; -import static com.google.common.base.Preconditions.checkNotNull; - import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; @@ -49,6 +49,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ import org.springframework.stereotype.Component; import io.micrometer.core.annotation.Counted; import io.micrometer.core.annotation.Timed; +import lombok.NonNull; /** * failover service @@ -67,15 +69,20 @@ public class FailoverService { private final MasterConfig masterConfig; private final ProcessService processService; private final WorkflowExecuteThreadPool workflowExecuteThreadPool; - - public FailoverService(RegistryClient registryClient, - MasterConfig masterConfig, - ProcessService processService, - WorkflowExecuteThreadPool workflowExecuteThreadPool) { - this.registryClient = checkNotNull(registryClient); - this.masterConfig = checkNotNull(masterConfig); - this.processService = checkNotNull(processService); - this.workflowExecuteThreadPool = checkNotNull(workflowExecuteThreadPool); + private final ProcessInstanceExecCacheManager cacheManager; + private final String localAddress; + + public FailoverService(@NonNull RegistryClient registryClient, + @NonNull MasterConfig masterConfig, + @NonNull ProcessService processService, + @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool, + @NonNull ProcessInstanceExecCacheManager cacheManager) { + this.registryClient = registryClient; + this.masterConfig = masterConfig; + this.processService = processService; + this.workflowExecuteThreadPool = workflowExecuteThreadPool; + this.cacheManager = cacheManager; + this.localAddress = NetUtils.getAddr(masterConfig.getListenPort()); } /** @@ -88,7 +95,7 @@ public class FailoverService { if (CollectionUtils.isEmpty(hosts)) { return; } - LOGGER.info("Master failover service {} begin to failover hosts:{}", getLocalAddress(), hosts); + LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, hosts); for (String host : hosts) { failoverMasterWithLock(host); @@ -174,11 +181,10 @@ public class FailoverService { } /** - * failover worker tasks + * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker, + * and failover these tasks. *

- * 1. kill yarn job if there are yarn jobs in tasks. - * 2. change task state from running to need failover. - * 3. failover all tasks when workerHost is null + * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master. * * @param workerHost worker host */ @@ -188,29 +194,40 @@ public class FailoverService { } long startTime = System.currentTimeMillis(); - List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); - Map processInstanceCacheMap = new HashMap<>(); + // we query the task instance from cache, so that we can directly update the cache + final List needFailoverTaskInstanceList = cacheManager.getAll() + .stream() + .flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream()) + .filter(taskInstance -> + workerHost.equals(taskInstance.getHost()) && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState())) + .collect(Collectors.toList()); + final Map processInstanceCacheMap = new HashMap<>(); LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size()); - List workerServers = registryClient.getServerList(NodeType.WORKER); + final List workerServers = registryClient.getServerList(NodeType.WORKER); for (TaskInstance taskInstance : needFailoverTaskInstanceList) { - ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId()); - if (processInstance == null) { - processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId()); + try { + ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId()); if (processInstance == null) { - LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", - taskInstance.getProcessInstanceId(), taskInstance.getId()); + processInstance = cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance(); + if (processInstance == null) { + LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", + taskInstance.getProcessInstanceId(), taskInstance.getId()); + continue; + } + processInstanceCacheMap.put(processInstance.getId(), processInstance); + } + + // only failover the task owned myself if worker down. + if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) { continue; } - processInstanceCacheMap.put(processInstance.getId(), processInstance); - } - // only failover the task owned myself if worker down. - if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { - continue; + LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); + failoverTaskInstance(processInstance, taskInstance, workerServers); + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } - - LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); - failoverTaskInstance(processInstance, taskInstance, workerServers); } LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime); } @@ -221,17 +238,14 @@ public class FailoverService { * 1. kill yarn job if run on worker and there are yarn jobs in tasks. * 2. change task state from running to need failover. * 3. try to notify local master + * * @param processInstance * @param taskInstance - * @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers. + * @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers. */ - private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List servers) { - if (processInstance == null) { - LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", - taskInstance.getProcessInstanceId(), taskInstance.getId()); - return; - } + private void failoverTaskInstance(@NonNull ProcessInstance processInstance, TaskInstance taskInstance, List servers) { if (!checkTaskInstanceNeedFailover(servers, taskInstance)) { + LOGGER.info("The taskInstance doesn't need to failover"); return; } TaskMetrics.incTaskFailover(); @@ -240,6 +254,7 @@ public class FailoverService { taskInstance.setProcessInstance(processInstance); if (!isMasterTask) { + LOGGER.info("The failover taskInstance is not master task"); TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) .buildProcessInstanceRelatedInfo(processInstance) @@ -249,6 +264,8 @@ public class FailoverService { // only kill yarn job if exists , the local thread has exited ProcessUtils.killYarnJob(taskExecutionContext); } + } else { + LOGGER.info("The failover taskInstance is a master task"); } taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); @@ -278,7 +295,7 @@ public class FailoverService { while (iterator.hasNext()) { String host = iterator.next(); if (registryClient.checkNodeExists(host, NodeType.MASTER)) { - if (!getLocalAddress().equals(host)) { + if (!localAddress.equals(host)) { iterator.remove(); } } @@ -390,11 +407,8 @@ public class FailoverService { return serverStartupTime; } - /** - * get local address - */ - String getLocalAddress() { - return NetUtils.getAddr(masterConfig.getListenPort()); + public String getLocalAddress() { + return localAddress; } } diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index 619322bfab..a908ac3072 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -92,7 +92,7 @@ master: pre-exec-threads: 10 # master execute thread number to limit process instances in parallel exec-threads: 100 - # master dispatch task number per batch + # master dispatch task number per batch, if all the tasks dispatch failed in a batch, will sleep 1s. dispatch-task-number: 3 # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight host-selector: lower_weight diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java index d7b53f3c34..96bdfe4914 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java @@ -56,7 +56,7 @@ public class ProcessInstanceExecCacheManagerImplTest { Assert.assertTrue(processInstanceExecCacheManager.contains(1)); } - @Test + @Test(expected = NullPointerException.class) public void testCacheNull() { processInstanceExecCacheManager.cache(2, null); WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java index 06265d2e35..b2c7d63cd7 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java @@ -49,8 +49,7 @@ public class ExecutionContextTestUtils { .buildProcessDefinitionRelatedInfo(processDefinition) .create(); - TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); - requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(context)); + TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context); Command command = requestCommand.convert2Command(); ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java index 88c6655d2a..fdd79552b1 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java @@ -97,7 +97,7 @@ public class NettyExecutorManagerTest { private Command toCommand(TaskExecutionContext taskExecutionContext) { TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); - requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext)); + requestCommand.setTaskExecutionContext(taskExecutionContext); return requestCommand.convert2Command(); } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index 4e3d99347a..fc9bcca3aa 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -18,8 +18,8 @@ package org.apache.dolphinscheduler.server.master.service; import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; + import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; @@ -31,7 +31,9 @@ import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -45,7 +47,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -63,7 +64,6 @@ import com.google.common.collect.Lists; @PrepareForTest({RegistryClient.class}) @PowerMockIgnore({"javax.management.*"}) public class FailoverServiceTest { - @InjectMocks private FailoverService failoverService; @Mock @@ -78,6 +78,9 @@ public class FailoverServiceTest { @Mock private WorkflowExecuteThreadPool workflowExecuteThreadPool; + @Mock + private ProcessInstanceExecCacheManager cacheManager; + private static int masterPort = 5678; private static int workerPort = 1234; @@ -95,6 +98,7 @@ public class FailoverServiceTest { springApplicationContext.setApplicationContext(applicationContext); given(masterConfig.getListenPort()).willReturn(masterPort); + failoverService = new FailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager); testMasterHost = failoverService.getLocalAddress(); String ip = testMasterHost.split(":")[0]; @@ -182,7 +186,16 @@ public class FailoverServiceTest { @Test public void failoverWorkTest() { + workerTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class); + Mockito.when(workflowExecuteRunnable.getAllTaskInstances()).thenReturn(Lists.newArrayList(workerTaskInstance)); + Mockito.when(workflowExecuteRunnable.getProcessInstance()).thenReturn(processInstance); + + Mockito.when(cacheManager.getAll()).thenReturn(Lists.newArrayList(workflowExecuteRunnable)); + Mockito.when(cacheManager.getByProcessInstanceId(Mockito.anyInt())).thenReturn(workflowExecuteRunnable); + + failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER); - Assert.assertEquals(workerTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); + Assert.assertEquals(ExecutionStatus.NEED_FAULT_TOLERANCE, workerTaskInstance.getState()); } } diff --git a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java index b7aecf7ee6..64259f2f4f 100644 --- a/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java +++ b/dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java @@ -20,10 +20,6 @@ package org.apache.dolphinscheduler.meter; -import javax.annotation.PostConstruct; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java index 9baa321a9e..527a269411 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.command; import java.io.Serializable; @@ -25,6 +26,8 @@ import java.util.concurrent.atomic.AtomicLong; */ public class Command implements Serializable { + private static final long serialVersionUID = -1L; + private static final AtomicLong REQUEST_ID = new AtomicLong(1); public static final byte MAGIC = (byte) 0xbabe; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java index 5b2e33922c..000e3f4e02 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java @@ -18,39 +18,23 @@ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import java.io.Serializable; -/** - * execute task request command - */ -public class TaskExecuteRequestCommand implements Serializable { - - /** - * task execution context - */ - private String taskExecutionContext; - - public String getTaskExecutionContext() { - return taskExecutionContext; - } +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; - public void setTaskExecutionContext(String taskExecutionContext) { - this.taskExecutionContext = taskExecutionContext; - } +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TaskExecuteRequestCommand implements Serializable { - public TaskExecuteRequestCommand() { - } + private static final long serialVersionUID = -1L; - public TaskExecuteRequestCommand(String taskExecutionContext) { - this.taskExecutionContext = taskExecutionContext; - } + private TaskExecutionContext taskExecutionContext; - /** - * package request command - * - * @return command - */ public Command convert2Command() { Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_REQUEST); @@ -59,10 +43,4 @@ public class TaskExecuteRequestCommand implements Serializable { return command; } - @Override - public String toString() { - return "TaskExecuteRequestCommand{" - + "taskExecutionContext='" + taskExecutionContext + '\'' - + '}'; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index 2637d6a11a..125b2d82db 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; + import java.io.File; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -43,6 +44,8 @@ import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import lombok.NonNull; + /** * mainly used to get the start command line of a process. */ @@ -181,7 +184,10 @@ public class ProcessUtils { * @param taskExecutionContext taskExecutionContext * @return yarn application ids */ - public static List killYarnJob(TaskExecutionContext taskExecutionContext) { + public static List killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) { + if (taskExecutionContext.getLogPath() == null) { + return Collections.emptyList(); + } try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); String log; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 9ff8689fd3..8d61a9a460 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -52,16 +52,16 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.dolphinscheduler.spi.enums.ResourceType; -import org.slf4j.Logger; -import org.springframework.transaction.annotation.Transactional; import java.util.Date; import java.util.List; import java.util.Map; +import org.springframework.transaction.annotation.Transactional; + public interface ProcessService { @Transactional - ProcessInstance handleCommand(Logger logger, String host, Command command); + ProcessInstance handleCommand(String host, Command command); void moveToErrorCommand(Command command, String message); @@ -161,8 +161,6 @@ public interface ProcessService { void changeOutParam(TaskInstance taskInstance); - List convertIntListToString(List intList); - Schedule querySchedule(int id); List queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index a8ae3f33a9..fc436ee4bd 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.service.process; -import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; @@ -31,6 +30,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; +import static java.util.stream.Collectors.toSet; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -173,13 +174,6 @@ public class ProcessServiceImpl implements ProcessService { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.DISPATCH.ordinal(), - ExecutionStatus.RUNNING_EXECUTION.ordinal(), - ExecutionStatus.DELAY_EXECUTION.ordinal(), - ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.READY_STOP.ordinal()}; - @Autowired private UserMapper userMapper; @@ -290,7 +284,7 @@ public class ProcessServiceImpl implements ProcessService { */ @Override @Transactional - public ProcessInstance handleCommand(Logger logger, String host, Command command) { + public ProcessInstance handleCommand(String host, Command command) { ProcessInstance processInstance = constructProcessInstance(command, host); // cannot construct process instance, return null if (processInstance == null) { @@ -1973,8 +1967,7 @@ public class ProcessServiceImpl implements ProcessService { * @param intList intList * @return string list */ - @Override - public List convertIntListToString(List intList) { + private List convertIntListToString(List intList) { if (intList == null) { return new ArrayList<>(); } @@ -2039,12 +2032,12 @@ public class ProcessServiceImpl implements ProcessService { */ @Override public List queryNeedFailoverProcessInstances(String host) { - return processInstanceMapper.queryByHostAndStatus(host, stateArray); + return processInstanceMapper.queryByHostAndStatus(host, ExecutionStatus.getNeedFailoverWorkflowInstanceState()); } @Override public List queryNeedFailoverProcessInstanceHost() { - return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray); + return processInstanceMapper.queryNeedFailoverProcessInstanceHost(ExecutionStatus.getNeedFailoverWorkflowInstanceState()); } /** @@ -2081,7 +2074,7 @@ public class ProcessServiceImpl implements ProcessService { @Override public List queryNeedFailoverTaskInstances(String host) { return taskInstanceMapper.queryByHostAndStatus(host, - stateArray); + ExecutionStatus.getNeedFailoverWorkflowInstanceState()); } /** @@ -2215,7 +2208,7 @@ public class ProcessServiceImpl implements ProcessService { return processInstanceMapper.queryLastRunningProcess(definitionCode, startTime, endTime, - stateArray); + ExecutionStatus.getNeedFailoverWorkflowInstanceState()); } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 0a859235ac..0dd9734bd1 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -17,7 +17,12 @@ package org.apache.dolphinscheduler.service.process; -import com.fasterxml.jackson.databind.JsonNode; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; + +import static org.mockito.ArgumentMatchers.any; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; @@ -72,9 +77,17 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; -import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.cron.CronUtilsTest; +import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.spi.params.base.FormType; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -88,17 +101,7 @@ import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; -import static org.mockito.ArgumentMatchers.any; +import com.fasterxml.jackson.databind.JsonNode; /** * process service test @@ -289,8 +292,8 @@ public class ProcessServiceTest { command.setProcessDefinitionCode(222); command.setCommandType(CommandType.REPEAT_RUNNING); command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\"" - + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); - Assert.assertNull(processService.handleCommand(logger, host, command)); + + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); + Assert.assertNull(processService.handleCommand(host, command)); int definitionVersion = 1; long definitionCode = 123; @@ -325,7 +328,7 @@ public class ProcessServiceTest { Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); - Assert.assertNotNull(processService.handleCommand(logger, host, command1)); + Assert.assertNotNull(processService.handleCommand(host, command1)); Command command2 = new Command(); command2.setId(2); @@ -335,7 +338,7 @@ public class ProcessServiceTest { command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS); command2.setProcessInstanceId(processInstanceId); Mockito.when(commandMapper.deleteById(2)).thenReturn(1); - Assert.assertNotNull(processService.handleCommand(logger, host, command2)); + Assert.assertNotNull(processService.handleCommand(host, command2)); Command command3 = new Command(); command3.setId(3); @@ -345,7 +348,7 @@ public class ProcessServiceTest { command3.setCommandParam("{\"WaitingThreadInstanceId\":222}"); command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); Mockito.when(commandMapper.deleteById(3)).thenReturn(1); - Assert.assertNotNull(processService.handleCommand(logger, host, command3)); + Assert.assertNotNull(processService.handleCommand(host, command3)); Command command4 = new Command(); command4.setId(4); @@ -355,7 +358,7 @@ public class ProcessServiceTest { command4.setCommandType(CommandType.REPEAT_RUNNING); command4.setProcessInstanceId(processInstanceId); Mockito.when(commandMapper.deleteById(4)).thenReturn(1); - Assert.assertNotNull(processService.handleCommand(logger, host, command4)); + Assert.assertNotNull(processService.handleCommand(host, command4)); Command command5 = new Command(); command5.setId(5); @@ -374,7 +377,7 @@ public class ProcessServiceTest { processDefinition.getGlobalParamList(), CommandType.START_PROCESS, processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam1\""); - ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5); + ProcessInstance processInstance1 = processService.handleCommand(host, command5); Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\"")); ProcessDefinition processDefinition1 = new ProcessDefinition(); @@ -399,7 +402,7 @@ public class ProcessServiceTest { Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2); Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1); Mockito.when(commandMapper.deleteById(1)).thenReturn(1); - Assert.assertNotNull(processService.handleCommand(logger, host, command1)); + Assert.assertNotNull(processService.handleCommand(host, command1)); Command command6 = new Command(); command6.setId(6); @@ -410,7 +413,7 @@ public class ProcessServiceTest { Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists); Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true); Mockito.when(commandMapper.deleteById(6)).thenReturn(1); - ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6); + ProcessInstance processInstance6 = processService.handleCommand(host, command6); Assert.assertTrue(processInstance6 != null); processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD); @@ -429,7 +432,7 @@ public class ProcessServiceTest { command7.setProcessDefinitionVersion(1); Mockito.when(commandMapper.deleteById(7)).thenReturn(1); Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null); - ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7); + ProcessInstance processInstance8 = processService.handleCommand(host, command7); Assert.assertTrue(processInstance8 != null); ProcessDefinition processDefinition2 = new ProcessDefinition(); @@ -453,7 +456,7 @@ public class ProcessServiceTest { Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists); Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1); Mockito.when(commandMapper.deleteById(9)).thenReturn(1); - ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9); + ProcessInstance processInstance10 = processService.handleCommand(host, command9); Assert.assertTrue(processInstance10 == null); } @@ -494,7 +497,7 @@ public class ProcessServiceTest { Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); // will throw exception when command id is 0 and delete fail - processService.handleCommand(logger, host, command1); + processService.handleCommand(host, command1); } @Test diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index 4e1958c671..2ada887868 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -22,13 +22,25 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import java.io.Serializable; import java.util.Date; import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + /** * to master/worker task transport */ -public class TaskExecutionContext { +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TaskExecutionContext implements Serializable { + + private static final long serialVersionUID = -1L; /** * task id @@ -248,399 +260,4 @@ public class TaskExecutionContext { * max memory */ private Integer memoryMax; - - public String getTaskLogName() { - return taskLogName; - } - - public void setTaskLogName(String taskLogName) { - this.taskLogName = taskLogName; - } - - public Map getResources() { - return resources; - } - - public void setResources(Map resources) { - this.resources = resources; - } - - public Map getParamsMap() { - return paramsMap; - } - - public void setParamsMap(Map paramsMap) { - this.paramsMap = paramsMap; - } - - public int getTaskInstanceId() { - return taskInstanceId; - } - - public void setTaskInstanceId(int taskInstanceId) { - this.taskInstanceId = taskInstanceId; - } - - public String getTaskName() { - return taskName; - } - - public void setTaskName(String taskName) { - this.taskName = taskName; - } - - public Date getFirstSubmitTime() { - return firstSubmitTime; - } - - public void setFirstSubmitTime(Date firstSubmitTime) { - this.firstSubmitTime = firstSubmitTime; - } - - public Date getStartTime() { - return startTime; - } - - public void setStartTime(Date startTime) { - this.startTime = startTime; - } - - public String getTaskType() { - return taskType; - } - - public void setTaskType(String taskType) { - this.taskType = taskType; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public String getExecutePath() { - return executePath; - } - - public void setExecutePath(String executePath) { - this.executePath = executePath; - } - - public String getLogPath() { - return logPath; - } - - public void setLogPath(String logPath) { - this.logPath = logPath; - } - - public String getTaskJson() { - return taskJson; - } - - public void setTaskJson(String taskJson) { - this.taskJson = taskJson; - } - - public int getProcessId() { - return processId; - } - - public void setProcessId(int processId) { - this.processId = processId; - } - - public String getAppIds() { - return appIds; - } - - public void setAppIds(String appIds) { - this.appIds = appIds; - } - - public int getProcessInstanceId() { - return processInstanceId; - } - - public void setProcessInstanceId(int processInstanceId) { - this.processInstanceId = processInstanceId; - } - - public Date getScheduleTime() { - return scheduleTime; - } - - public void setScheduleTime(Date scheduleTime) { - this.scheduleTime = scheduleTime; - } - - public String getGlobalParams() { - return globalParams; - } - - public void setGlobalParams(String globalParams) { - this.globalParams = globalParams; - } - - public int getExecutorId() { - return executorId; - } - - public void setExecutorId(int executorId) { - this.executorId = executorId; - } - - public int getCmdTypeIfComplement() { - return cmdTypeIfComplement; - } - - public void setCmdTypeIfComplement(int cmdTypeIfComplement) { - this.cmdTypeIfComplement = cmdTypeIfComplement; - } - - public String getTenantCode() { - return tenantCode; - } - - public void setTenantCode(String tenantCode) { - this.tenantCode = tenantCode; - } - - public String getQueue() { - return queue; - } - - public void setQueue(String queue) { - this.queue = queue; - } - - public int getProcessDefineId() { - return processDefineId; - } - - public void setProcessDefineId(int processDefineId) { - this.processDefineId = processDefineId; - } - - public int getProjectId() { - return projectId; - } - - public void setProjectId(int projectId) { - this.projectId = projectId; - } - - public String getTaskParams() { - return taskParams; - } - - public void setTaskParams(String taskParams) { - this.taskParams = taskParams; - } - - public String getEnvFile() { - return envFile; - } - - public void setEnvFile(String envFile) { - this.envFile = envFile; - } - - public String getEnvironmentConfig() { - return environmentConfig; - } - - public void setEnvironmentConfig(String config) { - this.environmentConfig = config; - } - - public Map getDefinedParams() { - return definedParams; - } - - public void setDefinedParams(Map definedParams) { - this.definedParams = definedParams; - } - - public String getTaskAppId() { - return taskAppId; - } - - public void setTaskAppId(String taskAppId) { - this.taskAppId = taskAppId; - } - - public TaskTimeoutStrategy getTaskTimeoutStrategy() { - return taskTimeoutStrategy; - } - - public void setTaskTimeoutStrategy(TaskTimeoutStrategy taskTimeoutStrategy) { - this.taskTimeoutStrategy = taskTimeoutStrategy; - } - - public int getTaskTimeout() { - return taskTimeout; - } - - public void setTaskTimeout(int taskTimeout) { - this.taskTimeout = taskTimeout; - } - - public String getWorkerGroup() { - return workerGroup; - } - - public void setWorkerGroup(String workerGroup) { - this.workerGroup = workerGroup; - } - - public int getDelayTime() { - return delayTime; - } - - public void setDelayTime(int delayTime) { - this.delayTime = delayTime; - } - - public ResourceParametersHelper getResourceParametersHelper() { - return resourceParametersHelper; - } - - public void setResourceParametersHelper(ResourceParametersHelper resourceParametersHelper) { - this.resourceParametersHelper = resourceParametersHelper; - } - - public String getVarPool() { - return varPool; - } - - public void setVarPool(String varPool) { - this.varPool = varPool; - } - - public int getDryRun() { - return dryRun; - } - - public void setDryRun(int dryRun) { - this.dryRun = dryRun; - } - - public Long getProcessDefineCode() { - return processDefineCode; - } - - public void setProcessDefineCode(Long processDefineCode) { - this.processDefineCode = processDefineCode; - } - - public int getProcessDefineVersion() { - return processDefineVersion; - } - - public void setProcessDefineVersion(int processDefineVersion) { - this.processDefineVersion = processDefineVersion; - } - - public long getProjectCode() { - return projectCode; - } - - public void setProjectCode(long projectCode) { - this.projectCode = projectCode; - } - - public DataQualityTaskExecutionContext getDataQualityTaskExecutionContext() { - return dataQualityTaskExecutionContext; - } - - public void setDataQualityTaskExecutionContext(DataQualityTaskExecutionContext dataQualityTaskExecutionContext) { - this.dataQualityTaskExecutionContext = dataQualityTaskExecutionContext; - } - - public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) { - this.currentExecutionStatus = currentExecutionStatus; - } - - public ExecutionStatus getCurrentExecutionStatus() { - return currentExecutionStatus; - } - - public Date getEndTime() { - return endTime; - } - - public void setEndTime(Date endTime) { - this.endTime = endTime; - } - - public Integer getCpuQuota() { - return cpuQuota; - } - - public void setCpuQuota(Integer cpuQuota) { - this.cpuQuota = cpuQuota; - } - - public Integer getMemoryMax() { - return memoryMax; - } - - public void setMemoryMax(Integer memoryMax) { - this.memoryMax = memoryMax; - } - - public K8sTaskExecutionContext getK8sTaskExecutionContext() { - return k8sTaskExecutionContext; - } - - public void setK8sTaskExecutionContext(K8sTaskExecutionContext k8sTaskExecutionContext) { - this.k8sTaskExecutionContext = k8sTaskExecutionContext; - } - - @Override - public String toString() { - return "TaskExecutionContext{" - + "taskInstanceId=" + taskInstanceId - + ", taskName='" + taskName + '\'' - + ", currentExecutionStatus=" + currentExecutionStatus - + ", firstSubmitTime=" + firstSubmitTime - + ", startTime=" + startTime - + ", taskType='" + taskType + '\'' - + ", host='" + host + '\'' - + ", executePath='" + executePath + '\'' - + ", logPath='" + logPath + '\'' - + ", taskJson='" + taskJson + '\'' - + ", processId=" + processId - + ", processDefineCode=" + processDefineCode - + ", processDefineVersion=" + processDefineVersion - + ", appIds='" + appIds + '\'' - + ", processInstanceId=" + processInstanceId - + ", scheduleTime=" + scheduleTime - + ", globalParams='" + globalParams + '\'' - + ", executorId=" + executorId - + ", cmdTypeIfComplement=" + cmdTypeIfComplement - + ", tenantCode='" + tenantCode + '\'' - + ", queue='" + queue + '\'' - + ", projectCode=" + projectCode - + ", taskParams='" + taskParams + '\'' - + ", envFile='" + envFile + '\'' - + ", dryRun='" + dryRun + '\'' - + ", definedParams=" + definedParams - + ", taskAppId='" + taskAppId + '\'' - + ", taskTimeoutStrategy=" + taskTimeoutStrategy - + ", taskTimeout=" + taskTimeout - + ", workerGroup='" + workerGroup + '\'' - + ", environmentConfig='" + environmentConfig + '\'' - + ", delayTime=" + delayTime - + ", resources=" + resources - + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext - + ", k8sTaskExecutionContext=" + k8sTaskExecutionContext - + ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext - + '}'; - } - } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java index 82c8db12d4..411de30c34 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java @@ -78,6 +78,15 @@ public enum ExecutionStatus { private static HashMap EXECUTION_STATUS_MAP = new HashMap<>(); + private static final int[] NEED_FAILOVER_STATES = new int[] { + ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.DISPATCH.ordinal(), + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), + ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.READY_STOP.ordinal() + }; + static { for (ExecutionStatus executionStatus : ExecutionStatus.values()) { EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus); @@ -180,4 +189,18 @@ public enum ExecutionStatus { } throw new IllegalArgumentException("invalid status : " + status); } + + public static boolean isNeedFailoverWorkflowInstanceState(ExecutionStatus executionStatus) { + return + ExecutionStatus.SUBMITTED_SUCCESS == executionStatus + || ExecutionStatus.DISPATCH == executionStatus + || ExecutionStatus.RUNNING_EXECUTION == executionStatus + || ExecutionStatus.DELAY_EXECUTION == executionStatus + || ExecutionStatus.READY_PAUSE == executionStatus + || ExecutionStatus.READY_STOP == executionStatus; + } + + public static int[] getNeedFailoverWorkflowInstanceState() { + return NEED_FAILOVER_STATES; + } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java index 1e69d873ab..6c68cc00ec 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java @@ -22,22 +22,20 @@ import java.util.function.Supplier; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; +import lombok.experimental.UtilityClass; -public final class WorkerServerMetrics { - - public WorkerServerMetrics() { - throw new UnsupportedOperationException("Utility class"); - } +@UtilityClass +public class WorkerServerMetrics { private static final Counter WORKER_OVERLOAD_COUNTER = - Counter.builder("ds.worker.overload.count") - .description("overloaded workers count") - .register(Metrics.globalRegistry); + Counter.builder("ds.worker.overload.count") + .description("overloaded workers count") + .register(Metrics.globalRegistry); private static final Counter WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER = - Counter.builder("ds.worker.full.submit.queue.count") - .description("full worker submit queues count") - .register(Metrics.globalRegistry); + Counter.builder("ds.worker.full.submit.queue.count") + .description("full worker submit queues count") + .register(Metrics.globalRegistry); public static void incWorkerOverloadCount() { WORKER_OVERLOAD_COUNTER.increment(); @@ -49,8 +47,9 @@ public final class WorkerServerMetrics { public static void registerWorkerRunningTaskGauge(Supplier supplier) { Gauge.builder("ds.task.running", supplier) - .description("number of running tasks on workers") - .register(Metrics.globalRegistry); + .description("number of running tasks on workers") + .register(Metrics.globalRegistry); } + } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 8264ea52f0..3f70974344 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import java.util.Arrays; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; @@ -92,9 +93,6 @@ public class TaskCallbackService { * change remote channel */ public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) { - if (REMOTE_CHANNELS.containsKey(taskInstanceId)) { - REMOTE_CHANNELS.remove(taskInstanceId); - } REMOTE_CHANNELS.put(taskInstanceId, channel); } @@ -104,19 +102,19 @@ public class TaskCallbackService { * @param taskInstanceId taskInstanceId * @return callback channel */ - private NettyRemoteChannel getRemoteChannel(int taskInstanceId) { + private Optional getRemoteChannel(int taskInstanceId) { Channel newChannel; NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId); if (nettyRemoteChannel != null) { if (nettyRemoteChannel.isActive()) { - return nettyRemoteChannel; + return Optional.of(nettyRemoteChannel); } newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost()); if (newChannel != null) { - return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + return Optional.of(getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId)); } } - return null; + return Optional.empty(); } public long pause(int ntries) { @@ -151,15 +149,13 @@ public class TaskCallbackService { * @param command command */ public void send(int taskInstanceId, Command command) { - NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); - if (nettyRemoteChannel != null) { - nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() { - + Optional nettyRemoteChannel = getRemoteChannel(taskInstanceId); + if (nettyRemoteChannel.isPresent()) { + nettyRemoteChannel.get().writeAndFlush(command).addListener(new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // remove(taskInstanceId); - return; + public void operationComplete(ChannelFuture future) { + if (!future.isSuccess()) { + logger.error("Send callback command error, taskInstanceId: {}, command: {}", taskInstanceId, command); } } }); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 533f40fa41..c14a2e891e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -111,8 +111,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { } logger.info("task execute request command : {}", taskRequestCommand); - String contextJson = taskRequestCommand.getTaskExecutionContext(); - TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); + TaskExecutionContext taskExecutionContext = taskRequestCommand.getTaskExecutionContext(); if (taskExecutionContext == null) { logger.error("task execution context is null"); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java index 03df8de451..f691f7c8de 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.runner; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.remote.command.Command; @@ -34,7 +35,7 @@ import org.springframework.stereotype.Component; * Retry Report Task Status Thread */ @Component -public class RetryReportTaskStatusThread implements Runnable { +public class RetryReportTaskStatusThread extends BaseDaemonThread { private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class); @@ -46,11 +47,14 @@ public class RetryReportTaskStatusThread implements Runnable { @Autowired private TaskCallbackService taskCallbackService; - public void start() { + protected RetryReportTaskStatusThread() { + super("RetryReportTaskStatusThread"); + } + + @Override + public synchronized void start() { logger.info("Retry report task status thread starting"); - Thread thread = new Thread(this, "RetryReportTaskStatusThread"); - thread.setDaemon(true); - thread.start(); + super.start(); logger.info("Retry report task status thread started"); } @@ -59,7 +63,7 @@ public class RetryReportTaskStatusThread implements Runnable { */ @Override public void run() { - ResponseCache instance = ResponseCache.get(); + final ResponseCache instance = ResponseCache.get(); while (Stopper.isRunning()) { @@ -67,33 +71,57 @@ public class RetryReportTaskStatusThread implements Runnable { ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL); try { - if (!instance.getRunningCache().isEmpty()) { - Map runningCache = instance.getRunningCache(); - for (Map.Entry entry : runningCache.entrySet()) { - Integer taskInstanceId = entry.getKey(); - Command runningCommand = entry.getValue(); - taskCallbackService.send(taskInstanceId, runningCommand); - } + // todo: Only retry the send failed command + retryRunningCommand(instance); + retryResponseCommand(instance); + retryRecallCommand(instance); + } catch (Exception e) { + logger.warn("Retry report task status error", e); + } + } + } + + private void retryRunningCommand(ResponseCache instance) { + if (!instance.getRunningCache().isEmpty()) { + Map runningCache = instance.getRunningCache(); + for (Map.Entry entry : runningCache.entrySet()) { + Integer taskInstanceId = entry.getKey(); + Command runningCommand = entry.getValue(); + try { + taskCallbackService.send(taskInstanceId, runningCommand); + } catch (Exception ex) { + logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}", taskInstanceId, runningCommand); } + } + } + } - if (!instance.getResponseCache().isEmpty()) { - Map responseCache = instance.getResponseCache(); - for (Map.Entry entry : responseCache.entrySet()) { - Integer taskInstanceId = entry.getKey(); - Command responseCommand = entry.getValue(); - taskCallbackService.send(taskInstanceId, responseCommand); - } + private void retryResponseCommand(ResponseCache instance) { + if (!instance.getResponseCache().isEmpty()) { + Map responseCache = instance.getResponseCache(); + for (Map.Entry entry : responseCache.entrySet()) { + Integer taskInstanceId = entry.getKey(); + Command responseCommand = entry.getValue(); + try { + taskCallbackService.send(taskInstanceId, responseCommand); + } catch (Exception ex) { + logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand); } - if (!instance.getRecallCache().isEmpty()) { - Map recallCache = instance.getRecallCache(); - for (Map.Entry entry : recallCache.entrySet()) { - Integer taskInstanceId = entry.getKey(); - Command responseCommand = entry.getValue(); - taskCallbackService.send(taskInstanceId, responseCommand); - } + } + } + } + + private void retryRecallCommand(ResponseCache instance) { + if (!instance.getRecallCache().isEmpty()) { + Map recallCache = instance.getRecallCache(); + for (Map.Entry entry : recallCache.entrySet()) { + Integer taskInstanceId = entry.getKey(); + Command responseCommand = entry.getValue(); + try { + taskCallbackService.send(taskInstanceId, responseCommand); + } catch (Exception ex) { + logger.error("Retry send recall command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand); } - } catch (Exception e) { - logger.warn("Retry report task status error", e); } } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 9349056507..7d49005395 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -132,9 +132,10 @@ public class TaskExecuteThread implements Runnable, Delayed { taskExecutionContext.setEndTime(new Date()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success", + taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); return; } - try { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); logger.info("script path : {}", taskExecutionContext.getExecutePath()); diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java index 13c533f1cf..2187a19281 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java @@ -88,7 +88,7 @@ public class TaskExecuteProcessorTest { command = new Command(); command.setType(CommandType.TASK_EXECUTE_REQUEST); ackCommand = new TaskExecuteRunningCommand().convert2Command(); - taskRequestCommand = new TaskExecuteRequestCommand(); + taskRequestCommand = new TaskExecuteRequestCommand(taskExecutionContext); alertClientService = PowerMockito.mock(AlertClientService.class); workerExecService = PowerMockito.mock(ExecutorService.class); PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))) @@ -127,8 +127,6 @@ public class TaskExecuteProcessorTest { PowerMockito.mockStatic(JSONUtils.class); PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class)) .thenReturn(taskRequestCommand); - PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class)) - .thenReturn(taskExecutionContext); PowerMockito.mockStatic(FileUtils.class); PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),