Browse Source

[Fix-10827] Fix network error cause worker cannot send message to master (#10886)

* Fix network error cause worker cannot send message to master
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
cade66a9b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  2. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  3. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  4. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
  5. 19
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
  6. 20
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
  7. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
  8. 17
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
  9. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
  10. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
  11. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
  12. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
  13. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  14. 16
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
  15. 6
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
  16. 25
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  17. 20
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
  18. 47
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
  19. 57
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java
  20. 47
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
  21. 39
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  22. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
  23. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
  24. 24
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java
  25. 48
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
  26. 212
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
  27. 111
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java
  28. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java
  29. 101
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
  30. 51
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
  31. 48
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
  32. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
  33. 18
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  34. 115
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
  35. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  36. 139
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
  37. 43
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java
  38. 70
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java
  39. 67
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java
  40. 59
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
  41. 13
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
  42. 276
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  43. 78
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
  44. 71
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
  45. 83
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
  46. 17
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
  47. 50
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  48. 40
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
  49. 92
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
  50. 75
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
  51. 18
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
  52. 134
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
  53. 85
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  54. 6
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
  55. 19
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  56. 92
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
  57. 12
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.config;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@ -82,6 +83,10 @@ public class MasterConfig implements Validator {
private double reservedMemory = 0.3;
private Duration failoverInterval = Duration.ofMinutes(10);
private boolean killYarnJobWhenTaskFailover = true;
/**
* ip:listenPort
*/
private String masterAddress;
@Override
public boolean supports(Class<?> clazz) {
@ -124,5 +129,6 @@ public class MasterConfig implements Validator {
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
}

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@ -241,7 +241,11 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
}
private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
// todo: we didn't set the host here, since right now we didn't need to retry this message.
TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
masterConfig.getMasterAddress(),
taskExecutionContext.getHost(),
System.currentTimeMillis());
return requestCommand.convert2Command();
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -88,10 +88,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
@PostConstruct
public void init() {
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
}
/**

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java

@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@ -108,9 +108,9 @@ public class TaskDelayEventHandler implements TaskEventHandler {
private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
}
@Override

19
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java

@ -20,8 +20,9 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@ -34,13 +35,16 @@ public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private MasterConfig masterConfig;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
@ -65,9 +69,12 @@ public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
}
public void sendAckToWorker(TaskEvent taskEvent) {
TaskRecallAckCommand taskRecallAckCommand =
new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskRecallAckCommand.convert2Command());
TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
System.currentTimeMillis());
taskEvent.getChannel().writeAndFlush(taskRejectAckMessage.convert2Command());
}
@Override

20
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java

@ -22,8 +22,9 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@ -50,13 +51,16 @@ public class TaskResultEventHandler implements TaskEventHandler {
@Autowired
private ProcessService processService;
@Autowired
private MasterConfig masterConfig;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
@ -105,9 +109,13 @@ public class TaskResultEventHandler implements TaskEventHandler {
}
public void sendAckToWorker(TaskEvent taskEvent) {
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
// we didn't set the receiver address, since the ack doen's need to retry
TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
System.currentTimeMillis());
taskEvent.getChannel().writeAndFlush(taskExecuteAckMessage.convert2Command());
}
@Override

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java

@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@ -106,9 +106,9 @@ public class TaskRunningEventHandler implements TaskEventHandler {
private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
}
@Override

17
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java

@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@ -55,15 +55,18 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
TaskExecuteResultCommand.class);
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getTaskInstanceId());
logger.info("Received task execute response, event: {}", taskResponseEvent);
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
taskResultEvent.getTaskInstanceId());
logger.info("Received task execute result, event: {}", taskResultEvent);
taskEventService.addEvent(taskResponseEvent);
taskEventService.addEvent(taskResultEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java

@ -54,10 +54,10 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand);
TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage);
TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
taskEventService.addEvent(taskEvent);
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java

@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@ -54,8 +54,8 @@ public class TaskRecallProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
Preconditions.checkArgument(CommandType.TASK_REJECT == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskRejectCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectCommand.class);
TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(recallCommand.getProcessInstanceId(), recallCommand.getTaskInstanceId());

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
@ -120,7 +120,7 @@ public class TaskEvent {
return event;
}
public static TaskEvent newResultEvent(TaskExecuteResponseCommand command, Channel channel) {
public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
@ -138,7 +138,7 @@ public class TaskEvent {
return event;
}
public static TaskEvent newRecallEvent(TaskRecallCommand command, Channel channel) {
public static TaskEvent newRecallEvent(TaskRejectCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setTaskInstanceId(command.getTaskInstanceId());
event.setProcessInstanceId(command.getProcessInstanceId());

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java

@ -84,14 +84,14 @@ public class MasterRPCServer implements AutoCloseable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor);
// logger server

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -126,8 +126,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId);
if (removeFlag) {
logger.info("Success remove workflow instance from timeout check list");
} else {
logger.warn("Failed to remove workflow instance from timeout check list");
}
}

16
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java

@ -24,12 +24,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.mockito.Mockito;
@ -44,12 +43,15 @@ public class ExecutionContextTestUtils {
processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext context = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
TaskDispatchCommand requestCommand = new TaskDispatchCommand(context,
"127.0.0.1:5678",
"127.0.0.1:5678",
System.currentTimeMillis());
Command command = requestCommand.convert2Command();
ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER, taskInstance);

6
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java

@ -18,11 +18,12 @@
package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.junit.Ignore;
@ -60,7 +61,8 @@ public class ExecutorDispatcherTest {
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(port);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST, Mockito.mock(TaskExecuteProcessor.class));
nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, Mockito.mock(
TaskDispatchProcessor.class));
nettyRemotingServer.start();
//
workerConfig.setListenPort(port);

25
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -25,15 +25,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
import org.junit.Assert;
import org.junit.Ignore;
@ -49,16 +48,15 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@Ignore
public class NettyExecutorManagerTest {
@Autowired
private NettyExecutorManager nettyExecutorManager;
@Test
public void testExecute() throws ExecuteException {
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_DISPATCH_REQUEST,
new TaskDispatchProcessor());
nettyRemotingServer.start();
TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class);
@ -66,10 +64,10 @@ public class NettyExecutorManagerTest {
processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext context = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
Boolean execute = nettyExecutorManager.execute(executionContext);
@ -94,10 +92,11 @@ public class NettyExecutorManagerTest {
nettyExecutorManager.execute(executionContext);
}
private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(taskExecutionContext);
TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
"127.0.0.1:5678",
"127.0.0.1:1234",
System.currentTimeMillis());
return requestCommand.convert2Command();
}
}

20
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java

@ -44,7 +44,7 @@ public class TaskAckProcessorTest {
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
private TaskEventService taskEventService;
private ProcessService processService;
private TaskExecuteRunningCommand taskExecuteRunningCommand;
private TaskExecuteRunningCommand taskExecuteRunningMessage;
private TaskEvent taskResponseEvent;
private Channel channel;
@ -63,14 +63,16 @@ public class TaskAckProcessorTest {
channel = PowerMockito.mock(Channel.class);
taskResponseEvent = PowerMockito.mock(TaskEvent.class);
taskExecuteRunningCommand = new TaskExecuteRunningCommand();
taskExecuteRunningCommand.setStatus(1);
taskExecuteRunningCommand.setExecutePath("/dolphinscheduler/worker");
taskExecuteRunningCommand.setHost("localhost");
taskExecuteRunningCommand.setLogPath("/temp/worker.log");
taskExecuteRunningCommand.setStartTime(new Date());
taskExecuteRunningCommand.setTaskInstanceId(1);
taskExecuteRunningCommand.setProcessInstanceId(1);
taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
" 127.0.0.1:1234",
System.currentTimeMillis());
taskExecuteRunningMessage.setStatus(1);
taskExecuteRunningMessage.setExecutePath("/dolphinscheduler/worker");
taskExecuteRunningMessage.setHost("localhost");
taskExecuteRunningMessage.setLogPath("/temp/worker.log");
taskExecuteRunningMessage.setStartTime(new Date());
taskExecuteRunningMessage.setTaskInstanceId(1);
taskExecuteRunningMessage.setProcessInstanceId(1);
}
@Test

47
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@ -17,9 +17,10 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@ -76,26 +77,30 @@ public class TaskResponseServiceTest {
Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234));
TaskExecuteRunningCommand taskExecuteRunningCommand = new TaskExecuteRunningCommand();
taskExecuteRunningCommand.setProcessId(1);
taskExecuteRunningCommand.setTaskInstanceId(22);
taskExecuteRunningCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
taskExecuteRunningCommand.setExecutePath("path");
taskExecuteRunningCommand.setLogPath("logPath");
taskExecuteRunningCommand.setHost("127.*.*.*");
taskExecuteRunningCommand.setStartTime(new Date());
ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
TaskExecuteResponseCommand taskExecuteResponseCommand = new TaskExecuteResponseCommand();
taskExecuteResponseCommand.setProcessInstanceId(1);
taskExecuteResponseCommand.setTaskInstanceId(22);
taskExecuteResponseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
taskExecuteResponseCommand.setEndTime(new Date());
taskExecuteResponseCommand.setVarPool("varPol");
taskExecuteResponseCommand.setAppIds("ids");
taskExecuteResponseCommand.setProcessId(1);
resultEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
TaskExecuteRunningCommand taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
"127.0.0.1:1234",
System.currentTimeMillis());
taskExecuteRunningMessage.setProcessId(1);
taskExecuteRunningMessage.setTaskInstanceId(22);
taskExecuteRunningMessage.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
taskExecuteRunningMessage.setExecutePath("path");
taskExecuteRunningMessage.setLogPath("logPath");
taskExecuteRunningMessage.setHost("127.*.*.*");
taskExecuteRunningMessage.setStartTime(new Date());
ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
TaskExecuteResultCommand taskExecuteResultMessage = new TaskExecuteResultCommand(NetUtils.getAddr(1234),
NetUtils.getAddr(5678),
System.currentTimeMillis());
taskExecuteResultMessage.setProcessInstanceId(1);
taskExecuteResultMessage.setTaskInstanceId(22);
taskExecuteResultMessage.setStatus(ExecutionStatus.SUCCESS.getCode());
taskExecuteResultMessage.setEndTime(new Date());
taskExecuteResultMessage.setVarPool("varPol");
taskExecuteResultMessage.setAppIds("ids");
taskExecuteResultMessage.setProcessId(1);
resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
taskInstance = new TaskInstance();
taskInstance.setId(22);

57
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* This is the base class for rpc message.
* <p>
* Since we use async mode, the client send a message and will wait the target server
* send ack for the message, the client will retry during a while if he doesn't receive an ack.
* <p>
* When there is a network error, the server cannot send ack to the client by the origin channel,
* since the client has closed the channel, so the server need to know the command source.
*/
@Data
@NoArgsConstructor
public abstract class BaseCommand implements Serializable {
private static final long serialVersionUID = -1L;
/**
* If the message receiver want to send ack to the sender, need to use this address.
*/
protected String messageSenderAddress;
/**
* The message receiver address.
*/
protected String messageReceiverAddress;
protected long messageSendTime;
protected BaseCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
this.messageSenderAddress = messageSenderAddress;
this.messageReceiverAddress = messageReceiverAddress;
this.messageSendTime = messageSendTime;
}
}

