Browse Source

[Bug] [Master] Worker failover will cause task cannot be failover (#10631)

* fix worker failover may lose event
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
66624c5c86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 77
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
  2. 21
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  3. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
  4. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
  5. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  6. 49
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java
  7. 73
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  8. 14
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  9. 27
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  10. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  11. 21
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  12. 62
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java
  13. 104
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
  14. 2
      dolphinscheduler-master/src/main/resources/application.yaml
  15. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
  16. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
  17. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  18. 21
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  19. 4
      dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java
  20. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
  21. 42
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
  22. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  23. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  24. 23
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  25. 53
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  26. 409
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
  27. 23
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
  28. 25
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
  29. 26
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  30. 3
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  31. 86
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
  32. 3
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  33. 4
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

77
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java

@ -20,10 +20,12 @@ package org.apache.dolphinscheduler.common.enums;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import lombok.Data;
/** /**
* state event * state event
*/ */
@Data
public class StateEvent { public class StateEvent {
/** /**
@ -45,79 +47,4 @@ public class StateEvent {
private Channel channel; private Channel channel;
public ExecutionStatus getExecutionStatus() {
return executionStatus;
}
public void setExecutionStatus(ExecutionStatus executionStatus) {
this.executionStatus = executionStatus;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public long getTaskCode() {
return taskCode;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public String getContext() {
return context;
}
public void setContext(String context) {
this.context = context;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public void setTaskCode(long taskCode) {
this.taskCode = taskCode;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
@Override
public String toString() {
return "State Event :"
+ "key: " + key
+ " type: " + type
+ " executeStatus: " + executionStatus
+ " task instance id: " + taskInstanceId
+ " process instance id: " + processInstanceId
+ " context: " + context
;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public void setType(StateEventType type) {
this.type = type;
}
public StateEventType getType() {
return this.type;
}
} }

21
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -19,14 +19,14 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.shell.ShellExecutor; import org.apache.dolphinscheduler.common.shell.ShellExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang.SystemUtils; import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.File;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean; import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean; import java.lang.management.RuntimeMXBean;
@ -460,24 +460,23 @@ public class OSUtils {
} }
/** /**
* check memory and cpu usage * Check memory and cpu usage is overload the given thredshod.
* *
* @param maxCpuloadAvg maxCpuloadAvg * @param maxCpuLoadAvg maxCpuLoadAvg
* @param reservedMemory reservedMemory * @param reservedMemory reservedMemory
* @return check memory and cpu usage * @return True, if the cpu or memory exceed the given thredshod.
*/ */
public static Boolean checkResource(double maxCpuloadAvg, double reservedMemory) { public static Boolean isOverload(double maxCpuLoadAvg, double reservedMemory) {
// system load average // system load average
double loadAverage = loadAverage(); double loadAverage = loadAverage();
// system available physical memory // system available physical memory
double availablePhysicalMemorySize = availablePhysicalMemorySize(); double availablePhysicalMemorySize = availablePhysicalMemorySize();
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) { if (loadAverage > maxCpuLoadAvg || availablePhysicalMemorySize < reservedMemory) {
logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", logger.warn("Current cpu load average {} is too high or available memory {}G is too low, under max.cpuLoad.avg={} and reserved.memory={}G",
loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory); loadAverage, availablePhysicalMemorySize, maxCpuLoadAvg, reservedMemory);
return false;
} else {
return true; return true;
} }
return false;
} }
} }

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java vendored

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.cache; package org.apache.dolphinscheduler.server.master.cache;
import lombok.NonNull;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection; import java.util.Collection;
@ -55,7 +57,7 @@ public interface ProcessInstanceExecCacheManager {
* @param processInstanceId processInstanceId * @param processInstanceId processInstanceId
* @param workflowExecuteThread if it is null, will not be cached * @param workflowExecuteThread if it is null, will not be cached
*/ */
void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread); void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);
/** /**
* get all WorkflowExecuteThread from cache * get all WorkflowExecuteThread from cache

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java vendored

@ -30,6 +30,8 @@ import org.springframework.stereotype.Component;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import lombok.NonNull;
/** /**
* cache of process instance id and WorkflowExecuteThread * cache of process instance id and WorkflowExecuteThread
*/ */
@ -59,10 +61,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
} }
@Override @Override
public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) { public void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread) {
if (workflowExecuteThread == null) {
return;
}
processInstanceExecMaps.put(processInstanceId, workflowExecuteThread); processInstanceExecMaps.put(processInstanceId, workflowExecuteThread);
} }

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -131,9 +131,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
for (TaskPriority dispatchFailedTask : failedDispatchTasks) { for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask); taskPriorityQueue.put(dispatchFailedTask);
} }
// If there are tasks in a cycle that cannot find the worker group, // If the all task dispatch failed, will sleep for 1s to avoid the master cpu higher.
// sleep for 1 second if (fetchTaskNum == failedDispatchTasks.size()) {
if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS); TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
} }
} }
@ -218,8 +217,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
} }
private Command toCommand(TaskExecutionContext taskExecutionContext) { private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
return requestCommand.convert2Command(); return requestCommand.convert2Command();
} }

49
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java

