Browse Source

Kill task when we do master failover to avoid task lost (#10997)

(cherry picked from commit 998e4d74dd)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
17f0be5bfb
  1. 21
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  2. 6
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  3. 22
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java

21
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -28,8 +28,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
@ -63,13 +67,17 @@ public class MasterFailoverService {
private final ProcessService processService;
private final String localAddress;
private final NettyExecutorManager nettyExecutorManager;
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService) {
@NonNull ProcessService processService,
@NonNull NettyExecutorManager nettyExecutorManager) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.nettyExecutorManager = nettyExecutorManager;
}
@ -221,6 +229,17 @@ public class MasterFailoverService {
LOGGER.info("TaskInstance failover begin kill the task related yarn job");
ProcessUtils.killYarnJob(taskExecutionContext);
}
// Kill the task on the worker. When master failover and worker failover happen at the same time,
// the task may not be failed over if we don't set NEED_FAULT_TOLERANCE.
// This could be improved if we loaded all tasks when caching a workflowInstance in memory.
try {
TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId());
Host workerHost = Host.of(taskInstance.getHost());
nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command());
LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost());
} catch (ExecuteException e) {
LOGGER.error("Kill task failed", e);
}
} else {
LOGGER.info("The failover taskInstance is a master task");
}

6
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@ -82,6 +83,9 @@ public class FailoverServiceTest {
@Mock
private ProcessInstanceExecCacheManager cacheManager;
@Mock
private NettyExecutorManager nettyExecutorManager;
private static int masterPort = 5678;
private static int workerPort = 1234;
@ -100,7 +104,7 @@ public class FailoverServiceTest {
given(masterConfig.getListenPort()).willReturn(masterPort);
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService);
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,

22
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java

@ -21,9 +21,16 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* kill task request command
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskKillRequestCommand implements Serializable {
/**
@ -31,14 +38,6 @@ public class TaskKillRequestCommand implements Serializable {
*/
private int taskInstanceId;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* package request command
*
@ -51,11 +50,4 @@ public class TaskKillRequestCommand implements Serializable {
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskKillRequestCommand{"
+ "taskInstanceId=" + taskInstanceId
+ '}';
}
}

Loading…
Cancel
Save