47
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java

@ -14,13 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
import lombok.Data;
/**
* command header
*/
@Data
public class CommandHeader implements Serializable {
/**
@ -34,12 +38,12 @@ public class CommandHeader implements Serializable {
private long opaque;
/**
* context length
* context length
*/
private int contextLength;
/**
* context
* context
*/
private byte[] context;
@ -48,43 +52,4 @@ public class CommandHeader implements Serializable {
*/
private int bodyLength;
public int getBodyLength() {
return bodyLength;
}
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public long getOpaque() {
return opaque;
}
public void setOpaque(long opaque) {
this.opaque = opaque;
}
public int getContextLength() {
return contextLength;
}
public void setContextLength(int contextLength) {
this.contextLength = contextLength;
}
public byte[] getContext() {
return context;
}
public void setContext(byte[] context) {
this.context = context;
}
}

39
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -64,9 +64,9 @@ public enum CommandType {
MASTER_RESPONSE,
/**
* execute task request
* dispatch task request
*/
TASK_EXECUTE_REQUEST,
TASK_DISPATCH_REQUEST,
/**
* task execute running, from worker to master
@ -81,56 +81,29 @@ public enum CommandType {
/**
* task execute response, from worker to master
*/
TASK_EXECUTE_RESPONSE,
TASK_EXECUTE_RESULT,
/**
* task execute response ack, from master to worker
*/
TASK_EXECUTE_RESPONSE_ACK,
TASK_EXECUTE_RESULT_ACK,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* task recall
*/
TASK_RECALL,
TASK_REJECT,
/**
* task recall ack
*/
TASK_RECALL_ACK,
TASK_REJECT_ACK,
/**
* HEART_BEAT
*/
HEART_BEAT,
/**
* ping
*/
PING,
/**
* pong
*/
PONG,
/**
* alert send request
*/
ALERT_SEND_REQUEST,
/**
* alert send response
*/
ALERT_SEND_RESPONSE,
/**

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java

@ -26,9 +26,6 @@ import java.io.Serializable;
*/
public class HostUpdateCommand implements Serializable {
/**
* task id
*/
private int taskInstanceId;
private String processHost;

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java

@ -61,7 +61,7 @@ public class StateEventResponseCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
command.setType(CommandType.TASK_EXECUTE_RESULT_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;

24
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java

@ -20,24 +20,36 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* The task dispatch message, means dispatch a task to worker.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskExecuteRequestCommand implements Serializable {
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskDispatchCommand extends BaseCommand {
private static final long serialVersionUID = -1L;
private TaskExecutionContext taskExecutionContext;
public TaskDispatchCommand(TaskExecutionContext taskExecutionContext,
String messageSenderAddress,
String messageReceiverAddress,
long messageSendTime) {
super(messageSenderAddress, messageReceiverAddress, messageSendTime);
this.taskExecutionContext = taskExecutionContext;
}
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
command.setType(CommandType.TASK_DISPATCH_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;

48
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java

@ -19,41 +19,34 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* task recall ack command
* task execute response ack command
* from master to worker
*/
public class TaskRecallAckCommand implements Serializable {
@Data
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskExecuteAckCommand extends BaseCommand {
private int taskInstanceId;
private int status;
public TaskRecallAckCommand() {
super();
}
public TaskRecallAckCommand(int status, int taskInstanceId) {
public TaskExecuteAckCommand(int status,
int taskInstanceId,
String sourceServerAddress,
String messageReceiverAddress,
long messageSendTime) {
super(sourceServerAddress, messageReceiverAddress, messageSendTime);
this.status = status;
this.taskInstanceId = taskInstanceId;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
/**
* package response command
*
@ -61,14 +54,9 @@ public class TaskRecallAckCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_RECALL_ACK);
command.setType(CommandType.TASK_EXECUTE_RESULT_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskRecallAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
}
}

212
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java

@ -1,212 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import java.util.Date;
/**
* execute task response command
*/
public class TaskExecuteResponseCommand implements Serializable {
public TaskExecuteResponseCommand() {
}
public TaskExecuteResponseCommand(int taskInstanceId, int processInstanceId) {
this.taskInstanceId = taskInstanceId;
this.processInstanceId = processInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* process instance id
*/
private int processInstanceId;
/**
* status
*/
private int status;
/**
* startTime
*/
private Date startTime;
/**
* host
*/
private String host;
/**
* logPath
*/
private String logPath;
/**
* executePath
*/
private String executePath;
/**
* end time
*/
private Date endTime;
/**
* processId
*/
private int processId;
/**
* appIds
*/
private String appIds;
/**
* varPool string
*/
private String varPool;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public String getVarPool() {
return varPool;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
/**
* package response command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_RESPONSE);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskExecuteResponseCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", processInstanceId=" + processInstanceId
+ ", status=" + status
+ ", startTime=" + startTime
+ ", endTime=" + endTime
+ ", host=" + host
+ ", logPath=" + logPath
+ ", executePath=" + executePath
+ ", processId=" + processId
+ ", appIds='" + appIds + '\''
+ ", varPool=" + varPool
+ '}';
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
}

111
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.Date;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* execute task response command
*/
@Data
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskExecuteResultCommand extends BaseCommand {
public TaskExecuteResultCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
super(messageSenderAddress, messageReceiverAddress, messageSendTime);
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* process instance id
*/
private int processInstanceId;
/**
* status
*/
private int status;
/**
* startTime
*/
private Date startTime;
/**
* host
*/
private String host;
/**
* logPath
*/
private String logPath;
/**
* executePath
*/
private String executePath;
/**
* end time
*/
private Date endTime;
/**
* processId
*/
private int processId;
/**
* appIds
*/
private String appIds;
/**
* varPool string
*/
private String varPool;
/**
* package response command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_RESULT);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
}

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java

@ -25,16 +25,16 @@ import java.io.Serializable;
* task execute running ack command
* from master to worker
*/
public class TaskExecuteRunningAckCommand implements Serializable {
public class TaskExecuteRunningAckMessage implements Serializable {
private int taskInstanceId;
private int status;
public TaskExecuteRunningAckCommand() {
public TaskExecuteRunningAckMessage() {
super();
}
public TaskExecuteRunningAckCommand(int status, int taskInstanceId) {
public TaskExecuteRunningAckMessage(int status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}

101
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java

@ -19,14 +19,21 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* task execute running command
* from worker to master
* Task running message, means the task is running in worker.
*/
public class TaskExecuteRunningCommand implements Serializable {
@Data
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskExecuteRunningCommand extends BaseCommand {
/**
* taskInstanceId
@ -73,76 +80,8 @@ public class TaskExecuteRunningCommand implements Serializable {
*/
private String appIds;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
public TaskExecuteRunningCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
super(messageSenderAddress, messageReceiverAddress, messageSendTime);
}
/**
@ -158,18 +97,4 @@ public class TaskExecuteRunningCommand implements Serializable {
return command;
}
@Override
public String toString() {
return "TaskExecuteRunningCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", processInstanceId='" + processInstanceId + '\''
+ ", startTime=" + startTime
+ ", host='" + host + '\''
+ ", status=" + status
+ ", logPath='" + logPath + '\''
+ ", executePath='" + executePath + '\''
+ ", processId=" + processId + '\''
+ ", appIds='" + appIds + '\''
+ '}';
}
}

51
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java

@ -19,39 +19,27 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* task execute response ack command
* from master to worker
*/
public class TaskExecuteResponseAckCommand implements Serializable {
@Data
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskRejectAckCommand extends BaseCommand {
private int taskInstanceId;
private int status;
public TaskExecuteResponseAckCommand() {
super();
}
public TaskExecuteResponseAckCommand(int status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
public TaskRejectAckCommand(int status,
int taskInstanceId,
String messageSenderAddress,
String messageReceiverAddress,
long messageSendTime) {
super(messageSenderAddress, messageReceiverAddress, messageSendTime);
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
@ -62,17 +50,10 @@ public class TaskExecuteResponseAckCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
command.setType(CommandType.TASK_REJECT_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskExecuteResponseAckCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", status=" + status
+ '}';
}
}

48
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java

@ -19,12 +19,19 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* kill task recall command
* Task reject message, means the task has been rejected by the worker.
*/
public class TaskRecallCommand implements Serializable {
@Data
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskRejectCommand extends BaseCommand {
/**
* taskInstanceId
@ -41,28 +48,8 @@ public class TaskRecallCommand implements Serializable {
*/
private int processInstanceId;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
public TaskRejectCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
super(messageSenderAddress, messageReceiverAddress, messageSendTime);
}
/**
@ -72,18 +59,9 @@ public class TaskRecallCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_RECALL);
command.setType(CommandType.TASK_REJECT);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskRecallCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", host='" + host + '\''
+ ", processInstanceId=" + processInstanceId
+ '}';
}
}

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.remote.utils;
import lombok.NonNull;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import java.io.Serializable;
@ -95,7 +96,7 @@ public class Host implements Serializable {
* @param address address
* @return host
*/
public static Host of(String address) {
public static Host of(@NonNull String address) {
String[] parts = splitAddress(address);
return new Host(parts[0], Integer.parseInt(parts[1]));
}

18
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -25,9 +25,10 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -79,9 +80,6 @@ public class WorkerServer implements IStoppable {
@Autowired
private AlertClientService alertClientService;
@Autowired
private RetryReportTaskStatusThread retryReportTaskStatusThread;
@Autowired
private WorkerManagerThread workerManagerThread;
@ -99,6 +97,12 @@ public class WorkerServer implements IStoppable {
@Autowired
private WorkerRpcServer workerRpcServer;
@Autowired
private WorkerRpcClient workerRpcClient;
@Autowired
private MessageRetryRunner messageRetryRunner;
/**
* worker server startup, not use web service
*
@ -112,7 +116,7 @@ public class WorkerServer implements IStoppable {
@PostConstruct
public void run() {
this.workerRpcServer.start();
this.workerRpcClient.start();
this.taskPluginManager.installPlugin();
this.workerRegistryClient.registry();
@ -122,7 +126,7 @@ public class WorkerServer implements IStoppable {
this.workerManagerThread.start();
this.retryReportTaskStatusThread.start();
this.messageRetryRunner.start();
/*
* registry hooks, which are called before the process exits

115
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java vendored

@ -1,115 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.cache;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.remote.command.Command;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Response Cache : cache worker send master result
*/
public class ResponseCache {
private static final ResponseCache instance = new ResponseCache();
private ResponseCache() {
}
public static ResponseCache get() {
return instance;
}
private final Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
private final Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
private final Map<Integer,Command> recallCache = new ConcurrentHashMap<>();
/**
* cache response
*
* @param taskInstanceId taskInstanceId
* @param command command
* @param event event ACK/RESULT
*/
public void cache(Integer taskInstanceId, Command command, TaskEventType event) {
switch (event) {
case RUNNING:
runningCache.put(taskInstanceId, command);
break;
case RESULT:
responseCache.put(taskInstanceId, command);
break;
case WORKER_REJECT:
recallCache.put(taskInstanceId, command);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
}
/**
* recall response cache
*
* @param taskInstanceId taskInstanceId
*/
public void removeRecallCache(Integer taskInstanceId) {
recallCache.remove(taskInstanceId);
}
public Map<Integer, Command> getRecallCache() {
return recallCache;
}
/**
* remove running cache
*
* @param taskInstanceId taskInstanceId
*/
public void removeRunningCache(Integer taskInstanceId) {
runningCache.remove(taskInstanceId);
}
/**
* remove response cache
*
* @param taskInstanceId taskInstanceId
*/
public void removeResponseCache(Integer taskInstanceId) {
responseCache.remove(taskInstanceId);
}
/**
* get running cache
*
* @return getAckCache
*/
public Map<Integer, Command> getRunningCache() {
return runningCache;
}
/**
* getResponseCache
*
* @return getResponseCache
*/
public Map<Integer, Command> getResponseCache() {
return responseCache;
}
}

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.worker.config;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.time.Duration;
import java.util.Set;
@ -46,6 +48,10 @@ public class WorkerConfig implements Validator {
private Set<String> groups = Sets.newHashSet("default");
private String alertListenHost = "localhost";
private int alertListenPort = 50052;
/**
* This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
*/
private String workerAddress;
@Override
public boolean supports(Class<?> clazz) {
@ -64,6 +70,6 @@ public class WorkerConfig implements Validator {
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
}
}

139
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.message;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.BaseCommand;
import org.apache.dolphinscheduler.remote.command.CommandType;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import lombok.NonNull;
@Component
public class MessageRetryRunner extends BaseDaemonThread {
private final Logger logger = LoggerFactory.getLogger(MessageRetryRunner.class);
protected MessageRetryRunner() {
super("WorkerMessageRetryRunnerThread");
}
private static long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5L).toMillis();
@Autowired
private ApplicationContext applicationContext;
private Map<CommandType, MessageSender<BaseCommand>> messageSenderMap = new HashMap<>();
private Map<Integer, Map<CommandType, BaseCommand>> needToRetryMessages = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
messageSenders.values().forEach(messageSender -> {
messageSenderMap.put(messageSender.getMessageType(), messageSender);
logger.info("Injected message sender: {}", messageSender.getClass().getName());
});
}
@Override
public synchronized void start() {
logger.info("Message retry runner staring");
super.start();
logger.info("Message retry runner started");
}
public void addRetryMessage(int taskInstanceId, @NonNull CommandType messageType, BaseCommand baseCommand) {
needToRetryMessages.computeIfAbsent(taskInstanceId, k -> new ConcurrentHashMap<>()).put(messageType,
baseCommand);
}
public void removeRetryMessage(int taskInstanceId, @NonNull CommandType messageType) {
Map<CommandType, BaseCommand> retryMessages = needToRetryMessages.get(taskInstanceId);
if (retryMessages != null) {
retryMessages.remove(messageType);
}
}
public void removeRetryMessages(int taskInstanceId) {
needToRetryMessages.remove(taskInstanceId);
}
public void updateMessageHost(int taskInstanceId, String messageReceiverHost) {
Map<CommandType, BaseCommand> needToRetryMessages = this.needToRetryMessages.get(taskInstanceId);
if (needToRetryMessages != null) {
needToRetryMessages.values().forEach(baseMessage -> {
baseMessage.setMessageReceiverAddress(messageReceiverHost);
});
}
}
public void run() {
while (Stopper.isRunning()) {
try {
if (needToRetryMessages.isEmpty()) {
Thread.sleep(MESSAGE_RETRY_WINDOW);
}
long now = System.currentTimeMillis();
for (Map.Entry<Integer, Map<CommandType, BaseCommand>> taskEntry : needToRetryMessages.entrySet()) {
Integer taskInstanceId = taskEntry.getKey();
LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
try {
for (Map.Entry<CommandType, BaseCommand> messageEntry : taskEntry.getValue().entrySet()) {
CommandType messageType = messageEntry.getKey();
BaseCommand message = messageEntry.getValue();
if (now - message.getMessageSendTime() > MESSAGE_RETRY_WINDOW) {
logger.info("Begin retry send message to master, message: {}", message);
message.setMessageSendTime(now);
messageSenderMap.get(messageType).sendMessage(message);
logger.info("Success send message to master, message: {}", message);
}
}
} catch (Exception e) {
logger.warn("Retry send message to master error", e);
} finally {
LoggerUtils.removeTaskInstanceIdMDC();
}
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (InterruptedException instance) {
logger.warn("The message retry thread is interrupted, will break this loop", instance);
Thread.currentThread().interrupt();
break;
} catch (Exception ex) {
logger.error("Retry send message failed, get an known exception.", ex);
}
}
}
}

43
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.message;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.BaseCommand;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
public interface MessageSender<T extends BaseCommand> {
/**
* Send the message
*
* @throws RemotingException Cannot connect to the target host.
*/
void sendMessage(T message) throws RemotingException;
/**
* Build the message from task context and message received address.
*/
T buildMessage(TaskExecutionContext taskExecutionContext, String messageReceiverAddress);
/**
* The message type can be sent by this sender.
*/
CommandType getMessageType();
}

70
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.message;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskExecuteResultMessageSender implements MessageSender<TaskExecuteResultCommand> {
@Autowired
private WorkerConfig workerConfig;
@Autowired
private WorkerRpcClient workerRpcClient;
@Override
public void sendMessage(TaskExecuteResultCommand message) throws RemotingException {
workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
}
public TaskExecuteResultCommand buildMessage(TaskExecutionContext taskExecutionContext,
String messageReceiverAddress) {
TaskExecuteResultCommand taskExecuteResultMessage
= new TaskExecuteResultCommand(workerConfig.getWorkerAddress(),
messageReceiverAddress,
System.currentTimeMillis());
taskExecuteResultMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskExecuteResultMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecuteResultMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
taskExecuteResultMessage.setLogPath(taskExecutionContext.getLogPath());
taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath());
taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds());
taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId());
taskExecuteResultMessage.setHost(taskExecutionContext.getHost());
taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime());
taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool());
taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath());
return taskExecuteResultMessage;
}
@Override
public CommandType getMessageType() {
return CommandType.TASK_EXECUTE_RESULT;
}
}

67
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.message;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import lombok.NonNull;
@Component
public class TaskExecuteRunningMessageSender implements MessageSender<TaskExecuteRunningCommand> {
@Autowired
private WorkerRpcClient workerRpcClient;
@Autowired
private WorkerConfig workerConfig;
@Override
public void sendMessage(TaskExecuteRunningCommand message) throws RemotingException {
workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
}
public TaskExecuteRunningCommand buildMessage(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull String messageReceiverAddress) {
TaskExecuteRunningCommand taskExecuteRunningMessage
= new TaskExecuteRunningCommand(workerConfig.getWorkerAddress(),
messageReceiverAddress,
System.currentTimeMillis());
taskExecuteRunningMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath());
taskExecuteRunningMessage.setHost(taskExecutionContext.getHost());
taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath());
return taskExecuteRunningMessage;
}
@Override
public CommandType getMessageType() {
return CommandType.TASK_EXECUTE_RUNNING;
}
}

59
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.message;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskRejectMessageSender implements MessageSender<TaskRejectCommand> {
@Autowired
private WorkerRpcClient workerRpcClient;
@Autowired
private WorkerConfig workerConfig;
@Override
public void sendMessage(TaskRejectCommand message) throws RemotingException {
workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
}
public TaskRejectCommand buildMessage(TaskExecutionContext taskExecutionContext, String masterAddress) {
TaskRejectCommand taskRejectMessage = new TaskRejectCommand(workerConfig.getWorkerAddress(),
masterAddress,
System.currentTimeMillis());
taskRejectMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskRejectMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskRejectMessage.setHost(taskExecutionContext.getHost());
return taskRejectMessage;
}
@Override
public CommandType getMessageType() {
return CommandType.TASK_REJECT;
}
}

13
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java

@ -21,8 +21,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,22 +42,19 @@ public class HostUpdateProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(HostUpdateProcessor.class);
/**
* task callback service
*/
@Autowired
private TaskCallbackService taskCallbackService;
private MessageRetryRunner messageRetryRunner;
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class);
if (updateCommand == null) {
logger.error("host update command is null");
return;
}
logger.info("received host update command : {}", updateCommand);
taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
messageRetryRunner.updateMessageHost(updateCommand.getTaskInstanceId(), updateCommand.getProcessHost());
}
}

276
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -1,276 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
/**
* task callback service
*/
@Service
public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200};
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningProcessor;
@Autowired
private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
/**
* remote channels
*/
private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
/**
* netty remoting client
*/
private final NettyRemotingClient nettyRemotingClient;
public TaskCallbackService() {
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
}
/**
* add callback channel
*
* @param taskInstanceId taskInstanceId
* @param channel channel
*/
public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
REMOTE_CHANNELS.put(taskInstanceId, channel);
}
/**
* change remote channel
*/
public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
REMOTE_CHANNELS.put(taskInstanceId, channel);
}
/**
* get callback channel
*
* @param taskInstanceId taskInstanceId
* @return callback channel
*/
private Optional<NettyRemoteChannel> getRemoteChannel(int taskInstanceId) {
Channel newChannel;
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
if (nettyRemoteChannel != null) {
if (nettyRemoteChannel.isActive()) {
return Optional.of(nettyRemoteChannel);
}
newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if (newChannel != null) {
return Optional.of(getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId));
}
}
return Optional.empty();
}
public long pause(int ntries) {
return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
}
private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId) {
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
addRemoteChannel(taskInstanceId, remoteChannel);
return remoteChannel;
}
private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId) {
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel);
addRemoteChannel(taskInstanceId, remoteChannel);
return remoteChannel;
}
/**
* remove callback channels
*
* @param taskInstanceId taskInstanceId
*/
public static void remove(int taskInstanceId) {
REMOTE_CHANNELS.remove(taskInstanceId);
}
/**
* send result
*
* @param taskInstanceId taskInstanceId
* @param command command
*/
public void send(int taskInstanceId, Command command) {
Optional<NettyRemoteChannel> nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if (nettyRemoteChannel.isPresent()) {
nettyRemoteChannel.get().writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
logger.error("Send callback command error, taskInstanceId: {}, command: {}", taskInstanceId, command);
}
}
});
} else {
logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command);
}
}
/**
* build task execute running command
*
* @param taskExecutionContext taskExecutionContext
* @return TaskExecuteAckCommand
*/
private TaskExecuteRunningCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRunningCommand command = new TaskExecuteRunningCommand();
command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
command.setLogPath(taskExecutionContext.getLogPath());
command.setHost(taskExecutionContext.getHost());
command.setStartTime(taskExecutionContext.getStartTime());
command.setExecutePath(taskExecutionContext.getExecutePath());
return command;
}
/**
* build task execute response command
*
* @param taskExecutionContext taskExecutionContext
* @return TaskExecuteResponseCommand
*/
private TaskExecuteResponseCommand buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteResponseCommand command = new TaskExecuteResponseCommand();
command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
command.setLogPath(taskExecutionContext.getLogPath());
command.setExecutePath(taskExecutionContext.getExecutePath());
command.setAppIds(taskExecutionContext.getAppIds());
command.setProcessId(taskExecutionContext.getProcessId());
command.setHost(taskExecutionContext.getHost());
command.setStartTime(taskExecutionContext.getStartTime());
command.setEndTime(taskExecutionContext.getEndTime());
command.setVarPool(taskExecutionContext.getVarPool());
command.setExecutePath(taskExecutionContext.getExecutePath());
return command;
}
/**
* build TaskKillResponseCommand
*
* @param taskExecutionContext taskExecutionContext
* @return build TaskKillResponseCommand
*/
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
return taskKillResponseCommand;
}
private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) {
TaskRecallCommand taskRecallCommand = new TaskRecallCommand();
taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskRecallCommand.setHost(taskExecutionContext.getHost());
return taskRecallCommand;
}
/**
* send task execute running command
* todo unified callback command
*/
public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
// add response cache
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RUNNING);
send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
}
/**
* send task execute delay command
* todo unified callback command
*/
public void sendTaskExecuteDelayCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
}
/**
* send task execute response command
* todo unified callback command
*/
public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext);
// add response cache
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RESULT);
send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
}
public void sendTaskKillResponseCommand(TaskExecutionContext taskExecutionContext) {
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext);
send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
}
/**
* send task execute response command
*/
public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command(), TaskEventType.WORKER_REJECT);
send(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command());
}
}