@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server.master.metrics; package org.apache.dolphinscheduler.server.master.metrics;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
public final class ProcessInstanceMetrics { public final class ProcessInstanceMetrics {
@ -29,15 +31,25 @@ public final class ProcessInstanceMetrics {
throw new UnsupportedOperationException("Utility class"); throw new UnsupportedOperationException("Utility class");
} }
private static final Timer COMMAND_QUERY_TIMETER =
Timer.builder("ds.workflow.command.query.duration")
.description("Command query duration")
.register(Metrics.globalRegistry);
private static final Timer PROCESS_INSTANCE_GENERATE_TIMER =
Timer.builder("ds.workflow.instance.generate.duration")
.description("Process instance generated duration")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_SUBMIT_COUNTER = private static final Counter PROCESS_INSTANCE_SUBMIT_COUNTER =
Counter.builder("ds.workflow.instance.submit.count") Counter.builder("ds.workflow.instance.submit.count")
.description("Process instance submit total count") .description("Process instance submit total count")
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_TIMEOUT_COUNTER = private static final Counter PROCESS_INSTANCE_TIMEOUT_COUNTER =
Counter.builder("ds.workflow.instance.timeout.count") Counter.builder("ds.workflow.instance.timeout.count")
.description("Process instance timeout total count") .description("Process instance timeout total count")
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FINISH_COUNTER = private static final Counter PROCESS_INSTANCE_FINISH_COUNTER =
Counter.builder("ds.workflow.instance.finish.count") Counter.builder("ds.workflow.instance.finish.count")
@ -55,19 +67,27 @@ public final class ProcessInstanceMetrics {
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_STOP_COUNTER = private static final Counter PROCESS_INSTANCE_STOP_COUNTER =
Counter.builder("ds.workflow.instance.stop.count") Counter.builder("ds.workflow.instance.stop.count")
.description("Process instance stop total count") .description("Process instance stop total count")
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FAILOVER_COUNTER = private static final Counter PROCESS_INSTANCE_FAILOVER_COUNTER =
Counter.builder("ds.workflow.instance.failover.count") Counter.builder("ds.workflow.instance.failover.count")
.description("Process instance failover total count") .description("Process instance failover total count")
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
public static void recordCommandQueryTime(long milliseconds) {
COMMAND_QUERY_TIMETER.record(milliseconds, TimeUnit.MILLISECONDS);
}
public static void recordProcessInstanceGenerateTime(long milliseconds) {
PROCESS_INSTANCE_GENERATE_TIMER.record(milliseconds, TimeUnit.MILLISECONDS);
}
public static synchronized void registerProcessInstanceRunningGauge(Supplier<Number> function) { public static synchronized void registerProcessInstanceRunningGauge(Supplier<Number> function) {
Gauge.builder("ds.workflow.instance.running", function) Gauge.builder("ds.workflow.instance.running", function)
.description("The current running process instance count") .description("The current running process instance count")
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
} }
public static void incProcessInstanceSubmit() { public static void incProcessInstanceSubmit() {
@ -97,5 +117,4 @@ public final class ProcessInstanceMetrics {
public static void incProcessInstanceFailover() { public static void incProcessInstanceFailover() {
PROCESS_INSTANCE_FAILOVER_COUNTER.increment(); PROCESS_INSTANCE_FAILOVER_COUNTER.increment();
} }
} }

73
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -62,31 +63,19 @@ public class MasterSchedulerService extends BaseDaemonThread {
*/ */
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
/**
* dolphinscheduler database interface
*/
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
/**
* master config
*/
@Autowired @Autowired
private MasterConfig masterConfig; private MasterConfig masterConfig;
/**
* alert manager
*/
@Autowired @Autowired
private ProcessAlertManager processAlertManager; private ProcessAlertManager processAlertManager;
/**
* netty remoting client
*/
private NettyRemotingClient nettyRemotingClient; private NettyRemotingClient nettyRemotingClient;
@Autowired @Autowired
NettyExecutorManager nettyExecutorManager; private NettyExecutorManager nettyExecutorManager;
/** /**
* master prepare exec service * master prepare exec service
@ -108,6 +97,8 @@ public class MasterSchedulerService extends BaseDaemonThread {
@Autowired @Autowired
private CuringGlobalParamsService curingGlobalParamsService; private CuringGlobalParamsService curingGlobalParamsService;
private String masterAddress;
protected MasterSchedulerService() { protected MasterSchedulerService() {
super("MasterCommandLoopThread"); super("MasterCommandLoopThread");
} }
@ -119,6 +110,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads()); this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig(); NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
} }
@Override @Override
@ -142,13 +134,13 @@ public class MasterSchedulerService extends BaseDaemonThread {
public void run() { public void run() {
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
try { try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()); boolean isOverload = OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (!runCheckFlag) { if (isOverload) {
MasterServerMetrics.incMasterOverload(); MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue; continue;
} }
scheduleProcess(); scheduleWorkflow();
} catch (InterruptedException interruptedException) { } catch (InterruptedException interruptedException) {
logger.warn("Master schedule service interrupted, close the loop", interruptedException); logger.warn("Master schedule service interrupted, close the loop", interruptedException);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -160,13 +152,12 @@ public class MasterSchedulerService extends BaseDaemonThread {
} }
/** /**
* 1. get command by slot * Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool.
* 2. donot handle command if slot is empty
*/ */
private void scheduleProcess() throws InterruptedException { private void scheduleWorkflow() throws InterruptedException {
List<Command> commands = findCommands(); List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) { if (CollectionUtils.isEmpty(commands)) {
//indicate that no command ,sleep for 1s // indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return; return;
} }
@ -181,7 +172,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
try { try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
logger.info("Master schedule service starting workflow instance"); logger.info("Master schedule service starting workflow instance");
WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
processInstance processInstance
, processService , processService
, nettyExecutorManager , nettyExecutorManager
@ -194,9 +185,14 @@ public class MasterSchedulerService extends BaseDaemonThread {
if (processInstance.getTimeout() > 0) { if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
} }
workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable); ProcessInstanceMetrics.incProcessInstanceSubmit();
workflowExecuteThreadPool.submit(workflowExecuteRunnable);
logger.info("Master schedule service started workflow instance"); logger.info("Master schedule service started workflow instance");
} catch (Exception ex) {
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
} finally { } finally {
LoggerUtils.removeWorkflowInstanceIdMDC(); LoggerUtils.removeWorkflowInstanceIdMDC();
} }
@ -204,21 +200,21 @@ public class MasterSchedulerService extends BaseDaemonThread {
} }
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException { private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
long commandTransformStartTime = System.currentTimeMillis();
logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size()); logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size());
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size())); List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size()); CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) { for (final Command command : commands) {
masterPrepareExecService.execute(() -> { masterPrepareExecService.execute(() -> {
try { try {
// todo: this check is not safe, the slot may change after command transform.
// slot check again // slot check again
SlotCheckState slotCheckState = slotCheck(command); SlotCheckState slotCheckState = slotCheck(command);
if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) { if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState); logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
return; return;
} }
ProcessInstance processInstance = processService.handleCommand(logger, ProcessInstance processInstance = processService.handleCommand(masterAddress, command);
getLocalAddress(),
command);
if (processInstance != null) { if (processInstance != null) {
processInstances.add(processInstance); processInstances.add(processInstance);
logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId()); logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
@ -236,24 +232,26 @@ public class MasterSchedulerService extends BaseDaemonThread {
latch.await(); latch.await();
logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}", logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size()); commands.size(), processInstances.size());
ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
return processInstances; return processInstances;
} }
private List<Command> findCommands() { private List<Command> findCommands() {
long scheduleStartTime = System.currentTimeMillis();
int thisMasterSlot = ServerNodeManager.getSlot();
int masterCount = ServerNodeManager.getMasterSize();
if (masterCount <= 0) {
logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
return Collections.emptyList();
}
int pageNumber = 0; int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum(); int pageSize = masterConfig.getFetchCommandNum();
List<Command> result = new ArrayList<>(); final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (Stopper.isRunning()) { if (CollectionUtils.isNotEmpty(result)) {
int thisMasterSlot = ServerNodeManager.getSlot(); logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
int masterCount = ServerNodeManager.getMasterSize(); result.size(), thisMasterSlot, masterCount);
if (masterCount > 0) {
result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
result.size(), thisMasterSlot, masterCount);
}
}
} }
ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
return result; return result;
} }
@ -271,7 +269,4 @@ public class MasterSchedulerService extends BaseDaemonThread {
return state; return state;
} }
private String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}
} }

