diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 0d7376681d..396dcd3811 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -499,13 +500,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ // determine whether the process is normal if (update > 0) { - String host = processInstance.getHost(); - String address = host.split(":")[0]; - int port = Integer.parseInt(host.split(":")[1]); StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0 ); - stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + Host host = new Host(processInstance.getHost()); + stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command()); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 801b482be2..c7d8b5c0b2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -684,14 +684,13 @@ public class WorkflowExecuteRunnable implements Runnable { if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) { // serial wait execution type needs to wake up the waiting process - if (processDefinition.getExecutionType().typeIsSerialWait()){ + if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()){ endProcess(); return true; } this.updateProcessInstanceState(stateEvent); return true; } - if (processComplementData()) { return true; } @@ -832,7 +831,7 @@ public class WorkflowExecuteRunnable implements Runnable { */ public void endProcess() { this.stateEvents.clear(); - if (processDefinition.getExecutionType().typeIsSerialWait()) { + if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) { checkSerialProcess(processDefinition); } if (processInstance.getState().typeIsWaitingThread()) { @@ -855,6 +854,10 @@ public class WorkflowExecuteRunnable implements Runnable { if (nextProcessInstance == null) { return; } + ProcessInstance nextReadyStopProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.READY_STOP.getCode(), processInstance.getId()); + if (processDefinition.getExecutionType().typeIsSerialPriority() && nextReadyStopProcessInstance != null) { + return; + } nextInstanceId = nextProcessInstance.getId(); } ProcessInstance nextProcessInstance = this.processService.findProcessInstanceById(nextInstanceId); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index b658f669f7..ec003762d7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; @@ -182,16 +183,15 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { * notify process's master */ private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance, TaskInstance taskInstance) { - String host = processInstance.getHost(); - if (Strings.isNullOrEmpty(host)) { + String processInstanceHost = processInstance.getHost(); + if (Strings.isNullOrEmpty(processInstanceHost)) { logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(), taskInstance.getId()); return; } - String address = host.split(":")[0]; - int port = Integer.parseInt(host.split(":")[1]); StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId() ); - stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + Host host = new Host(processInstanceHost); + stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index d870192049..72d6a7adb4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -215,9 +216,8 @@ public class SubTaskProcessor extends BaseTaskProcessor { StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0 ); - String address = subProcessInstance.getHost().split(":")[0]; - int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]); - this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + Host host = new Host(subProcessInstance.getHost()); + this.stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command()); } @Override diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java index af51831068..fb2b411523 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java @@ -122,4 +122,12 @@ public class StateEventCallbackService { nettyRemoteChannel.writeAndFlush(command); } } + + public void sendResult(Host host, Command command) { + logger.info("send result, host:{}, command:{}", host.getAddress(), command.toString()); + NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host); + if (nettyRemoteChannel != null) { + nettyRemoteChannel.writeAndFlush(command); + } + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index b66fb915bb..d03d165071 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -345,25 +345,29 @@ public class ProcessServiceImpl implements ProcessService { } else if (processDefinition.getExecutionType().typeIsSerialPriority()) { List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId()); - if (CollectionUtils.isNotEmpty(runningProcessInstances)) { - for (ProcessInstance info : runningProcessInstances) { - info.setCommandType(CommandType.STOP); - info.addHistoryCmd(CommandType.STOP); - info.setState(ExecutionStatus.READY_STOP); - int update = updateProcessInstance(info); - // determine whether the process is normal - if (update > 0) { - String host = info.getHost(); - String address = host.split(":")[0]; - int port = Integer.parseInt(host.split(":")[1]); - StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( + if (CollectionUtils.isEmpty(runningProcessInstances)) { + processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + saveProcessInstance(processInstance); + return; + } + for (ProcessInstance info : runningProcessInstances) { + if (Objects.nonNull(info.getState()) && (ExecutionStatus.READY_STOP.equals(info.getState()) || info.getState().typeIsFinished())) { + continue; + } + info.setCommandType(CommandType.STOP); + info.addHistoryCmd(CommandType.STOP); + info.setState(ExecutionStatus.READY_STOP); + int update = updateProcessInstance(info); + // determine whether the process is normal + if (update > 0) { + StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( info.getId(), 0, info.getState(), info.getId(), 0 - ); - try { - stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); - } catch (Exception e) { - logger.error("sendResultError"); - } + ); + try { + Host host = new Host(info.getHost()); + stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command()); + } catch (Exception e) { + logger.error("sendResultError",e ); } } } @@ -3037,13 +3041,11 @@ public class ProcessServiceImpl implements ProcessService { @Override public void sendStartTask2Master(ProcessInstance processInstance, int taskId, org.apache.dolphinscheduler.remote.command.CommandType taskType) { - String host = processInstance.getHost(); - String address = host.split(":")[0]; - int port = Integer.parseInt(host.split(":")[1]); TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand( processInstance.getId(), taskId ); - stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType)); + Host host = new Host(processInstance.getHost()); + stateEventCallbackService.sendResult(host, taskEventChangeCommand.convert2Command(taskType)); } @Override