78
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java → dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java

@ -24,19 +24,18 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
@ -58,16 +57,13 @@ import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
/**
* worker request processor
* Used to handle {@link CommandType#TASK_DISPATCH_REQUEST}
*/
@Component
public class TaskExecuteProcessor implements NettyRequestProcessor {
public class TaskDispatchProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
private static final Logger logger = LoggerFactory.getLogger(TaskDispatchProcessor.class);
/**
* worker config
*/
@Autowired
private WorkerConfig workerConfig;
@ -75,7 +71,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
* task callback service
*/
@Autowired
private TaskCallbackService taskCallbackService;
private WorkerMessageSender workerMessageSender;
/**
* alert client service
@ -99,26 +95,27 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
command.getBody(), TaskExecuteRequestCommand.class);
TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);
if (taskRequestCommand == null) {
logger.error("task execute request command is null");
if (taskDispatchCommand == null) {
logger.error("task execute request command content is null");
return;
}
logger.info("task execute request command : {}", taskRequestCommand);
final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
logger.info("task execute request message: {}", taskDispatchCommand);
TaskExecutionContext taskExecutionContext = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
if (taskExecutionContext == null) {
logger.error("task execution context is null");
return;
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
@ -127,7 +124,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// todo custom logger
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setHost(workerConfig.getWorkerAddress());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
@ -148,11 +145,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// check if the OS user exists
if (!osUserExistFlag) {
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
taskExecutionContext.getTenantCode(),
taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
return;
}
@ -164,33 +164,43 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) {
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
logger.error("create executeLocalPath fail", ex);
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
execLocalPath,
taskExecutionContext.getTaskInstanceId(),
ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
return;
}
}
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
logger.info("delay the execution of task instance {}, delay time: {} s",
taskExecutionContext.getTaskInstanceId(),
remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
}
// submit task to manager
boolean offer = workerManager.offer(
new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager, storageOperate));
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
masterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate));
if (!offer) {
logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getWaitSubmitQueueSize(), taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendRecallCommand(taskExecutionContext);
workerManager.getWaitSubmitQueueSize(),
taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT);
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();

71
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* task execute running ack, from master to worker
*/
@Component
public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseAckProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = JSONUtils.parseObject(
command.getBody(), TaskExecuteResponseAckCommand.class);
if (taskExecuteResponseAckCommand == null) {
logger.error("task execute response ack command is null");
return;
}
logger.info("task execute response ack command : {}", taskExecuteResponseAckCommand);
if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId());
TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId());
logger.debug("remove REMOTE_CHANNELS, task instance id:{}",
taskExecuteResponseAckCommand.getTaskInstanceId());
} else if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.FAILURE.getCode()) {
// master handle worker response error, will still retry
} else {
throw new IllegalArgumentException("Invalid task execute response ack status: "
+ taskExecuteResponseAckCommand.getStatus());
}
}
}