14
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -59,24 +59,24 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
/** /**
* process timeout check list * ProcessInstance timeout check list, element is the processInstanceId.
*/ */
private ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/** /**
* task time out check list * task time out check list
*/ */
private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/** /**
* task retry check list * task retry check list
*/ */
private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
/** /**
* task state check list * task state check list
*/ */
private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
@Autowired @Autowired
private MasterConfig masterConfig; private MasterConfig masterConfig;
@ -116,8 +116,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
logger.info("Success add workflow instance into timeout check list"); logger.info("Success add workflow instance into timeout check list");
} }
public void removeProcess4TimeoutCheck(ProcessInstance processInstance) { public void removeProcess4TimeoutCheck(int processInstanceId) {
boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstance.getId()); boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId);
if (removeFlag) { if (removeFlag) {
logger.info("Success remove workflow instance from timeout check list"); logger.info("Success remove workflow instance from timeout check list");
} else { } else {

27
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -435,6 +435,7 @@ public class WorkflowExecuteRunnable implements Runnable {
if (task.getState().typeIsFinished()) { if (task.getState().typeIsFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) { if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
return true; return true;
} }
taskFinished(task); taskFinished(task);
@ -461,11 +462,9 @@ public class WorkflowExecuteRunnable implements Runnable {
} }
private void taskFinished(TaskInstance taskInstance) { private void taskFinished(TaskInstance taskInstance) {
logger.info("work flow {} task id:{} code:{} state:{} ", logger.info("TaskInstance finished task code:{} state:{} ",
processInstance.getId(), taskInstance.getTaskCode(),
taskInstance.getId(), taskInstance.getState());
taskInstance.getTaskCode(),
taskInstance.getState());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
@ -481,12 +480,13 @@ public class WorkflowExecuteRunnable implements Runnable {
} }
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) { } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task // retry task
logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance); retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) { } else if (taskInstance.getState().typeIsFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE // There are child nodes and the failure policy is: CONTINUE
if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag) if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE
&& processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { && DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode())); submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else { } else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
@ -495,6 +495,7 @@ public class WorkflowExecuteRunnable implements Runnable {
} }
} }
} else if (taskInstance.getState().typeIsFinished()) { } else if (taskInstance.getState().typeIsFinished()) {
// todo: when the task instance type is pause, then it should not in completeTaskMap
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
} }
@ -536,7 +537,7 @@ public class WorkflowExecuteRunnable implements Runnable {
} }
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) { if (newTaskInstance == null) {
logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId()); logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
return; return;
} }
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
@ -634,7 +635,7 @@ public class WorkflowExecuteRunnable implements Runnable {
* check if task instance exist by task code * check if task instance exist by task code
*/ */
public boolean checkTaskInstanceByCode(long taskCode) { public boolean checkTaskInstanceByCode(long taskCode) {
if (taskInstanceMap == null || taskInstanceMap.size() == 0) { if (taskInstanceMap.isEmpty()) {
return false; return false;
} }
for (TaskInstance taskInstance : taskInstanceMap.values()) { for (TaskInstance taskInstance : taskInstanceMap.values()) {
@ -649,7 +650,7 @@ public class WorkflowExecuteRunnable implements Runnable {
* check if task instance exist by id * check if task instance exist by id
*/ */
public boolean checkTaskInstanceById(int taskInstanceId) { public boolean checkTaskInstanceById(int taskInstanceId) {
if (taskInstanceMap == null || taskInstanceMap.size() == 0) { if (taskInstanceMap.isEmpty()) {
return false; return false;
} }
return taskInstanceMap.containsKey(taskInstanceId); return taskInstanceMap.containsKey(taskInstanceId);
@ -697,7 +698,7 @@ public class WorkflowExecuteRunnable implements Runnable {
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) { if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
// serial wait execution type needs to wake up the waiting process // serial wait execution type needs to wake up the waiting process
if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()){ if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
endProcess(); endProcess();
return true; return true;
} }
@ -1296,6 +1297,10 @@ public class WorkflowExecuteRunnable implements Runnable {
} }
} }
public Collection<TaskInstance> getAllTaskInstances() {
return taskInstanceMap.values();
}
private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) { private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
//for this taskInstance all the param in this part is IN. //for this taskInstance all the param in this part is IN.
thisProperty.setDirect(Direct.IN); thisProperty.setDirect(Direct.IN);

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -51,7 +51,7 @@ import com.google.common.base.Strings;
import lombok.NonNull; import lombok.NonNull;
/** /**
* Used to execute {@link WorkflowExecuteRunnable}, when * Used to execute {@link WorkflowExecuteRunnable}.
*/ */
@Component @Component
public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@ -99,14 +99,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
logger.info("Submit state event success, stateEvent: {}", stateEvent); logger.info("Submit state event success, stateEvent: {}", stateEvent);
} }
/**
* Start the given workflow.
*/
public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
ProcessInstanceMetrics.incProcessInstanceSubmit();
submit(workflowExecuteThread);
}
/** /**
* Handle the events belong to the given workflow. * Handle the events belong to the given workflow.
*/ */
@ -138,7 +130,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
try { try {
LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
if (workflowExecuteThread.workFlowFinish()) { if (workflowExecuteThread.workFlowFinish()) {
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance()); stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId());
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(workflowExecuteThread.getProcessInstance()); notifyProcessChanged(workflowExecuteThread.getProcessInstance());
logger.info("Workflow instance is finished."); logger.info("Workflow instance is finished.");

21
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -96,6 +96,8 @@ import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.NonNull;
public abstract class BaseTaskProcessor implements ITaskProcessor { public abstract class BaseTaskProcessor implements ITaskProcessor {
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
@ -114,22 +116,19 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected int commitInterval; protected int commitInterval;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); protected ProcessService processService;
protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); protected MasterConfig masterConfig;
protected TaskPluginManager taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class); protected TaskPluginManager taskPluginManager;
protected String threadLoggerInfoName; protected String threadLoggerInfoName;
@Override @Override
public void init(TaskInstance taskInstance, ProcessInstance processInstance) { public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance processInstance) {
if (processService == null) { processService = SpringApplicationContext.getBean(ProcessService.class);
processService = SpringApplicationContext.getBean(ProcessService.class); masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
} taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
if (masterConfig == null) {
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
}
this.taskInstance = taskInstance; this.taskInstance = taskInstance;
this.processInstance = processInstance; this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
@ -245,7 +244,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
@Override @Override
public String getType() { public String getType() {
return null; throw new UnsupportedOperationException("This abstract class doesn's has type");
} }
@Override @Override

62
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskInstanceKey.java

@ -20,64 +20,22 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Objects; import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NonNull;
/** /**
* task instance key, processInstanceId * Used to identify a task instance.
*/ */
@Data
@AllArgsConstructor
public class TaskInstanceKey { public class TaskInstanceKey {
private int processInstanceId; private final int processInstanceId;
private long taskCode; private final long taskCode;
private int taskVersion; private final int taskVersion;
public TaskInstanceKey(int processInstanceId, long taskCode, int taskVersion) { public static TaskInstanceKey getTaskInstanceKey(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
this.processInstanceId = processInstanceId;
this.taskCode = taskCode;
this.taskVersion = taskVersion;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public long getTaskCode() {
return taskCode;
}
public int getTaskVersion() {
return taskVersion;
}
public static TaskInstanceKey getTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) {
if (processInstance == null || taskInstance == null) {
return null;
}
return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
} }
@Override
public String toString() {
return "TaskKey{"
+ "processInstanceId=" + processInstanceId
+ ", taskCode=" + taskCode
+ ", taskVersion=" + taskVersion
+ '}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskInstanceKey taskInstanceKey = (TaskInstanceKey) o;
return processInstanceId == taskInstanceKey.processInstanceId && taskCode == taskInstanceKey.taskCode && taskVersion == taskInstanceKey.taskVersion;
}
@Override
public int hashCode() {
return Objects.hash(processInstanceId, taskCode, taskVersion);
}
} }

104
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java

@ -17,19 +17,19 @@
package org.apache.dolphinscheduler.server.master.service; package org.apache.dolphinscheduler.server.master.service;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
@ -49,6 +49,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -56,6 +57,7 @@ import org.springframework.stereotype.Component;
import io.micrometer.core.annotation.Counted; import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed; import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
/** /**
* failover service * failover service
@ -67,15 +69,20 @@ public class FailoverService {
private final MasterConfig masterConfig; private final MasterConfig masterConfig;
private final ProcessService processService; private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool; private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
private final ProcessInstanceExecCacheManager cacheManager;
public FailoverService(RegistryClient registryClient, private final String localAddress;
MasterConfig masterConfig,
ProcessService processService, public FailoverService(@NonNull RegistryClient registryClient,
WorkflowExecuteThreadPool workflowExecuteThreadPool) { @NonNull MasterConfig masterConfig,
this.registryClient = checkNotNull(registryClient); @NonNull ProcessService processService,
this.masterConfig = checkNotNull(masterConfig); @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
this.processService = checkNotNull(processService); @NonNull ProcessInstanceExecCacheManager cacheManager) {
this.workflowExecuteThreadPool = checkNotNull(workflowExecuteThreadPool); this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
this.cacheManager = cacheManager;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
} }
/** /**
@ -88,7 +95,7 @@ public class FailoverService {
if (CollectionUtils.isEmpty(hosts)) { if (CollectionUtils.isEmpty(hosts)) {
return; return;
} }
LOGGER.info("Master failover service {} begin to failover hosts:{}", getLocalAddress(), hosts); LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, hosts);
for (String host : hosts) { for (String host : hosts) {
failoverMasterWithLock(host); failoverMasterWithLock(host);
@ -174,11 +181,10 @@ public class FailoverService {
} }
/** /**
* failover worker tasks * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
* and failover these tasks.
* <p> * <p>
* 1. kill yarn job if there are yarn jobs in tasks. * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
* 2. change task state from running to need failover.
* 3. failover all tasks when workerHost is null
* *
* @param workerHost worker host * @param workerHost worker host
*/ */
@ -188,29 +194,40 @@ public class FailoverService {
} }
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); // we query the task instance from cache, so that we can directly update the cache
Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>(); final List<TaskInstance> needFailoverTaskInstanceList = cacheManager.getAll()
.stream()
.flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
.filter(taskInstance ->
workerHost.equals(taskInstance.getHost()) && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
.collect(Collectors.toList());
final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size()); LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER); final List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) { for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId()); LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
if (processInstance == null) { try {
processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
if (processInstance == null) { if (processInstance == null) {
LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null", processInstance = cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance();
taskInstance.getProcessInstanceId(), taskInstance.getId()); if (processInstance == null) {
LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId());
continue;
}
processInstanceCacheMap.put(processInstance.getId(), processInstance);
}
// only failover the task owned myself if worker down.
if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
continue; continue;
} }
processInstanceCacheMap.put(processInstance.getId(), processInstance);
}
// only failover the task owned myself if worker down. LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { failoverTaskInstance(processInstance, taskInstance, workerServers);
continue; } finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
} }
LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance, workerServers);
} }
LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime); LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
} }
@ -221,17 +238,14 @@ public class FailoverService {
* 1. kill yarn job if run on worker and there are yarn jobs in tasks. * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
* 2. change task state from running to need failover. * 2. change task state from running to need failover.
* 3. try to notify local master * 3. try to notify local master
*
* @param processInstance * @param processInstance
* @param taskInstance * @param taskInstance
* @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers. * @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers.
*/ */
private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) { private void failoverTaskInstance(@NonNull ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
if (processInstance == null) {
LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId());
return;
}
if (!checkTaskInstanceNeedFailover(servers, taskInstance)) { if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
LOGGER.info("The taskInstance doesn't need to failover");
return; return;
} }
TaskMetrics.incTaskFailover(); TaskMetrics.incTaskFailover();
@ -240,6 +254,7 @@ public class FailoverService {
taskInstance.setProcessInstance(processInstance); taskInstance.setProcessInstance(processInstance);
if (!isMasterTask) { if (!isMasterTask) {
LOGGER.info("The failover taskInstance is not master task");
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance) .buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance) .buildProcessInstanceRelatedInfo(processInstance)
@ -249,6 +264,8 @@ public class FailoverService {
// only kill yarn job if exists , the local thread has exited // only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext); ProcessUtils.killYarnJob(taskExecutionContext);
} }
} else {
LOGGER.info("The failover taskInstance is a master task");
} }
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
@ -278,7 +295,7 @@ public class FailoverService {
while (iterator.hasNext()) { while (iterator.hasNext()) {
String host = iterator.next(); String host = iterator.next();
if (registryClient.checkNodeExists(host, NodeType.MASTER)) { if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
if (!getLocalAddress().equals(host)) { if (!localAddress.equals(host)) {
iterator.remove(); iterator.remove();
} }
} }
@ -390,11 +407,8 @@ public class FailoverService {
return serverStartupTime; return serverStartupTime;
} }
/** public String getLocalAddress() {
* get local address return localAddress;
*/
String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
} }
} }

