Browse Source

Fix kill yarn job error when failover caused by doesn't set ProcessDefinition (#10948)

3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
b245e7c973
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  2. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

21
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -114,16 +115,18 @@ public class MasterFailoverService {
* @param masterHost master host * @param masterHost master host
*/ */
private void doFailoverMaster(@NonNull String masterHost) { private void doFailoverMaster(@NonNull String masterHost) {
LOGGER.info("Master[{}] failover starting, need to failover process", masterHost);
StopWatch failoverTimeCost = StopWatch.createStarted(); StopWatch failoverTimeCost = StopWatch.createStarted();
Optional<Date> masterStartupTimeOptional = Optional<Date> masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER),
getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost); masterHost);
List<ProcessInstance> needFailoverProcessInstanceList = List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
processService.queryNeedFailoverProcessInstances(masterHost); masterHost);
if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
return;
}
LOGGER.info( LOGGER.info(
"Master[{}] failover there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}", "Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
masterHost, masterHost,
needFailoverProcessInstanceList.size(), needFailoverProcessInstanceList.size(),
needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList())); needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
@ -136,6 +139,11 @@ public class MasterFailoverService {
LOGGER.info("WorkflowInstance doesn't need to failover"); LOGGER.info("WorkflowInstance doesn't need to failover");
continue; continue;
} }
// todo: use batch query
ProcessDefinition processDefinition
= processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId(); int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId); List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
for (TaskInstance taskInstance : taskInstanceList) { for (TaskInstance taskInstance : taskInstanceList) {
@ -205,6 +213,7 @@ public class MasterFailoverService {
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance) .buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance) .buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create(); .create();
if (masterConfig.isKillYarnJobWhenTaskFailover()) { if (masterConfig.isKillYarnJobWhenTaskFailover()) {

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -115,10 +115,10 @@ public class WorkerFailoverService {
for (TaskInstance taskInstance : needFailoverTaskInstanceList) { for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId()); LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
try { try {
ProcessInstance processInstance = ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k -> { taskInstance.getProcessInstanceId(), k -> {
WorkflowExecuteRunnable workflowExecuteRunnable = WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId(
cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()); taskInstance.getProcessInstanceId());
if (workflowExecuteRunnable == null) { if (workflowExecuteRunnable == null) {
return null; return null;
} }
@ -167,6 +167,7 @@ public class WorkerFailoverService {
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance) .buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance) .buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create(); .create();
if (masterConfig.isKillYarnJobWhenTaskFailover()) { if (masterConfig.isKillYarnJobWhenTaskFailover()) {

Loading…
Cancel
Save