83
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* task execute running ack, from master to worker
*/
@Component
public class TaskExecuteResultAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteResultAckProcessor.class);
@Autowired
private MessageRetryRunner messageRetryRunner;
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskExecuteAckCommand taskExecuteAckMessage = JSONUtils.parseObject(command.getBody(),
TaskExecuteAckCommand.class);
if (taskExecuteAckMessage == null) {
logger.error("task execute response ack command is null");
return;
}
logger.info("task execute response ack command : {}", taskExecuteAckMessage);
try {
LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId());
if (taskExecuteAckMessage.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(),
CommandType.TASK_EXECUTE_RESULT);
logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteAckMessage.getTaskInstanceId());
} else if (taskExecuteAckMessage.getStatus() == ExecutionStatus.FAILURE.getCode()) {
// master handle worker response error, will still retry
logger.error("Receive task execute result ack message, the message status is not success, message: {}",
taskExecuteAckMessage);
} else {
throw new IllegalArgumentException("Invalid task execute response ack status: "
+ taskExecuteAckMessage.getStatus());
}
} finally {
LoggerUtils.removeTaskInstanceIdMDC();
}
}
}

17
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java

@ -22,12 +22,13 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -42,13 +43,16 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningAckProcessor.class);
@Autowired
private MessageRetryRunner messageRetryRunner;
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
String.format("invalid command type : %s", command.getType()));
TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject(
command.getBody(), TaskExecuteRunningAckCommand.class);
TaskExecuteRunningAckMessage runningAckCommand = JSONUtils.parseObject(command.getBody(),
TaskExecuteRunningAckMessage.class);
if (runningAckCommand == null) {
logger.error("task execute running ack command is null");
return;
@ -58,7 +62,8 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
logger.info("task execute running ack command : {}", runningAckCommand);
if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
messageRetryRunner.removeRetryMessage(runningAckCommand.getTaskInstanceId(),
CommandType.TASK_EXECUTE_RUNNING);
}
} finally {
LoggerUtils.removeTaskInstanceIdMDC();

50
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -28,15 +28,17 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.log.LogClientService;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -47,7 +49,10 @@ import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
/**
* task kill processor
@ -57,18 +62,15 @@ public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
/**
* task callback service
*/
@Autowired
private TaskCallbackService taskCallbackService;
/**
* task execute manager
*/
@Autowired
private WorkerManagerThread workerManager;
@Autowired
private MessageRetryRunner messageRetryRunner;
/**
* task kill process
*
@ -77,7 +79,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
if (killCommand == null) {
logger.error("task kill request command is null");
@ -86,7 +89,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
logger.info("task kill command : {}", killCommand);
int taskInstanceId = killCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
TaskExecutionContext taskExecutionContext
= TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
return;
@ -96,25 +100,43 @@ public class TaskKillProcessor implements NettyRequestProcessor {
if (processId == 0) {
this.cancelApplication(taskInstanceId);
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.KILL);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
sendTaskKillResponseCommand(channel, taskExecutionContext);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return;
}
Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
taskExecutionContext.setCurrentExecutionStatus(result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
taskExecutionContext.setCurrentExecutionStatus(
result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
sendTaskKillResponseCommand(channel, taskExecutionContext);
taskCallbackService.sendTaskKillResponseCommand(taskExecutionContext);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
}
private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
channel.writeAndFlush(taskKillResponseCommand.convert2Command()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
logger.error("Submit kill response to master error, kill command: {}", taskKillResponseCommand);
}
}
});
}
/**
* do kill
*

40
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java → dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java

@ -18,15 +18,17 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -34,25 +36,35 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
@Component
public class TaskRecallAckProcessor implements NettyRequestProcessor {
public class TaskRejectAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskRecallAckProcessor.class);
private final Logger logger = LoggerFactory.getLogger(TaskRejectAckProcessor.class);
@Autowired
private MessageRetryRunner messageRetryRunner;
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_RECALL_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.TASK_REJECT_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskRecallAckCommand taskRecallAckCommand = JSONUtils.parseObject(command.getBody(), TaskRecallAckCommand.class);
if (taskRecallAckCommand == null) {
TaskRejectAckCommand taskRejectAckMessage = JSONUtils.parseObject(command.getBody(),
TaskRejectAckCommand.class);
if (taskRejectAckMessage == null) {
return;
}
if (taskRecallAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeRecallCache(taskRecallAckCommand.getTaskInstanceId());
logger.debug("removeRecallCache: task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
TaskCallbackService.remove(taskRecallAckCommand.getTaskInstanceId());
logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
try {
LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId());
if (taskRejectAckMessage.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(),
CommandType.TASK_REJECT);
logger.debug("removeRecallCache: task instance id:{}", taskRejectAckMessage.getTaskInstanceId());
} else {
logger.error("Receive task reject ack message, the message status is not success, message: {}",
taskRejectAckMessage);
}
} finally {
LoggerUtils.removeTaskInstanceIdMDC();
}
}
}

92
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.rpc;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.BaseCommand;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.message.MessageSender;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import lombok.NonNull;
@Component
public class WorkerMessageSender {
private final Logger logger = LoggerFactory.getLogger(WorkerMessageSender.class);
@Autowired
private MessageRetryRunner messageRetryRunner;
@Autowired
private ApplicationContext applicationContext;
private Map<CommandType, MessageSender> messageSenderMap = new HashMap<>();
@PostConstruct
public void init() {
Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
messageSenders.values().forEach(messageSender -> messageSenderMap.put(messageSender.getMessageType(),
messageSender));
}
// todo: use message rather than context
public void sendMessageWithRetry(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull String messageReceiverAddress,
@NonNull CommandType messageType) {
MessageSender messageSender = messageSenderMap.get(messageType);
if (messageSender == null) {
throw new IllegalArgumentException("The messageType is invalidated, messageType: " + messageType);
}
BaseCommand baseCommand = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress);
try {
messageRetryRunner.addRetryMessage(taskExecutionContext.getTaskInstanceId(), messageType, baseCommand);
messageSender.sendMessage(baseCommand);
} catch (RemotingException e) {
logger.error("Send message error, messageType: {}, message: {}", messageType, baseCommand);
}
}
public void sendMessage(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull String messageReceiverAddress,
@NonNull CommandType messageType) {
MessageSender messageSender = messageSenderMap.get(messageType);
if (messageSender == null) {
throw new IllegalArgumentException("The messageType is invalidated, messageType: " + messageType);
}
BaseCommand baseCommand = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress);
try {
messageSender.sendMessage(baseCommand);
} catch (RemotingException e) {
logger.error("Send message error, messageType: {}, message: {}", messageType, baseCommand);
}
}
}

75
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* This rpc client is only used to send message, will not receive message, all response message should send to {@link WorkerRpcServer}.
*/
@Component
public class WorkerRpcClient implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(WorkerRpcClient.class);
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Autowired
private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
@Autowired
private TaskRejectAckProcessor taskRejectAckProcessor;
private NettyRemotingClient nettyRemotingClient;
public void start() {
logger.info("Worker rpc client starting");
NettyClientConfig nettyClientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(nettyClientConfig);
// we only use the client to handle the ack message, we can optimize this, send ack to the nettyServer.
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
taskExecuteRunningAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
logger.info("Worker rpc client started");
}
public void send(Host host, Command command) throws RemotingException {
nettyRemotingClient.send(host, command);
}
public void close() {
logger.info("Worker rpc client closing");
nettyRemotingClient.close();
logger.info("Worker rpc client closed");
}
}