2
dolphinscheduler-master/src/main/resources/application.yaml

@ -92,7 +92,7 @@ master:
pre-exec-threads: 10 pre-exec-threads: 10
# master execute thread number to limit process instances in parallel # master execute thread number to limit process instances in parallel
exec-threads: 100 exec-threads: 100
# master dispatch task number per batch # master dispatch task number per batch, if all the tasks dispatch failed in a batch, will sleep 1s.
dispatch-task-number: 3 dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight host-selector: lower_weight

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java vendored

@ -56,7 +56,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
Assert.assertTrue(processInstanceExecCacheManager.contains(1)); Assert.assertTrue(processInstanceExecCacheManager.contains(1));
} }
@Test @Test(expected = NullPointerException.class)
public void testCacheNull() { public void testCacheNull() {
processInstanceExecCacheManager.cache(2, null); processInstanceExecCacheManager.cache(2, null);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2); WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java

@ -49,8 +49,7 @@ public class ExecutionContextTestUtils {
.buildProcessDefinitionRelatedInfo(processDefinition) .buildProcessDefinitionRelatedInfo(processDefinition)
.create(); .create();
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(context));
Command command = requestCommand.convert2Command(); Command command = requestCommand.convert2Command();
ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER); ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER);

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -97,7 +97,7 @@ public class NettyExecutorManagerTest {
private Command toCommand(TaskExecutionContext taskExecutionContext) { private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext)); requestCommand.setTaskExecutionContext(taskExecutionContext);
return requestCommand.convert2Command(); return requestCommand.convert2Command();
} }
} }

