Browse Source

[Fix-10452] Serial wait for policy recovery (#10453)

* Serial wait for policy recovery

* processInstance state check null

* add sendresult(Host host, Command command) method and replace the original.
3.1.0-release
WangJPLeo 2 years ago committed by GitHub
parent
commit
f46faa02c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  3. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  4. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
  5. 8
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
  6. 24
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@ -499,13 +500,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether the process is normal
if (update > 0) {
String host = processInstance.getHost();
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
);
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
Host host = new Host(processInstance.getHost());
stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -684,14 +684,13 @@ public class WorkflowExecuteRunnable implements Runnable {
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
// serial wait execution type needs to wake up the waiting process
if (processDefinition.getExecutionType().typeIsSerialWait()){
if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()){
endProcess();
return true;
}
this.updateProcessInstanceState(stateEvent);
return true;
}
if (processComplementData()) {
return true;
}
@ -832,7 +831,7 @@ public class WorkflowExecuteRunnable implements Runnable {
*/
public void endProcess() {
this.stateEvents.clear();
if (processDefinition.getExecutionType().typeIsSerialWait()) {
if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
checkSerialProcess(processDefinition);
}
if (processInstance.getState().typeIsWaitingThread()) {
@ -855,6 +854,10 @@ public class WorkflowExecuteRunnable implements Runnable {
if (nextProcessInstance == null) {
return;
}
ProcessInstance nextReadyStopProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.READY_STOP.getCode(), processInstance.getId());
if (processDefinition.getExecutionType().typeIsSerialPriority() && nextReadyStopProcessInstance != null) {
return;
}
nextInstanceId = nextProcessInstance.getId();
}
ProcessInstance nextProcessInstance = this.processService.findProcessInstanceById(nextInstanceId);

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
@ -182,16 +183,15 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
* notify process's master
*/
private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance, TaskInstance taskInstance) {
String host = processInstance.getHost();
if (Strings.isNullOrEmpty(host)) {
String processInstanceHost = processInstance.getHost();
if (Strings.isNullOrEmpty(processInstanceHost)) {
logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(), taskInstance.getId());
return;
}
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId()
);
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
Host host = new Host(processInstanceHost);
stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
}
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -215,9 +216,8 @@ public class SubTaskProcessor extends BaseTaskProcessor {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0
);
String address = subProcessInstance.getHost().split(":")[0];
int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]);
this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
Host host = new Host(subProcessInstance.getHost());
this.stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
}
@Override

8
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java

@ -122,4 +122,12 @@ public class StateEventCallbackService {
nettyRemoteChannel.writeAndFlush(command);
}
}
public void sendResult(Host host, Command command) {
logger.info("send result, host:{}, command:{}", host.getAddress(), command.toString());
NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host);
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command);
}
}
}

24
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -345,25 +345,29 @@ public class ProcessServiceImpl implements ProcessService {
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
return;
}
for (ProcessInstance info : runningProcessInstances) {
if (Objects.nonNull(info.getState()) && (ExecutionStatus.READY_STOP.equals(info.getState()) || info.getState().typeIsFinished())) {
continue;
}
info.setCommandType(CommandType.STOP);
info.addHistoryCmd(CommandType.STOP);
info.setState(ExecutionStatus.READY_STOP);
int update = updateProcessInstance(info);
// determine whether the process is normal
if (update > 0) {
String host = info.getHost();
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
info.getId(), 0, info.getState(), info.getId(), 0
);
try {
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
Host host = new Host(info.getHost());
stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
} catch (Exception e) {
logger.error("sendResultError");
}
logger.error("sendResultError",e );
}
}
}
@ -3037,13 +3041,11 @@ public class ProcessServiceImpl implements ProcessService {
@Override
public void sendStartTask2Master(ProcessInstance processInstance, int taskId,
org.apache.dolphinscheduler.remote.command.CommandType taskType) {
String host = processInstance.getHost();
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand(
processInstance.getId(), taskId
);
stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType));
Host host = new Host(processInstance.getHost());
stateEventCallbackService.sendResult(host, taskEventChangeCommand.convert2Command(taskType));
}
@Override

Loading…
Cancel
Save