18
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java

@ -23,11 +23,11 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import java.io.Closeable;
@ -42,19 +42,19 @@ public class WorkerRpcServer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRpcServer.class);
@Autowired
private TaskExecuteProcessor taskExecuteProcessor;
private TaskDispatchProcessor taskDispatchProcessor;
@Autowired
private TaskKillProcessor taskKillProcessor;
@Autowired
private TaskRecallAckProcessor taskRecallAckProcessor;
private TaskRejectAckProcessor taskRejectAckProcessor;
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Autowired
private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
@Autowired
private HostUpdateProcessor hostUpdateProcessor;
@ -72,12 +72,12 @@ public class WorkerRpcServer implements Closeable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, taskRecallAckProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);

134
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java

@ -1,134 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Retry Report Task Status Thread
*/
@Component
public class RetryReportTaskStatusThread extends BaseDaemonThread {
private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
/**
* every 5 minutes
*/
private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L;
@Autowired
private TaskCallbackService taskCallbackService;
protected RetryReportTaskStatusThread() {
super("RetryReportTaskStatusThread");
}
@Override
public synchronized void start() {
logger.info("Retry report task status thread starting");
super.start();
logger.info("Retry report task status thread started");
}
/**
* retry ack/response
*/
@Override
public void run() {
final ResponseCache instance = ResponseCache.get();
while (Stopper.isRunning()) {
// sleep 5 minutes
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
try {
// todo: Only retry the send failed command
retryRunningCommand(instance);
retryResponseCommand(instance);
retryRecallCommand(instance);
} catch (Exception e) {
logger.warn("Retry report task status error", e);
}
}
}
private void retryRunningCommand(ResponseCache instance) {
if (!instance.getRunningCache().isEmpty()) {
Map<Integer, Command> runningCache = instance.getRunningCache();
logger.info("Send task running retry command starting, waiting to retry size: {}", runningCache.size());
for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command runningCommand = entry.getValue();
try {
taskCallbackService.send(taskInstanceId, runningCommand);
} catch (Exception ex) {
logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}", taskInstanceId, runningCommand);
}
}
logger.info("Send task running retry command finished, waiting to retry size: {}", runningCache.size());
}
}
private void retryResponseCommand(ResponseCache instance) {
Map<Integer, Command> responseCache = instance.getResponseCache();
if (!responseCache.isEmpty()) {
logger.info("Send task response retry command starting, waiting to retry size: {}", responseCache.size());
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
try {
taskCallbackService.send(taskInstanceId, responseCommand);
} catch (Exception ex) {
logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
}
}
logger.info("Send task response retry command finished, waiting to retry size: {}", responseCache.size());
}
}
private void retryRecallCommand(ResponseCache instance) {
Map<Integer, Command> recallCache = instance.getRecallCache();
if (!recallCache.isEmpty()) {
logger.info("Send task recall retry command starting, waiting to retry size: {}", recallCache.size());
for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
try {
taskCallbackService.send(taskInstanceId, responseCommand);
} catch (Exception ex) {
logger.error("Retry send recall command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
}
}
logger.info("Send task recall retry command finished, waiting to retry size: {}", recallCache.size());
}
}
}