21
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.service; package org.apache.dolphinscheduler.server.master.service;
import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
@ -31,7 +31,9 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -45,7 +47,6 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PowerMockIgnore;
@ -63,7 +64,6 @@ import com.google.common.collect.Lists;
@PrepareForTest({RegistryClient.class}) @PrepareForTest({RegistryClient.class})
@PowerMockIgnore({"javax.management.*"}) @PowerMockIgnore({"javax.management.*"})
public class FailoverServiceTest { public class FailoverServiceTest {
@InjectMocks
private FailoverService failoverService; private FailoverService failoverService;
@Mock @Mock
@ -78,6 +78,9 @@ public class FailoverServiceTest {
@Mock @Mock
private WorkflowExecuteThreadPool workflowExecuteThreadPool; private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Mock
private ProcessInstanceExecCacheManager cacheManager;
private static int masterPort = 5678; private static int masterPort = 5678;
private static int workerPort = 1234; private static int workerPort = 1234;
@ -95,6 +98,7 @@ public class FailoverServiceTest {
springApplicationContext.setApplicationContext(applicationContext); springApplicationContext.setApplicationContext(applicationContext);
given(masterConfig.getListenPort()).willReturn(masterPort); given(masterConfig.getListenPort()).willReturn(masterPort);
failoverService = new FailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager);
testMasterHost = failoverService.getLocalAddress(); testMasterHost = failoverService.getLocalAddress();
String ip = testMasterHost.split(":")[0]; String ip = testMasterHost.split(":")[0];
@ -182,7 +186,16 @@ public class FailoverServiceTest {
@Test @Test
public void failoverWorkTest() { public void failoverWorkTest() {
workerTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class);
Mockito.when(workflowExecuteRunnable.getAllTaskInstances()).thenReturn(Lists.newArrayList(workerTaskInstance));
Mockito.when(workflowExecuteRunnable.getProcessInstance()).thenReturn(processInstance);
Mockito.when(cacheManager.getAll()).thenReturn(Lists.newArrayList(workflowExecuteRunnable));
Mockito.when(cacheManager.getByProcessInstanceId(Mockito.anyInt())).thenReturn(workflowExecuteRunnable);
failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER); failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER);
Assert.assertEquals(workerTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE); Assert.assertEquals(ExecutionStatus.NEED_FAULT_TOLERANCE, workerTaskInstance.getState());
} }
} }

4
dolphinscheduler-meter/src/main/java/org/apache/dolphinscheduler/meter/MeterConfiguration.java