85
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -33,11 +33,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
@ -53,18 +53,18 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import lombok.NonNull;
/**
* task scheduler thread
*/
@ -78,9 +78,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
/**
* task instance
*/
private TaskExecutionContext taskExecutionContext;
private final TaskExecutionContext taskExecutionContext;
private final String masterAddress;
private StorageOperate storageOperate;
private final StorageOperate storageOperate;
/**
* abstract task
@ -90,12 +92,12 @@ public class TaskExecuteThread implements Runnable, Delayed {
/**
* task callback service
*/
private TaskCallbackService taskCallbackService;
private final WorkerMessageSender workerMessageSender;
/**
* alert client server
*/
private AlertClientService alertClientService;
private final AlertClientService alertClientService;
private TaskPluginManager taskPluginManager;
@ -103,25 +105,29 @@ public class TaskExecuteThread implements Runnable, Delayed {
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param taskCallbackService taskCallbackService
* @param workerMessageSender used for worker send message to master
*/
public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
AlertClientService alertClientService,
public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull String masterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.masterAddress = masterAddress;
this.workerMessageSender = workerMessageSender;
this.alertClientService = alertClientService;
this.storageOperate = storageOperate;
}
public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
AlertClientService alertClientService,
TaskPluginManager taskPluginManager,
public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull String masterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.masterAddress = masterAddress;
this.workerMessageSender = workerMessageSender;
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
@ -129,18 +135,26 @@ public class TaskExecuteThread implements Runnable, Delayed {
@Override
public void run() {
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
return;
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
logger.info("Task dry run success");
return;
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
@ -149,10 +163,13 @@ public class TaskExecuteThread implements Runnable, Delayed {
// callback task execute running
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RUNNING);
// copy hdfs/minio file to local
List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources());
if (!fileDownloads.isEmpty()) {
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
@ -160,8 +177,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setTaskAppId(String.format("%s_%s",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
if (null == taskChannel) {
@ -208,7 +225,9 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.setAppIds(this.task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
clearTaskExecPath();
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}

6
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java

@ -68,8 +68,10 @@ public class WorkerExecService {
@Override
public void onFailure(Throwable throwable) {
logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId()
, taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable);
logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
throwable);
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
};

19
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -21,12 +21,9 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@ -62,7 +59,7 @@ public class WorkerManagerThread implements Runnable {
* task callback service
*/
@Autowired
private TaskCallbackService taskCallbackService;
private WorkerMessageSender workerMessageSender;
private volatile int workerExecThreads;
@ -110,20 +107,8 @@ public class WorkerManagerThread implements Runnable {
waitSubmitQueue.stream()
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
.forEach(waitSubmitQueue::remove);
sendTaskKillResponse(taskInstanceId);
}
/**
* kill task before execute , like delay task
*/
private void sendTaskKillResponse(Integer taskInstanceId) {
TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
return;
}
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.KILL);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
}
/**
* submit task

92
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java → dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java

@ -25,11 +25,12 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
@ -48,19 +49,20 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* test task execute processor
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
@PrepareForTest({SpringApplicationContext.class, WorkerConfig.class, FileUtils.class, JsonSerializer.class,
JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
@Ignore
public class TaskExecuteProcessorTest {
public class TaskDispatchProcessorTest {
private TaskExecutionContext taskExecutionContext;
private TaskCallbackService taskCallbackService;
private WorkerMessageSender workerMessageSender;
private ExecutorService workerExecService;
@ -72,7 +74,7 @@ public class TaskExecuteProcessorTest {
private Command ackCommand;
private TaskExecuteRequestCommand taskRequestCommand;
private TaskDispatchCommand taskRequestCommand;
private AlertClientService alertClientService;
@ -86,66 +88,73 @@ public class TaskExecuteProcessorTest {
workerConfig.setExecThreads(1);
workerConfig.setListenPort(1234);
command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
ackCommand = new TaskExecuteRunningCommand().convert2Command();
taskRequestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
command.setType(CommandType.TASK_DISPATCH_REQUEST);
ackCommand = new TaskExecuteRunningCommand("127.0.0.1:1234",
"127.0.0.1:5678",
System.currentTimeMillis()).convert2Command();
taskRequestCommand = new TaskDispatchCommand(taskExecutionContext,
"127.0.0.1:5678",
"127.0.0.1:1234",
System.currentTimeMillis());
alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class);
PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
.thenReturn(null);
PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))).thenReturn(null);
PowerMockito.mockStatic(ChannelUtils.class);
PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
PowerMockito.doNothing().when(taskCallbackService).send(taskExecutionContext.getTaskInstanceId(), ackCommand);
workerMessageSender = PowerMockito.mock(WorkerMessageSender.class);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
.thenReturn(taskCallbackService);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
.thenReturn(workerConfig);
PowerMockito.when(SpringApplicationContext.getBean(WorkerMessageSender.class)).thenReturn(workerMessageSender);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
workerManager = PowerMockito.mock(WorkerManagerThread.class);
storageOperate = PowerMockito.mock(StorageOperate.class);
PowerMockito.when(
workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, storageOperate)))
.thenReturn(Boolean.TRUE);
PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext,
"127.0.0.1:5678",
workerMessageSender,
alertClientService,
storageOperate))).thenReturn(Boolean.TRUE);
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
.thenReturn(workerManager);
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
PowerMockito.mockStatic(ThreadUtils.class);
PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()))
.thenReturn(workerExecService);
PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
workerConfig.getExecThreads())).thenReturn(
workerExecService);
PowerMockito.mockStatic(JsonSerializer.class);
PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
.thenReturn(taskRequestCommand);
PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchCommand.class)).thenReturn(
taskRequestCommand);
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
.thenReturn(taskRequestCommand);
PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class)).thenReturn(
taskRequestCommand);
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()))
.thenReturn(taskExecutionContext.getExecutePath());
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId())).thenReturn(
taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(
null, null, null, alertClientService, storageOperate);
PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
.thenReturn(simpleTaskExecuteThread);
SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(new TaskExecutionContext(),
workerMessageSender,
"127.0.0.1:5678",
LoggerFactory.getLogger(
TaskDispatchProcessorTest.class),
alertClientService,
storageOperate);
PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments().thenReturn(simpleTaskExecuteThread);
}
@Test
public void testNormalExecution() {
TaskExecuteProcessor processor = new TaskExecuteProcessor();
TaskDispatchProcessor processor = new TaskDispatchProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
@ -154,7 +163,7 @@ public class TaskExecuteProcessorTest {
@Test
public void testDelayExecution() {
taskExecutionContext.setDelayTime(1);
TaskExecuteProcessor processor = new TaskExecuteProcessor();
TaskDispatchProcessor processor = new TaskDispatchProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
@ -179,11 +188,12 @@ public class TaskExecuteProcessorTest {
private static class SimpleTaskExecuteThread extends TaskExecuteThread {
public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
WorkerMessageSender workerMessageSender,
String masterAddress,
Logger taskLogger,
AlertClientService alertClientService,
StorageOperate storageOperate) {
super(taskExecutionContext, taskCallbackService, alertClientService, storageOperate);
super(taskExecutionContext, masterAddress, workerMessageSender, alertClientService, storageOperate);
}
@Override

12
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
@ -48,7 +48,7 @@ public class TaskExecuteThreadTest {
private TaskExecutionContext taskExecutionContext;
@Mock
private TaskCallbackService taskCallbackService;
private WorkerMessageSender workerMessageSender;
@Mock
private AlertClientService alertClientService;
@ -61,8 +61,12 @@ public class TaskExecuteThreadTest {
@Test
public void checkTest() {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService,
alertClientService, taskPluginManager, storageOperate);
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext,
"127.0.0.1:5678",
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate);
String path = "/";
Map<String, String> projectRes = new HashMap<>();

Loading…
Cancel
Save