@ -20,10 +20,6 @@
package org.apache.dolphinscheduler.meter; package org.apache.dolphinscheduler.meter;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.command; package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable; import java.io.Serializable;
@ -25,6 +26,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
public class Command implements Serializable { public class Command implements Serializable {
private static final long serialVersionUID = -1L;
private static final AtomicLong REQUEST_ID = new AtomicLong(1); private static final AtomicLong REQUEST_ID = new AtomicLong(1);
public static final byte MAGIC = (byte) 0xbabe; public static final byte MAGIC = (byte) 0xbabe;

42
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java

@ -18,39 +18,23 @@
package org.apache.dolphinscheduler.remote.command; package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.io.Serializable; import java.io.Serializable;
/** import lombok.AllArgsConstructor;
* execute task request command import lombok.Data;
*/ import lombok.NoArgsConstructor;
public class TaskExecuteRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(String taskExecutionContext) { @Data
this.taskExecutionContext = taskExecutionContext; @NoArgsConstructor
} @AllArgsConstructor
public class TaskExecuteRequestCommand implements Serializable {
public TaskExecuteRequestCommand() { private static final long serialVersionUID = -1L;
}
public TaskExecuteRequestCommand(String taskExecutionContext) { private TaskExecutionContext taskExecutionContext;
this.taskExecutionContext = taskExecutionContext;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command() { public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST); command.setType(CommandType.TASK_EXECUTE_REQUEST);
@ -59,10 +43,4 @@ public class TaskExecuteRequestCommand implements Serializable {
return command; return command;
} }
@Override
public String toString() {
return "TaskExecuteRequestCommand{"
+ "taskExecutionContext='" + taskExecutionContext + '\''
+ '}';
}
} }

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.SystemUtils;
import java.io.File; import java.io.File;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
@ -43,6 +44,8 @@ import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import lombok.NonNull;
/** /**
* mainly used to get the start command line of a process. * mainly used to get the start command line of a process.
*/ */
@ -181,7 +184,10 @@ public class ProcessUtils {
* @param taskExecutionContext taskExecutionContext * @param taskExecutionContext taskExecutionContext
* @return yarn application ids * @return yarn application ids
*/ */
public static List<String> killYarnJob(TaskExecutionContext taskExecutionContext) { public static List<String> killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) {
if (taskExecutionContext.getLogPath() == null) {
return Collections.emptyList();
}
try { try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
String log; String log;

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -52,16 +52,16 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.slf4j.Logger;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.springframework.transaction.annotation.Transactional;
public interface ProcessService { public interface ProcessService {
@Transactional @Transactional
ProcessInstance handleCommand(Logger logger, String host, Command command); ProcessInstance handleCommand(String host, Command command);
void moveToErrorCommand(Command command, String message); void moveToErrorCommand(Command command, String message);
@ -161,8 +161,6 @@ public interface ProcessService {
void changeOutParam(TaskInstance taskInstance); void changeOutParam(TaskInstance taskInstance);
List<String> convertIntListToString(List<Integer> intList);
Schedule querySchedule(int id); Schedule querySchedule(int id);
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode); List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode);

23
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@ -31,6 +30,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
@ -173,13 +174,6 @@ public class ProcessServiceImpl implements ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()};
@Autowired @Autowired
private UserMapper userMapper; private UserMapper userMapper;
@ -290,7 +284,7 @@ public class ProcessServiceImpl implements ProcessService {
*/ */
@Override @Override
@Transactional @Transactional
public ProcessInstance handleCommand(Logger logger, String host, Command command) { public ProcessInstance handleCommand(String host, Command command) {
ProcessInstance processInstance = constructProcessInstance(command, host); ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null // cannot construct process instance, return null
if (processInstance == null) { if (processInstance == null) {
@ -1973,8 +1967,7 @@ public class ProcessServiceImpl implements ProcessService {
* @param intList intList * @param intList intList
* @return string list * @return string list
*/ */
@Override private List<String> convertIntListToString(List<Integer> intList) {
public List<String> convertIntListToString(List<Integer> intList) {
if (intList == null) { if (intList == null) {
return new ArrayList<>(); return new ArrayList<>();
} }
@ -2039,12 +2032,12 @@ public class ProcessServiceImpl implements ProcessService {
*/ */
@Override @Override
public List<ProcessInstance> queryNeedFailoverProcessInstances(String host) { public List<ProcessInstance> queryNeedFailoverProcessInstances(String host) {
return processInstanceMapper.queryByHostAndStatus(host, stateArray); return processInstanceMapper.queryByHostAndStatus(host, ExecutionStatus.getNeedFailoverWorkflowInstanceState());
} }
@Override @Override
public List<String> queryNeedFailoverProcessInstanceHost() { public List<String> queryNeedFailoverProcessInstanceHost() {
return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray); return processInstanceMapper.queryNeedFailoverProcessInstanceHost(ExecutionStatus.getNeedFailoverWorkflowInstanceState());
} }
/** /**
@ -2081,7 +2074,7 @@ public class ProcessServiceImpl implements ProcessService {
@Override @Override
public List<TaskInstance> queryNeedFailoverTaskInstances(String host) { public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
return taskInstanceMapper.queryByHostAndStatus(host, return taskInstanceMapper.queryByHostAndStatus(host,
stateArray); ExecutionStatus.getNeedFailoverWorkflowInstanceState());
} }
/** /**
@ -2215,7 +2208,7 @@ public class ProcessServiceImpl implements ProcessService {
return processInstanceMapper.queryLastRunningProcess(definitionCode, return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime, startTime,
endTime, endTime,
stateArray); ExecutionStatus.getNeedFailoverWorkflowInstanceState());
} }
/** /**

53
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -17,7 +17,12 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import com.fasterxml.jackson.databind.JsonNode; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
@ -72,9 +77,17 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.cron.CronUtilsTest; import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.spi.params.base.FormType; import org.apache.dolphinscheduler.spi.params.base.FormType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -88,17 +101,7 @@ import org.powermock.reflect.Whitebox;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import com.fasterxml.jackson.databind.JsonNode;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.mockito.ArgumentMatchers.any;
/** /**
* process service test * process service test
@ -289,8 +292,8 @@ public class ProcessServiceTest {
command.setProcessDefinitionCode(222); command.setProcessDefinitionCode(222);
command.setCommandType(CommandType.REPEAT_RUNNING); command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\"" command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
Assert.assertNull(processService.handleCommand(logger, host, command)); Assert.assertNull(processService.handleCommand(host, command));
int definitionVersion = 1; int definitionVersion = 1;
long definitionCode = 123; long definitionCode = 123;
@ -325,7 +328,7 @@ public class ProcessServiceTest {
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Assert.assertNotNull(processService.handleCommand(logger, host, command1)); Assert.assertNotNull(processService.handleCommand(host, command1));
Command command2 = new Command(); Command command2 = new Command();
command2.setId(2); command2.setId(2);
@ -335,7 +338,7 @@ public class ProcessServiceTest {
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS); command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId); command2.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(2)).thenReturn(1); Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command2)); Assert.assertNotNull(processService.handleCommand(host, command2));
Command command3 = new Command(); Command command3 = new Command();
command3.setId(3); command3.setId(3);
@ -345,7 +348,7 @@ public class ProcessServiceTest {
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}"); command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Mockito.when(commandMapper.deleteById(3)).thenReturn(1); Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command3)); Assert.assertNotNull(processService.handleCommand(host, command3));
Command command4 = new Command(); Command command4 = new Command();
command4.setId(4); command4.setId(4);
@ -355,7 +358,7 @@ public class ProcessServiceTest {
command4.setCommandType(CommandType.REPEAT_RUNNING); command4.setCommandType(CommandType.REPEAT_RUNNING);
command4.setProcessInstanceId(processInstanceId); command4.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(4)).thenReturn(1); Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command4)); Assert.assertNotNull(processService.handleCommand(host, command4));
Command command5 = new Command(); Command command5 = new Command();
command5.setId(5); command5.setId(5);
@ -374,7 +377,7 @@ public class ProcessServiceTest {
processDefinition.getGlobalParamList(), processDefinition.getGlobalParamList(),
CommandType.START_PROCESS, CommandType.START_PROCESS,
processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam1\""); processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam1\"");
ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5); ProcessInstance processInstance1 = processService.handleCommand(host, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\"")); Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
ProcessDefinition processDefinition1 = new ProcessDefinition(); ProcessDefinition processDefinition1 = new ProcessDefinition();
@ -399,7 +402,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2); Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1); Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
Mockito.when(commandMapper.deleteById(1)).thenReturn(1); Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command1)); Assert.assertNotNull(processService.handleCommand(host, command1));
Command command6 = new Command(); Command command6 = new Command();
command6.setId(6); command6.setId(6);
@ -410,7 +413,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists); Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true); Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
Mockito.when(commandMapper.deleteById(6)).thenReturn(1); Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6); ProcessInstance processInstance6 = processService.handleCommand(host, command6);
Assert.assertTrue(processInstance6 != null); Assert.assertTrue(processInstance6 != null);
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD); processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
@ -429,7 +432,7 @@ public class ProcessServiceTest {
command7.setProcessDefinitionVersion(1); command7.setProcessDefinitionVersion(1);
Mockito.when(commandMapper.deleteById(7)).thenReturn(1); Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null); Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7); ProcessInstance processInstance8 = processService.handleCommand(host, command7);
Assert.assertTrue(processInstance8 != null); Assert.assertTrue(processInstance8 != null);
ProcessDefinition processDefinition2 = new ProcessDefinition(); ProcessDefinition processDefinition2 = new ProcessDefinition();
@ -453,7 +456,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists); Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1); Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
Mockito.when(commandMapper.deleteById(9)).thenReturn(1); Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9); ProcessInstance processInstance10 = processService.handleCommand(host, command9);
Assert.assertTrue(processInstance10 == null); Assert.assertTrue(processInstance10 == null);
} }
@ -494,7 +497,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
// will throw exception when command id is 0 and delete fail // will throw exception when command id is 0 and delete fail
processService.handleCommand(logger, host, command1); processService.handleCommand(host, command1);
} }
@Test @Test

409
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

@ -22,13 +22,25 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import java.io.Serializable;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/** /**
* to master/worker task transport * to master/worker task transport
*/ */
public class TaskExecutionContext { @Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskExecutionContext implements Serializable {
private static final long serialVersionUID = -1L;
/** /**
* task id * task id
@ -248,399 +260,4 @@ public class TaskExecutionContext {
* max memory * max memory
*/ */
private Integer memoryMax; private Integer memoryMax;
public String getTaskLogName() {
return taskLogName;
}
public void setTaskLogName(String taskLogName) {
this.taskLogName = taskLogName;
}
public Map<String, String> getResources() {
return resources;
}
public void setResources(Map<String, String> resources) {
this.resources = resources;
}
public Map<String, Property> getParamsMap() {
return paramsMap;
}
public void setParamsMap(Map<String, Property> paramsMap) {
this.paramsMap = paramsMap;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public Date getFirstSubmitTime() {
return firstSubmitTime;
}
public void setFirstSubmitTime(Date firstSubmitTime) {
this.firstSubmitTime = firstSubmitTime;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getTaskJson() {
return taskJson;
}
public void setTaskJson(String taskJson) {
this.taskJson = taskJson;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public Date getScheduleTime() {
return scheduleTime;
}
public void setScheduleTime(Date scheduleTime) {
this.scheduleTime = scheduleTime;
}
public String getGlobalParams() {
return globalParams;
}
public void setGlobalParams(String globalParams) {
this.globalParams = globalParams;
}
public int getExecutorId() {
return executorId;
}
public void setExecutorId(int executorId) {
this.executorId = executorId;
}
public int getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(int cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public int getProcessDefineId() {
return processDefineId;
}
public void setProcessDefineId(int processDefineId) {
this.processDefineId = processDefineId;
}
public int getProjectId() {
return projectId;
}
public void setProjectId(int projectId) {
this.projectId = projectId;
}
public String getTaskParams() {
return taskParams;
}
public void setTaskParams(String taskParams) {
this.taskParams = taskParams;
}
public String getEnvFile() {
return envFile;
}
public void setEnvFile(String envFile) {
this.envFile = envFile;
}
public String getEnvironmentConfig() {
return environmentConfig;
}
public void setEnvironmentConfig(String config) {
this.environmentConfig = config;
}
public Map<String, String> getDefinedParams() {
return definedParams;
}
public void setDefinedParams(Map<String, String> definedParams) {
this.definedParams = definedParams;
}
public String getTaskAppId() {
return taskAppId;
}
public void setTaskAppId(String taskAppId) {
this.taskAppId = taskAppId;
}
public TaskTimeoutStrategy getTaskTimeoutStrategy() {
return taskTimeoutStrategy;
}
public void setTaskTimeoutStrategy(TaskTimeoutStrategy taskTimeoutStrategy) {
this.taskTimeoutStrategy = taskTimeoutStrategy;
}
public int getTaskTimeout() {
return taskTimeout;
}
public void setTaskTimeout(int taskTimeout) {
this.taskTimeout = taskTimeout;
}
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public int getDelayTime() {
return delayTime;
}
public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}
public ResourceParametersHelper getResourceParametersHelper() {
return resourceParametersHelper;
}
public void setResourceParametersHelper(ResourceParametersHelper resourceParametersHelper) {
this.resourceParametersHelper = resourceParametersHelper;
}
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public int getDryRun() {
return dryRun;
}
public void setDryRun(int dryRun) {
this.dryRun = dryRun;
}
public Long getProcessDefineCode() {
return processDefineCode;
}
public void setProcessDefineCode(Long processDefineCode) {
this.processDefineCode = processDefineCode;
}
public int getProcessDefineVersion() {
return processDefineVersion;
}
public void setProcessDefineVersion(int processDefineVersion) {
this.processDefineVersion = processDefineVersion;
}
public long getProjectCode() {
return projectCode;
}
public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
public DataQualityTaskExecutionContext getDataQualityTaskExecutionContext() {
return dataQualityTaskExecutionContext;
}
public void setDataQualityTaskExecutionContext(DataQualityTaskExecutionContext dataQualityTaskExecutionContext) {
this.dataQualityTaskExecutionContext = dataQualityTaskExecutionContext;
}
public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
this.currentExecutionStatus = currentExecutionStatus;
}
public ExecutionStatus getCurrentExecutionStatus() {
return currentExecutionStatus;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public Integer getCpuQuota() {
return cpuQuota;
}
public void setCpuQuota(Integer cpuQuota) {
this.cpuQuota = cpuQuota;
}
public Integer getMemoryMax() {
return memoryMax;
}
public void setMemoryMax(Integer memoryMax) {
this.memoryMax = memoryMax;
}
public K8sTaskExecutionContext getK8sTaskExecutionContext() {
return k8sTaskExecutionContext;
}
public void setK8sTaskExecutionContext(K8sTaskExecutionContext k8sTaskExecutionContext) {
this.k8sTaskExecutionContext = k8sTaskExecutionContext;
}
@Override
public String toString() {
return "TaskExecutionContext{"
+ "taskInstanceId=" + taskInstanceId
+ ", taskName='" + taskName + '\''
+ ", currentExecutionStatus=" + currentExecutionStatus
+ ", firstSubmitTime=" + firstSubmitTime
+ ", startTime=" + startTime
+ ", taskType='" + taskType + '\''
+ ", host='" + host + '\''
+ ", executePath='" + executePath + '\''
+ ", logPath='" + logPath + '\''
+ ", taskJson='" + taskJson + '\''
+ ", processId=" + processId
+ ", processDefineCode=" + processDefineCode
+ ", processDefineVersion=" + processDefineVersion
+ ", appIds='" + appIds + '\''
+ ", processInstanceId=" + processInstanceId
+ ", scheduleTime=" + scheduleTime
+ ", globalParams='" + globalParams + '\''
+ ", executorId=" + executorId
+ ", cmdTypeIfComplement=" + cmdTypeIfComplement
+ ", tenantCode='" + tenantCode + '\''
+ ", queue='" + queue + '\''
+ ", projectCode=" + projectCode
+ ", taskParams='" + taskParams + '\''
+ ", envFile='" + envFile + '\''
+ ", dryRun='" + dryRun + '\''
+ ", definedParams=" + definedParams
+ ", taskAppId='" + taskAppId + '\''
+ ", taskTimeoutStrategy=" + taskTimeoutStrategy
+ ", taskTimeout=" + taskTimeout
+ ", workerGroup='" + workerGroup + '\''
+ ", environmentConfig='" + environmentConfig + '\''
+ ", delayTime=" + delayTime
+ ", resources=" + resources
+ ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
+ ", k8sTaskExecutionContext=" + k8sTaskExecutionContext
+ ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
+ '}';
}
} }

23
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java

@ -78,6 +78,15 @@ public enum ExecutionStatus {
private static HashMap<Integer, ExecutionStatus> EXECUTION_STATUS_MAP = new HashMap<>(); private static HashMap<Integer, ExecutionStatus> EXECUTION_STATUS_MAP = new HashMap<>();
private static final int[] NEED_FAILOVER_STATES = new int[] {
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()
};
static { static {
for (ExecutionStatus executionStatus : ExecutionStatus.values()) { for (ExecutionStatus executionStatus : ExecutionStatus.values()) {
EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus); EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus);
@ -180,4 +189,18 @@ public enum ExecutionStatus {
} }
throw new IllegalArgumentException("invalid status : " + status); throw new IllegalArgumentException("invalid status : " + status);
} }
public static boolean isNeedFailoverWorkflowInstanceState(ExecutionStatus executionStatus) {
return
ExecutionStatus.SUBMITTED_SUCCESS == executionStatus
|| ExecutionStatus.DISPATCH == executionStatus
|| ExecutionStatus.RUNNING_EXECUTION == executionStatus
|| ExecutionStatus.DELAY_EXECUTION == executionStatus
|| ExecutionStatus.READY_PAUSE == executionStatus
|| ExecutionStatus.READY_STOP == executionStatus;
}
public static int[] getNeedFailoverWorkflowInstanceState() {
return NEED_FAILOVER_STATES;
}
} }

25
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java

@ -22,22 +22,20 @@ import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import lombok.experimental.UtilityClass;
public final class WorkerServerMetrics { @UtilityClass
public class WorkerServerMetrics {
public WorkerServerMetrics() {
throw new UnsupportedOperationException("Utility class");
}
private static final Counter WORKER_OVERLOAD_COUNTER = private static final Counter WORKER_OVERLOAD_COUNTER =
Counter.builder("ds.worker.overload.count") Counter.builder("ds.worker.overload.count")
.description("overloaded workers count") .description("overloaded workers count")
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
private static final Counter WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER = private static final Counter WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER =
Counter.builder("ds.worker.full.submit.queue.count") Counter.builder("ds.worker.full.submit.queue.count")
.description("full worker submit queues count") .description("full worker submit queues count")
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
public static void incWorkerOverloadCount() { public static void incWorkerOverloadCount() {
WORKER_OVERLOAD_COUNTER.increment(); WORKER_OVERLOAD_COUNTER.increment();
@ -49,8 +47,9 @@ public final class WorkerServerMetrics {
public static void registerWorkerRunningTaskGauge(Supplier<Number> supplier) { public static void registerWorkerRunningTaskGauge(Supplier<Number> supplier) {
Gauge.builder("ds.task.running", supplier) Gauge.builder("ds.task.running", supplier)
.description("number of running tasks on workers") .description("number of running tasks on workers")
.register(Metrics.globalRegistry); .register(Metrics.globalRegistry);
} }
} }

26
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import java.util.Arrays; import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -92,9 +93,6 @@ public class TaskCallbackService {
* change remote channel * change remote channel
*/ */
public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) { public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
if (REMOTE_CHANNELS.containsKey(taskInstanceId)) {
REMOTE_CHANNELS.remove(taskInstanceId);
}
REMOTE_CHANNELS.put(taskInstanceId, channel); REMOTE_CHANNELS.put(taskInstanceId, channel);
} }
@ -104,19 +102,19 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
* @return callback channel * @return callback channel
*/ */
private NettyRemoteChannel getRemoteChannel(int taskInstanceId) { private Optional<NettyRemoteChannel> getRemoteChannel(int taskInstanceId) {
Channel newChannel; Channel newChannel;
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId); NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
if (nettyRemoteChannel != null) { if (nettyRemoteChannel != null) {
if (nettyRemoteChannel.isActive()) { if (nettyRemoteChannel.isActive()) {
return nettyRemoteChannel; return Optional.of(nettyRemoteChannel);
} }
newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost()); newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if (newChannel != null) { if (newChannel != null) {
return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); return Optional.of(getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId));
} }
} }
return null; return Optional.empty();
} }
public long pause(int ntries) { public long pause(int ntries) {
@ -151,15 +149,13 @@ public class TaskCallbackService {
* @param command command * @param command command
*/ */
public void send(int taskInstanceId, Command command) { public void send(int taskInstanceId, Command command) {
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); Optional<NettyRemoteChannel> nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if (nettyRemoteChannel != null) { if (nettyRemoteChannel.isPresent()) {
nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() { nettyRemoteChannel.get().writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) { if (!future.isSuccess()) {
// remove(taskInstanceId); logger.error("Send callback command error, taskInstanceId: {}, command: {}", taskInstanceId, command);
return;
} }
} }
}); });

3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -111,8 +111,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
} }
logger.info("task execute request command : {}", taskRequestCommand); logger.info("task execute request command : {}", taskRequestCommand);
String contextJson = taskRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
if (taskExecutionContext == null) { if (taskExecutionContext == null) {
logger.error("task execution context is null"); logger.error("task execution context is null");

86
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner; package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
@ -34,7 +35,7 @@ import org.springframework.stereotype.Component;
* Retry Report Task Status Thread * Retry Report Task Status Thread
*/ */
@Component @Component
public class RetryReportTaskStatusThread implements Runnable { public class RetryReportTaskStatusThread extends BaseDaemonThread {
private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class); private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
@ -46,11 +47,14 @@ public class RetryReportTaskStatusThread implements Runnable {
@Autowired @Autowired
private TaskCallbackService taskCallbackService; private TaskCallbackService taskCallbackService;
public void start() { protected RetryReportTaskStatusThread() {
super("RetryReportTaskStatusThread");
}
@Override
public synchronized void start() {
logger.info("Retry report task status thread starting"); logger.info("Retry report task status thread starting");
Thread thread = new Thread(this, "RetryReportTaskStatusThread"); super.start();
thread.setDaemon(true);
thread.start();
logger.info("Retry report task status thread started"); logger.info("Retry report task status thread started");
} }
@ -59,7 +63,7 @@ public class RetryReportTaskStatusThread implements Runnable {
*/ */
@Override @Override
public void run() { public void run() {
ResponseCache instance = ResponseCache.get(); final ResponseCache instance = ResponseCache.get();
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
@ -67,33 +71,57 @@ public class RetryReportTaskStatusThread implements Runnable {
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL); ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
try { try {
if (!instance.getRunningCache().isEmpty()) { // todo: Only retry the send failed command
Map<Integer, Command> runningCache = instance.getRunningCache(); retryRunningCommand(instance);
for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) { retryResponseCommand(instance);
Integer taskInstanceId = entry.getKey(); retryRecallCommand(instance);
Command runningCommand = entry.getValue(); } catch (Exception e) {
taskCallbackService.send(taskInstanceId, runningCommand); logger.warn("Retry report task status error", e);
} }
}
}
private void retryRunningCommand(ResponseCache instance) {
if (!instance.getRunningCache().isEmpty()) {
Map<Integer, Command> runningCache = instance.getRunningCache();
for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command runningCommand = entry.getValue();
try {
taskCallbackService.send(taskInstanceId, runningCommand);
} catch (Exception ex) {
logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}", taskInstanceId, runningCommand);
} }
}
}
}
if (!instance.getResponseCache().isEmpty()) { private void retryResponseCommand(ResponseCache instance) {
Map<Integer, Command> responseCache = instance.getResponseCache(); if (!instance.getResponseCache().isEmpty()) {
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) { Map<Integer, Command> responseCache = instance.getResponseCache();
Integer taskInstanceId = entry.getKey(); for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
Command responseCommand = entry.getValue(); Integer taskInstanceId = entry.getKey();
taskCallbackService.send(taskInstanceId, responseCommand); Command responseCommand = entry.getValue();
} try {
taskCallbackService.send(taskInstanceId, responseCommand);
} catch (Exception ex) {
logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
} }
if (!instance.getRecallCache().isEmpty()) { }
Map<Integer, Command> recallCache = instance.getRecallCache(); }
for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) { }
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue(); private void retryRecallCommand(ResponseCache instance) {
taskCallbackService.send(taskInstanceId, responseCommand); if (!instance.getRecallCache().isEmpty()) {
} Map<Integer, Command> recallCache = instance.getRecallCache();
for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
try {
taskCallbackService.send(taskInstanceId, responseCommand);
} catch (Exception ex) {
logger.error("Retry send recall command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
} }
} catch (Exception e) {
logger.warn("Retry report task status error", e);
} }
} }
} }

3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -132,9 +132,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.setEndTime(new Date()); taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
return; return;
} }
try { try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath()); logger.info("script path : {}", taskExecutionContext.getExecutePath());

4
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@ -88,7 +88,7 @@ public class TaskExecuteProcessorTest {
command = new Command(); command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST); command.setType(CommandType.TASK_EXECUTE_REQUEST);
ackCommand = new TaskExecuteRunningCommand().convert2Command(); ackCommand = new TaskExecuteRunningCommand().convert2Command();
taskRequestCommand = new TaskExecuteRequestCommand(); taskRequestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
alertClientService = PowerMockito.mock(AlertClientService.class); alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class); workerExecService = PowerMockito.mock(ExecutorService.class);
PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))) PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
@ -127,8 +127,6 @@ public class TaskExecuteProcessorTest {
PowerMockito.mockStatic(JSONUtils.class); PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class)) PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
.thenReturn(taskRequestCommand); .thenReturn(taskRequestCommand);
PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class))
.thenReturn(taskExecutionContext);
PowerMockito.mockStatic(FileUtils.class); PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),

Loading…
Cancel
Save