diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index e149c2e373..0572e8f9ec 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -17,9 +17,10 @@ package org.apache.dolphinscheduler.common; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; + import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.SystemUtils; -import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import java.util.regex.Pattern; @@ -48,20 +49,20 @@ public final class Constants { public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters"; public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers"; public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters"; - public static final String FORMAT_SS ="%s%s"; - public static final String FORMAT_S_S ="%s/%s"; - public static final String AWS_ACCESS_KEY_ID="aws.access.key.id"; - public static final String AWS_SECRET_ACCESS_KEY="aws.secret.access.key"; - public static final String AWS_REGION="aws.region"; - public static final String FOLDER_SEPARATOR ="/"; + public static final String FORMAT_SS = "%s%s"; + public static final String FORMAT_S_S = "%s/%s"; + public static final String AWS_ACCESS_KEY_ID = "aws.access.key.id"; + public static final String AWS_SECRET_ACCESS_KEY = "aws.secret.access.key"; + public static final String AWS_REGION = "aws.region"; + public static final String FOLDER_SEPARATOR = "/"; public static final String RESOURCE_TYPE_FILE = "resources"; - public static final String RESOURCE_TYPE_UDF="udfs"; + public static final String RESOURCE_TYPE_UDF = "udfs"; - public static final String STORAGE_S3="S3"; + public static final String STORAGE_S3 = "S3"; - public static final String STORAGE_HDFS="HDFS"; + public static final String STORAGE_HDFS = "HDFS"; public static final String BUCKET_NAME = "dolphinscheduler-test"; @@ -71,7 +72,6 @@ public final class Constants { public static final String FS_DEFAULT_FS = "fs.defaultFS"; - /** * hadoop configuration */ @@ -254,7 +254,7 @@ public final class Constants { * user name regex */ public static final Pattern REGEX_USER_NAME = Pattern.compile("^[a-zA-Z0-9._-]{3,39}$"); - + /** * read permission */ @@ -424,7 +424,7 @@ public final class Constants { /** * process or task definition first version */ - public static final int VERSION_FIRST = 1; + public static final int VERSION_FIRST = 1; /** * date format of yyyyMMdd @@ -584,7 +584,6 @@ public final class Constants { public static final long DEPENDENT_ALL_TASK_CODE = 0; - /** * preview schedule execute count */ @@ -640,20 +639,22 @@ public final class Constants { */ public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s"; - public static final int[] NOT_TERMINATED_STATES = new int[] { - ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.RUNNING_EXECUTION.ordinal(), - ExecutionStatus.DELAY_EXECUTION.ordinal(), - ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.READY_STOP.ordinal(), - ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), - ExecutionStatus.WAITING_THREAD.ordinal(), - ExecutionStatus.WAITING_DEPEND.ordinal() + public static final int[] NOT_TERMINATED_STATES = new int[]{ + ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.DISPATCH.ordinal(), + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), + ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.READY_STOP.ordinal(), + ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), + ExecutionStatus.WAITING_THREAD.ordinal(), + ExecutionStatus.WAITING_DEPEND.ordinal() }; - public static final int[] RUNNING_PROCESS_STATE = new int[] { + public static final int[] RUNNING_PROCESS_STATE = new int[]{ ExecutionStatus.RUNNING_EXECUTION.ordinal(), ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.DISPATCH.ordinal(), ExecutionStatus.SERIAL_WAIT.ordinal() }; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java index 9cec2766f1..78b036f037 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java @@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.common.enums; public enum Event { - ACK, - RESULT; + DISPATCH, + DELAY, + RUNNING, + RESULT, + ; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java index f544b41c36..482cc658aa 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java @@ -54,6 +54,7 @@ public enum TaskStateType { }; case RUNNING: return new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.DISPATCH.ordinal(), ExecutionStatus.RUNNING_EXECUTION.ordinal(), ExecutionStatus.DELAY_EXECUTION.ordinal(), ExecutionStatus.READY_PAUSE.ordinal(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 22b4a69be6..af050aef7d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -27,10 +27,10 @@ import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.CacheProcessor; import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor; -import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; -import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; @@ -75,10 +75,10 @@ public class MasterServer implements IStoppable { private Scheduler scheduler; @Autowired - private TaskAckProcessor taskAckProcessor; + private TaskExecuteRunningProcessor taskExecuteRunningProcessor; @Autowired - private TaskResponseProcessor taskResponseProcessor; + private TaskExecuteResponseProcessor taskExecuteResponseProcessor; @Autowired private TaskEventProcessor taskEventProcessor; @@ -115,8 +115,8 @@ public class MasterServer implements IStoppable { NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index d7d549413d..06bbc303e1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -24,11 +24,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; @@ -79,6 +82,11 @@ public class TaskPriorityQueueConsumer extends Thread { @Autowired private ExecutorDispatcher dispatcher; + /** + * processInstance cache manager + */ + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; /** * master config @@ -86,6 +94,12 @@ public class TaskPriorityQueueConsumer extends Thread { @Autowired private MasterConfig masterConfig; + /** + * task response service + */ + @Autowired + private TaskEventService taskEventService; + /** * consumer thread pool */ @@ -168,12 +182,24 @@ public class TaskPriorityQueueConsumer extends Thread { } result = dispatcher.dispatch(executionContext); + + if (result) { + addDispatchEvent(context, executionContext); + } } catch (RuntimeException | ExecuteException e) { logger.error("dispatch error: {}", e.getMessage(), e); } return result; } + /** + * add dispatch event + */ + private void addDispatchEvent(TaskExecutionContext context, ExecutionContext executionContext) { + TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(), executionContext.getHost().getAddress()); + taskEventService.addEvent(taskEvent); + } + private Command toCommand(TaskExecutionContext taskExecutionContext) { TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext)); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 0e5333bf1c..0ba24e287d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -26,9 +26,9 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; -import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; -import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.commons.collections.CollectionUtils; @@ -46,7 +46,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * netty executor manager + * netty executor manager */ @Service public class NettyExecutorManager extends AbstractExecutorManager { @@ -60,13 +60,13 @@ public class NettyExecutorManager extends AbstractExecutorManager { private ServerNodeManager serverNodeManager; @Autowired - private TaskAckProcessor taskAckProcessor; + private TaskExecuteRunningProcessor taskExecuteRunningProcessor; @Autowired private TaskKillResponseProcessor taskKillResponseProcessor; @Autowired - private TaskResponseProcessor taskResponseProcessor; + private TaskExecuteResponseProcessor taskExecuteResponseProcessor; /** * netty remote client @@ -83,13 +83,14 @@ public class NettyExecutorManager extends AbstractExecutorManager { @PostConstruct public void init() { - this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); - this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor); this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); } /** * execute logic + * * @param context context * @return result * @throws ExecuteException if error throws ExecuteException @@ -119,7 +120,7 @@ public class NettyExecutorManager extends AbstractExecutorManager { boolean success = false; while (!success) { try { - doExecute(host,command); + doExecute(host, command); success = true; context.setHost(host); } catch (ExecuteException ex) { @@ -150,7 +151,8 @@ public class NettyExecutorManager extends AbstractExecutorManager { } /** - * execute logic + * execute logic + * * @param host host * @param command command * @throws ExecuteException if error throws ExecuteException @@ -178,7 +180,8 @@ public class NettyExecutorManager extends AbstractExecutorManager { } /** - * get all nodes + * get all nodes + * * @param context context * @return nodes */ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java similarity index 64% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java index 47bdfcab1f..264e42e3a5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java @@ -18,13 +18,12 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; -import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,15 +35,15 @@ import com.google.common.base.Preconditions; import io.netty.channel.Channel; /** - * task response processor + * task execute response processor */ @Component -public class TaskResponseProcessor implements NettyRequestProcessor { +public class TaskExecuteResponseProcessor implements NettyRequestProcessor { - private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class); + private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseProcessor.class); @Autowired - private TaskResponseService taskResponseService; + private TaskEventService taskEventService; /** * task final result response @@ -57,19 +56,10 @@ public class TaskResponseProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class); - logger.info("received command : {}", responseCommand); + TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class); + logger.info("received command : {}", taskExecuteResponseCommand); - // TaskResponseEvent - TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), - responseCommand.getEndTime(), - responseCommand.getProcessId(), - responseCommand.getAppIds(), - responseCommand.getTaskInstanceId(), - responseCommand.getVarPool(), - channel, - responseCommand.getProcessInstanceId() - ); - taskResponseService.addResponse(taskResponseEvent); + TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel); + taskEventService.addEvent(taskResponseEvent); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java similarity index 54% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java index 1d188a1433..c5ddc3d77a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java @@ -18,14 +18,12 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.remote.utils.ChannelUtils; -import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; -import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,15 +35,15 @@ import com.google.common.base.Preconditions; import io.netty.channel.Channel; /** - * task ack processor + * task execute running processor */ @Component -public class TaskAckProcessor implements NettyRequestProcessor { +public class TaskExecuteRunningProcessor implements NettyRequestProcessor { - private final Logger logger = LoggerFactory.getLogger(TaskAckProcessor.class); + private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningProcessor.class); @Autowired - private TaskResponseService taskResponseService; + private TaskEventService taskEventService; /** * task ack process @@ -55,25 +53,12 @@ public class TaskAckProcessor implements NettyRequestProcessor { */ @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class); - logger.info("taskAckCommand : {}", taskAckCommand); + Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType())); + TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class); + logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand); - String workerAddress = ChannelUtils.toAddress(channel).getAddress(); - - ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus()); - - // TaskResponseEvent - TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus, - taskAckCommand.getStartTime(), - workerAddress, - taskAckCommand.getExecutePath(), - taskAckCommand.getLogPath(), - taskAckCommand.getTaskInstanceId(), - channel, - taskAckCommand.getProcessInstanceId()); - - taskResponseService.addResponse(taskResponseEvent); + TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel); + taskEventService.addEvent(taskEvent); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java similarity index 69% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index 2a6bd07fe2..865eee53a5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -19,6 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; +import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import java.util.Date; @@ -27,7 +30,7 @@ import io.netty.channel.Channel; /** * task event */ -public class TaskResponseEvent { +public class TaskEvent { /** * taskInstanceId @@ -90,46 +93,45 @@ public class TaskResponseEvent { private Channel channel; private int processInstanceId; - - public static TaskResponseEvent newAck(ExecutionStatus state, - Date startTime, - String workerAddress, - String executePath, - String logPath, - int taskInstanceId, - Channel channel, - int processInstanceId) { - TaskResponseEvent event = new TaskResponseEvent(); - event.setState(state); - event.setStartTime(startTime); - event.setWorkerAddress(workerAddress); - event.setExecutePath(executePath); - event.setLogPath(logPath); + + public static TaskEvent newDispatchEvent(int processInstanceId, int taskInstanceId, String workerAddress) { + TaskEvent event = new TaskEvent(); + event.setProcessInstanceId(processInstanceId); event.setTaskInstanceId(taskInstanceId); - event.setEvent(Event.ACK); + event.setWorkerAddress(workerAddress); + event.setEvent(Event.DISPATCH); + return event; + } + + public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) { + TaskEvent event = new TaskEvent(); + event.setProcessInstanceId(command.getProcessInstanceId()); + event.setTaskInstanceId(command.getTaskInstanceId()); + event.setState(ExecutionStatus.of(command.getStatus())); + event.setStartTime(command.getStartTime()); + event.setExecutePath(command.getExecutePath()); + event.setLogPath(command.getLogPath()); event.setChannel(channel); - event.setProcessInstanceId(processInstanceId); + event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress()); + event.setEvent(Event.RUNNING); return event; } - public static TaskResponseEvent newResult(ExecutionStatus state, - Date endTime, - int processId, - String appIds, - int taskInstanceId, - String varPool, - Channel channel, - int processInstanceId) { - TaskResponseEvent event = new TaskResponseEvent(); - event.setState(state); - event.setEndTime(endTime); - event.setProcessId(processId); - event.setAppIds(appIds); - event.setTaskInstanceId(taskInstanceId); - event.setEvent(Event.RESULT); - event.setVarPool(varPool); + public static TaskEvent newResultEvent(TaskExecuteResponseCommand command, Channel channel) { + TaskEvent event = new TaskEvent(); + event.setProcessInstanceId(command.getProcessInstanceId()); + event.setTaskInstanceId(command.getTaskInstanceId()); + event.setState(ExecutionStatus.of(command.getStatus())); + event.setStartTime(command.getStartTime()); + event.setExecutePath(command.getExecutePath()); + event.setLogPath(command.getLogPath()); + event.setEndTime(command.getEndTime()); + event.setProcessId(command.getProcessId()); + event.setAppIds(command.getAppIds()); + event.setVarPool(command.getVarPool()); event.setChannel(channel); - event.setProcessInstanceId(processInstanceId); + event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress()); + event.setEvent(Event.RESULT); return event; } @@ -140,7 +142,7 @@ public class TaskResponseEvent { public void setVarPool(String varPool) { this.varPool = varPool; } - + public int getTaskInstanceId() { return taskInstanceId; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java similarity index 50% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java index a0de0af09f..4984390527 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java @@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; @@ -50,17 +50,17 @@ import io.netty.channel.Channel; * task manager */ @Component -public class TaskResponseService { +public class TaskEventService { /** * logger */ - private final Logger logger = LoggerFactory.getLogger(TaskResponseService.class); + private final Logger logger = LoggerFactory.getLogger(TaskEventService.class); /** * attemptQueue */ - private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); /** * process service @@ -75,9 +75,9 @@ public class TaskResponseService { private DataQualityResultOperator dataQualityResultOperator; /** - * task response worker + * task event worker */ - private Thread taskResponseWorker; + private Thread taskEventWorker; @Autowired private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @@ -87,19 +87,19 @@ public class TaskResponseService { @PostConstruct public void start() { - this.taskResponseWorker = new TaskResponseWorker(); - this.taskResponseWorker.setName("StateEventResponseWorker"); - this.taskResponseWorker.start(); + this.taskEventWorker = new TaskEventWorker(); + this.taskEventWorker.setName("TaskStateEventWorker"); + this.taskEventWorker.start(); } @PreDestroy public void stop() { try { - this.taskResponseWorker.interrupt(); + this.taskEventWorker.interrupt(); if (!eventQueue.isEmpty()) { - List remainEvents = new ArrayList<>(eventQueue.size()); + List remainEvents = new ArrayList<>(eventQueue.size()); eventQueue.drainTo(remainEvents); - for (TaskResponseEvent event : remainEvents) { + for (TaskEvent event : remainEvents) { this.persist(event); } } @@ -109,15 +109,15 @@ public class TaskResponseService { } /** - * put task to attemptQueue + * add event to queue * - * @param taskResponseEvent taskResponseEvent + * @param taskEvent taskEvent */ - public void addResponse(TaskResponseEvent taskResponseEvent) { + public void addEvent(TaskEvent taskEvent) { try { - eventQueue.put(taskResponseEvent); + eventQueue.put(taskEvent); } catch (InterruptedException e) { - logger.error("put task : {} error :{}", taskResponseEvent, e); + logger.error("add task event : {} error :{}", taskEvent, e); Thread.currentThread().interrupt(); } } @@ -125,7 +125,7 @@ public class TaskResponseService { /** * task worker thread */ - class TaskResponseWorker extends Thread { + class TaskEventWorker extends Thread { @Override public void run() { @@ -133,8 +133,8 @@ public class TaskResponseService { while (Stopper.isRunning()) { try { // if not task , blocking here - TaskResponseEvent taskResponseEvent = eventQueue.take(); - persist(taskResponseEvent); + TaskEvent taskEvent = eventQueue.take(); + persist(taskEvent); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -147,14 +147,14 @@ public class TaskResponseService { } /** - * persist taskResponseEvent + * persist task event * - * @param taskResponseEvent taskResponseEvent + * @param taskEvent taskEvent */ - private void persist(TaskResponseEvent taskResponseEvent) { - Event event = taskResponseEvent.getEvent(); - int taskInstanceId = taskResponseEvent.getTaskInstanceId(); - int processInstanceId = taskResponseEvent.getProcessInstanceId(); + private void persist(TaskEvent taskEvent) { + Event event = taskEvent.getEvent(); + int taskInstanceId = taskEvent.getTaskInstanceId(); + int processInstanceId = taskEvent.getProcessInstanceId(); TaskInstance taskInstance; WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); @@ -165,75 +165,103 @@ public class TaskResponseService { } switch (event) { - case ACK: - handleAckEvent(taskResponseEvent, taskInstance); + case DISPATCH: + handleDispatchEvent(taskEvent, taskInstance); + // dispatch event do not need to submit state event + return; + case DELAY: + case RUNNING: + handleRunningEvent(taskEvent, taskInstance); break; case RESULT: - handleResultEvent(taskResponseEvent, taskInstance); + handleResultEvent(taskEvent, taskInstance); break; default: throw new IllegalArgumentException("invalid event type : " + event); } StateEvent stateEvent = new StateEvent(); - stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId()); - stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId()); - stateEvent.setExecutionStatus(taskResponseEvent.getState()); + stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); + stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); + stateEvent.setExecutionStatus(taskEvent.getState()); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); workflowExecuteThreadPool.submitStateEvent(stateEvent); } /** - * handle ack event + * handle dispatch event */ - private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) { - Channel channel = taskResponseEvent.getChannel(); + private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) { + if (taskInstance == null) { + logger.error("taskInstance is null"); + return; + } + if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) { + return; + } + taskInstance.setState(ExecutionStatus.DISPATCH); + taskInstance.setHost(taskEvent.getWorkerAddress()); + processService.saveTaskInstance(taskInstance); + } + + /** + * handle running event + */ + private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) { + Channel channel = taskEvent.getChannel(); try { if (taskInstance != null) { if (taskInstance.getState().typeIsFinished()) { - logger.warn("task is finish, ack is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState()); + logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState()); } else { - processService.changeTaskState(taskInstance, taskResponseEvent.getState(), - taskResponseEvent.getStartTime(), - taskResponseEvent.getWorkerAddress(), - taskResponseEvent.getExecutePath(), - taskResponseEvent.getLogPath() - ); + taskInstance.setState(taskEvent.getState()); + taskInstance.setStartTime(taskEvent.getStartTime()); + taskInstance.setHost(taskEvent.getWorkerAddress()); + taskInstance.setLogPath(taskEvent.getLogPath()); + taskInstance.setExecutePath(taskEvent.getExecutePath()); + taskInstance.setPid(taskEvent.getProcessId()); + taskInstance.setAppLink(taskEvent.getAppIds()); + processService.saveTaskInstance(taskInstance); } } // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success - DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); - channel.writeAndFlush(taskAckCommand.convert2Command()); + TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); } catch (Exception e) { logger.error("worker ack master error", e); - DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1); - channel.writeAndFlush(taskAckCommand.convert2Command()); + TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1); + channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); } } /** * handle result event */ - private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) { - Channel channel = taskResponseEvent.getChannel(); + private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) { + Channel channel = taskEvent.getChannel(); try { if (taskInstance != null) { - dataQualityResultOperator.operateDqExecuteResult(taskResponseEvent, taskInstance); - - processService.changeTaskState(taskInstance, taskResponseEvent.getState(), - taskResponseEvent.getEndTime(), - taskResponseEvent.getProcessId(), - taskResponseEvent.getAppIds(), - taskResponseEvent.getVarPool() - ); + dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); + + taskInstance.setStartTime(taskEvent.getStartTime()); + taskInstance.setHost(taskEvent.getWorkerAddress()); + taskInstance.setLogPath(taskEvent.getLogPath()); + taskInstance.setExecutePath(taskEvent.getExecutePath()); + taskInstance.setPid(taskEvent.getProcessId()); + taskInstance.setAppLink(taskEvent.getAppIds()); + taskInstance.setState(taskEvent.getState()); + taskInstance.setEndTime(taskEvent.getEndTime()); + taskInstance.setVarPool(taskEvent.getVarPool()); + processService.changeOutParam(taskInstance); + processService.saveTaskInstance(taskInstance); } // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); - channel.writeAndFlush(taskResponseCommand.convert2Command()); + TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); } catch (Exception e) { logger.error("worker response master error", e); - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1); - channel.writeAndFlush(taskResponseCommand.convert2Command()); + TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1); + channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); } } } \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index c53a02e886..38fef8f44f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -259,11 +259,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { // verify tenant is null if (verifyTenantIsNull(tenant, taskInstance)) { - processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE, - taskInstance.getStartTime(), - taskInstance.getHost(), - null, - null); + taskInstance.setState(ExecutionStatus.FAILURE); + processService.saveTaskInstance(taskInstance); return null; } // set queue for process instance, user-specified queue takes precedence over tenant queue diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java index 906c3a9f34..d172228b36 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java @@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.CheckType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqFailureStrategy; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OperatorType; -import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -59,7 +59,7 @@ public class DataQualityResultOperator { * @param taskResponseEvent * @param taskInstance */ - public void operateDqExecuteResult(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) { + public void operateDqExecuteResult(TaskEvent taskResponseEvent, TaskInstance taskInstance) { if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) { ProcessInstance processInstance = @@ -92,7 +92,7 @@ public class DataQualityResultOperator { * @param dqExecuteResult * @param processInstance */ - private void checkDqExecuteResult(TaskResponseEvent taskResponseEvent, + private void checkDqExecuteResult(TaskEvent taskResponseEvent, DqExecuteResult dqExecuteResult, ProcessInstance processInstance) { if (isFailure(dqExecuteResult)) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java index 7429dc2e9b..1a744ea6dd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java @@ -231,6 +231,7 @@ public class DependentExecute { if (state.typeIsRunning() || state == ExecutionStatus.SUBMITTED_SUCCESS + || state == ExecutionStatus.DISPATCH || state == ExecutionStatus.WAITING_THREAD) { return DependResult.WAITING; } else { diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java new file mode 100644 index 0000000000..fc320ab493 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Date; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import io.netty.channel.Channel; + +/** + * task ack processor test + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class, TaskEvent.class}) +public class TaskAckProcessorTest { + + private TaskExecuteRunningProcessor taskExecuteRunningProcessor; + private TaskEventService taskEventService; + private ProcessService processService; + private TaskExecuteRunningCommand taskExecuteRunningCommand; + private TaskEvent taskResponseEvent; + private Channel channel; + + @Before + public void before() { + PowerMockito.mockStatic(SpringApplicationContext.class); + + taskEventService = PowerMockito.mock(TaskEventService.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskEventService.class)).thenReturn(taskEventService); + + processService = PowerMockito.mock(ProcessService.class); + PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService); + + taskExecuteRunningProcessor = new TaskExecuteRunningProcessor(); + + channel = PowerMockito.mock(Channel.class); + taskResponseEvent = PowerMockito.mock(TaskEvent.class); + + taskExecuteRunningCommand = new TaskExecuteRunningCommand(); + taskExecuteRunningCommand.setStatus(1); + taskExecuteRunningCommand.setExecutePath("/dolphinscheduler/worker"); + taskExecuteRunningCommand.setHost("localhost"); + taskExecuteRunningCommand.setLogPath("/temp/worker.log"); + taskExecuteRunningCommand.setStartTime(new Date()); + taskExecuteRunningCommand.setTaskInstanceId(1); + taskExecuteRunningCommand.setProcessInstanceId(1); + } + + @Test + public void testProcess() { +// Command command = taskExecuteAckCommand.convert2Command(); +// Assert.assertEquals(CommandType.TASK_EXECUTE_ACK,command.getType()); +// InetSocketAddress socketAddress = new InetSocketAddress("localhost",12345); +// PowerMockito.when(channel.remoteAddress()).thenReturn(socketAddress); +// PowerMockito.mockStatic(TaskResponseEvent.class); +// +// PowerMockito.when(TaskResponseEvent.newAck(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyInt(), channel)) +// .thenReturn(taskResponseEvent); +// TaskInstance taskInstance = PowerMockito.mock(TaskInstance.class); +// PowerMockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance); +// +// taskAckProcessor.process(channel,command); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 9794bdda04..9a054b647d 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -19,9 +19,14 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.net.InetSocketAddress; import java.util.Date; import org.junit.After; @@ -42,41 +47,52 @@ public class TaskResponseServiceTest { private ProcessService processService; @InjectMocks - TaskResponseService taskRspService; + TaskEventService taskEventService; @Mock private Channel channel; - private TaskResponseEvent ackEvent; + private TaskEvent ackEvent; - private TaskResponseEvent resultEvent; + private TaskEvent resultEvent; private TaskInstance taskInstance; @Mock private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager; + @Mock + private DataQualityResultOperator dataQualityResultOperator; + + @Mock + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + @Before public void before() { - taskRspService.start(); - - ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, - new Date(), - "127.*.*.*", - "path", - "logPath", - 22, - channel, - 1); - - resultEvent = TaskResponseEvent.newResult(ExecutionStatus.SUCCESS, - new Date(), - 1, - "ids", - 22, - "varPol", - channel, - 1); + taskEventService.start(); + + Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234)); + + TaskExecuteRunningCommand taskExecuteRunningCommand = new TaskExecuteRunningCommand(); + taskExecuteRunningCommand.setProcessId(1); + taskExecuteRunningCommand.setTaskInstanceId(22); + taskExecuteRunningCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode()); + taskExecuteRunningCommand.setExecutePath("path"); + taskExecuteRunningCommand.setLogPath("logPath"); + taskExecuteRunningCommand.setHost("127.*.*.*"); + taskExecuteRunningCommand.setStartTime(new Date()); + + ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel); + + TaskExecuteResponseCommand taskExecuteResponseCommand = new TaskExecuteResponseCommand(); + taskExecuteResponseCommand.setProcessInstanceId(1); + taskExecuteResponseCommand.setTaskInstanceId(22); + taskExecuteResponseCommand.setStatus(ExecutionStatus.SUCCESS.getCode()); + taskExecuteResponseCommand.setEndTime(new Date()); + taskExecuteResponseCommand.setVarPool("varPol"); + taskExecuteResponseCommand.setAppIds("ids"); + taskExecuteResponseCommand.setProcessId(1); + resultEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel); taskInstance = new TaskInstance(); taskInstance.setId(22); @@ -87,14 +103,14 @@ public class TaskResponseServiceTest { public void testAddResponse() { Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance); Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null); - taskRspService.addResponse(ackEvent); - taskRspService.addResponse(resultEvent); + taskEventService.addEvent(ackEvent); + taskEventService.addEvent(resultEvent); } @After public void after() { - if (taskRspService != null) { - taskRspService.stop(); + if (taskEventService != null) { + taskEventService.stop(); } } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index a5022427f2..5718872bf5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -69,24 +69,24 @@ public enum CommandType { TASK_EXECUTE_REQUEST, /** - * execute task ack + * task execute running, from worker to master */ - TASK_EXECUTE_ACK, + TASK_EXECUTE_RUNNING, /** - * execute task response + * task execute running ack, from master to worker */ - TASK_EXECUTE_RESPONSE, + TASK_EXECUTE_RUNNING_ACK, /** - * db task ack + * task execute response, from worker to master */ - DB_TASK_ACK, + TASK_EXECUTE_RESPONSE, /** - * db task response + * task execute response ack, from master to worker */ - DB_TASK_RESPONSE, + TASK_EXECUTE_RESPONSE_ACK, /** * kill task diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java index fd9c428c6e..41ee1ef2ee 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java @@ -61,7 +61,7 @@ public class StateEventResponseCommand implements Serializable { */ public Command convert2Command() { Command command = new Command(); - command.setType(CommandType.DB_TASK_RESPONSE); + command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK); byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java similarity index 83% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java index 9bd86cbdf4..f3df257b9d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java @@ -22,18 +22,19 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; /** - * db task final result response command + * task execute response ack command + * from master to worker */ -public class DBTaskResponseCommand implements Serializable { +public class TaskExecuteResponseAckCommand implements Serializable { private int taskInstanceId; private int status; - public DBTaskResponseCommand() { + public TaskExecuteResponseAckCommand() { super(); } - public DBTaskResponseCommand(int status, int taskInstanceId) { + public TaskExecuteResponseAckCommand(int status, int taskInstanceId) { this.status = status; this.taskInstanceId = taskInstanceId; } @@ -61,7 +62,7 @@ public class DBTaskResponseCommand implements Serializable { */ public Command convert2Command() { Command command = new Command(); - command.setType(CommandType.DB_TASK_RESPONSE); + command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK); byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; @@ -69,7 +70,7 @@ public class DBTaskResponseCommand implements Serializable { @Override public String toString() { - return "DBTaskResponseCommand{" + return "TaskExecuteResponseAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}'; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java index eafb803778..4574b8818c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java @@ -23,7 +23,7 @@ import java.io.Serializable; import java.util.Date; /** - * execute task response command + * execute task response command */ public class TaskExecuteResponseCommand implements Serializable { @@ -36,23 +36,43 @@ public class TaskExecuteResponseCommand implements Serializable { } /** - * task instance id + * task instance id */ private int taskInstanceId; /** * process instance id */ - private int processInstanceId; + private int processInstanceId; /** - * status + * status */ private int status; + /** + * startTime + */ + private Date startTime; + + /** + * host + */ + private String host; + + /** + * logPath + */ + private String logPath; /** - * end time + * executePath + */ + private String executePath; + + + /** + * end time */ private Date endTime; @@ -72,6 +92,38 @@ public class TaskExecuteResponseCommand implements Serializable { */ private String varPool; + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getLogPath() { + return logPath; + } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + + public String getExecutePath() { + return executePath; + } + + public void setExecutePath(String executePath) { + this.executePath = executePath; + } + public void setVarPool(String varPool) { this.varPool = varPool; } @@ -79,7 +131,7 @@ public class TaskExecuteResponseCommand implements Serializable { public String getVarPool() { return varPool; } - + public int getTaskInstanceId() { return taskInstanceId; } @@ -122,6 +174,7 @@ public class TaskExecuteResponseCommand implements Serializable { /** * package response command + * * @return command */ public Command convert2Command() { @@ -136,10 +189,16 @@ public class TaskExecuteResponseCommand implements Serializable { public String toString() { return "TaskExecuteResponseCommand{" + "taskInstanceId=" + taskInstanceId + + ", processInstanceId=" + processInstanceId + ", status=" + status + + ", startTime=" + startTime + ", endTime=" + endTime + + ", host=" + host + + ", logPath=" + logPath + + ", executePath=" + executePath + ", processId=" + processId + ", appIds='" + appIds + '\'' + + ", varPool=" + varPool + '}'; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java similarity index 80% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java index 4797104450..b0bb666cba 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java @@ -22,18 +22,19 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; /** - * db task ack request command + * task execute running ack command + * from master to worker */ -public class DBTaskAckCommand implements Serializable { +public class TaskExecuteRunningAckCommand implements Serializable { private int taskInstanceId; private int status; - public DBTaskAckCommand() { + public TaskExecuteRunningAckCommand() { super(); } - public DBTaskAckCommand(int status, int taskInstanceId) { + public TaskExecuteRunningAckCommand(int status, int taskInstanceId) { this.status = status; this.taskInstanceId = taskInstanceId; } @@ -61,7 +62,7 @@ public class DBTaskAckCommand implements Serializable { */ public Command convert2Command() { Command command = new Command(); - command.setType(CommandType.DB_TASK_ACK); + command.setType(CommandType.TASK_EXECUTE_RUNNING_ACK); byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; @@ -69,6 +70,6 @@ public class DBTaskAckCommand implements Serializable { @Override public String toString() { - return "DBTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}'; + return "TaskExecuteRunningAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}'; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java similarity index 81% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java index a3dbed573a..0a2eac29f3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java @@ -23,9 +23,10 @@ import java.io.Serializable; import java.util.Date; /** - * execute task request command + * task execute running command + * from worker to master */ -public class TaskExecuteAckCommand implements Serializable { +public class TaskExecuteRunningCommand implements Serializable { /** * taskInstanceId @@ -62,6 +63,16 @@ public class TaskExecuteAckCommand implements Serializable { */ private String executePath; + /** + * processId + */ + private int processId; + + /** + * appIds + */ + private String appIds; + public Date getStartTime() { return startTime; } @@ -94,6 +105,14 @@ public class TaskExecuteAckCommand implements Serializable { this.taskInstanceId = taskInstanceId; } + public int getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(int processInstanceId) { + this.processInstanceId = processInstanceId; + } + public String getLogPath() { return logPath; } @@ -110,6 +129,22 @@ public class TaskExecuteAckCommand implements Serializable { this.executePath = executePath; } + public int getProcessId() { + return processId; + } + + public void setProcessId(int processId) { + this.processId = processId; + } + + public String getAppIds() { + return appIds; + } + + public void setAppIds(String appIds) { + this.appIds = appIds; + } + /** * package request command * @@ -117,7 +152,7 @@ public class TaskExecuteAckCommand implements Serializable { */ public Command convert2Command() { Command command = new Command(); - command.setType(CommandType.TASK_EXECUTE_ACK); + command.setType(CommandType.TASK_EXECUTE_RUNNING); byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; @@ -125,22 +160,16 @@ public class TaskExecuteAckCommand implements Serializable { @Override public String toString() { - return "TaskExecuteAckCommand{" + return "TaskExecuteRunningCommand{" + "taskInstanceId=" + taskInstanceId + + ", processInstanceId='" + processInstanceId + '\'' + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' - + ", processInstanceId='" + processInstanceId + '\'' + + ", processId=" + processId + '\'' + + ", appIds='" + appIds + '\'' + '}'; } - - public int getProcessInstanceId() { - return processInstanceId; - } - - public void setProcessInstanceId(int processInstanceId) { - this.processInstanceId = processInstanceId; - } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index bf377d8358..2255c0811b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -17,13 +17,19 @@ package org.apache.dolphinscheduler.service.process; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.math.NumberUtils; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; + +import static java.util.stream.Collectors.toSet; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -124,11 +130,10 @@ import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.enums.ResourceType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import java.util.ArrayList; import java.util.Arrays; @@ -143,17 +148,16 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import static java.util.stream.Collectors.toSet; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; /** * process relative dao that some mappers in this. @@ -164,6 +168,7 @@ public class ProcessService { private final Logger logger = LoggerFactory.getLogger(getClass()); private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.DISPATCH.ordinal(), ExecutionStatus.RUNNING_EXECUTION.ordinal(), ExecutionStatus.DELAY_EXECUTION.ordinal(), ExecutionStatus.READY_PAUSE.ordinal(), @@ -266,8 +271,8 @@ public class ProcessService { /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * - * @param logger logger - * @param host host + * @param logger logger + * @param host host * @param command found command * @return process instance */ @@ -368,7 +373,7 @@ public class ProcessService { /** * set process waiting thread * - * @param command command + * @param command command * @param processInstance processInstance * @return process instance */ @@ -581,8 +586,6 @@ public class ProcessService { /** * recursive delete all task instance by process instance id - * - * @param processInstanceId */ public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) { List taskInstanceList = findValidTaskListByProcessId(processInstanceId); @@ -603,7 +606,7 @@ public class ProcessService { * recursive query sub process definition id by parent id. * * @param parentCode parentCode - * @param ids ids + * @param ids ids */ public void recurseFindSubProcess(long parentCode, List ids) { List taskNodeList = this.getTaskNodeListByDefinition(parentCode); @@ -628,7 +631,7 @@ public class ProcessService { * create recovery waiting thread command and delete origin command at the same time. * if the recovery command is exists, only update the field update_time * - * @param originCommand originCommand + * @param originCommand originCommand * @param processInstance processInstance */ public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) { @@ -684,7 +687,7 @@ public class ProcessService { /** * get schedule time from command * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return date */ @@ -713,8 +716,8 @@ public class ProcessService { * generate a new work process instance from command. * * @param processDefinition processDefinition - * @param command command - * @param cmdParam cmdParam map + * @param command command + * @param cmdParam cmdParam map * @return process instance */ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, @@ -799,7 +802,7 @@ public class ProcessService { * use definition creator's tenant. * * @param tenantId tenantId - * @param userId userId + * @param userId userId * @return tenant */ public Tenant getTenantForProcess(int tenantId, int userId) { @@ -837,7 +840,7 @@ public class ProcessService { /** * check command parameters is valid * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return whether command param is valid */ @@ -857,7 +860,7 @@ public class ProcessService { * construct process instance according to one command. * * @param command command - * @param host host + * @param host host * @return process instance */ protected ProcessInstance constructProcessInstance(Command command, String host) { @@ -1036,7 +1039,7 @@ public class ProcessService { * return complement data if the process start with complement data * * @param processInstance processInstance - * @param command command + * @param command command * @return command type */ private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) { @@ -1051,8 +1054,8 @@ public class ProcessService { * initialize complement data parameters * * @param processDefinition processDefinition - * @param processInstance processInstance - * @param cmdParam cmdParam + * @param processInstance processInstance + * @param cmdParam cmdParam */ private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, @@ -1125,7 +1128,7 @@ public class ProcessService { * only the keys doesn't in sub process global would be joined. * * @param parentGlobalParams parentGlobalParams - * @param subGlobalParams subGlobalParams + * @param subGlobalParams subGlobalParams * @return global params join */ private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) { @@ -1192,7 +1195,7 @@ public class ProcessService { * submit sub process to command * * @param processInstance processInstance - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @return task instance */ @Transactional(rollbackFor = Exception.class) @@ -1223,7 +1226,7 @@ public class ProcessService { * set map {parent instance id, task instance id, 0(child instance id)} * * @param parentInstance parentInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { @@ -1252,7 +1255,7 @@ public class ProcessService { * find previous task work process map. * * @param parentProcessInstance parentProcessInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance, @@ -1278,7 +1281,7 @@ public class ProcessService { * create sub work process command * * @param parentProcessInstance parentProcessInstance - * @param task task + * @param task task */ public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) { if (!task.isSubProcess()) { @@ -1412,7 +1415,7 @@ public class ProcessService { * update sub process definition * * @param parentProcessInstance parentProcessInstance - * @param childDefinitionCode childDefinitionId + * @param childDefinitionCode childDefinitionId */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) { ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(), @@ -1427,7 +1430,7 @@ public class ProcessService { /** * submit task to mysql * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstance processInstance * @return task instance */ @@ -1463,7 +1466,7 @@ public class ProcessService { * return stop if work process state is ready stop * if all of above are not satisfied, return submit success * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstance processInstance * @return process instance state */ @@ -1476,6 +1479,7 @@ public class ProcessService { state == ExecutionStatus.RUNNING_EXECUTION || state == ExecutionStatus.DELAY_EXECUTION || state == ExecutionStatus.KILL + || state == ExecutionStatus.DISPATCH ) { return state; } @@ -1689,7 +1693,7 @@ public class ProcessService { * get id list by task state * * @param instanceId instanceId - * @param state state + * @param state state * @return task instance states */ public List findTaskIdByInstanceState(int instanceId, ExecutionStatus state) { @@ -1744,7 +1748,7 @@ public class ProcessService { * find work process map by parent process id and parent task id. * * @param parentWorkProcessId parentWorkProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance map */ public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { @@ -1766,7 +1770,7 @@ public class ProcessService { * find sub process instance * * @param parentProcessId parentProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance */ public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) { @@ -1795,29 +1799,6 @@ public class ProcessService { return processInstance; } - /** - * change task state - * - * @param state state - * @param startTime startTime - * @param host host - * @param executePath executePath - * @param logPath logPath - */ - public void changeTaskState(TaskInstance taskInstance, - ExecutionStatus state, - Date startTime, - String host, - String executePath, - String logPath) { - taskInstance.setState(state); - taskInstance.setStartTime(startTime); - taskInstance.setHost(host); - taskInstance.setExecutePath(executePath); - taskInstance.setLogPath(logPath); - saveTaskInstance(taskInstance); - } - /** * update process instance * @@ -1828,27 +1809,6 @@ public class ProcessService { return processInstanceMapper.updateById(processInstance); } - /** - * change task state - * - * @param state state - * @param endTime endTime - * @param varPool varPool - */ - public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, - Date endTime, - int processId, - String appIds, - String varPool) { - taskInstance.setPid(processId); - taskInstance.setAppLink(appIds); - taskInstance.setState(state); - taskInstance.setEndTime(endTime); - taskInstance.setVarPool(varPool); - changeOutParam(taskInstance); - saveTaskInstance(taskInstance); - } - /** * for show in page of taskInstance */ @@ -2006,7 +1966,7 @@ public class ProcessService { * update process instance state by id * * @param processInstanceId processInstanceId - * @param executionStatus executionStatus + * @param executionStatus executionStatus * @return update process result */ public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) { @@ -2042,7 +2002,7 @@ public class ProcessService { /** * find tenant code by resource name * - * @param resName resource name + * @param resName resource name * @param resourceType resource type * @return tenant code */ @@ -2080,7 +2040,7 @@ public class ProcessService { * find last scheduler process instance in the date interval * * @param definitionCode definitionCode - * @param dateInterval dateInterval + * @param dateInterval dateInterval * @return process instance */ public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) { @@ -2093,7 +2053,7 @@ public class ProcessService { * find last manual process instance interval * * @param definitionCode process definition code - * @param dateInterval dateInterval + * @param dateInterval dateInterval * @return process instance */ public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) { @@ -2106,8 +2066,8 @@ public class ProcessService { * find last running process instance * * @param definitionCode process definition code - * @param startTime start time - * @param endTime end time + * @param startTime start time + * @param endTime end time * @return process instance */ public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) { @@ -2191,7 +2151,7 @@ public class ProcessService { /** * list unauthorized udf function * - * @param userId user id + * @param userId user id * @param needChecks data source id array * @return unauthorized udf function list */ @@ -2599,7 +2559,7 @@ public class ProcessService { * add authorized resources * * @param ownResources own resources - * @param userId userId + * @param userId userId */ private void addAuthorizedResources(List ownResources, int userId) { List relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7); @@ -2742,12 +2702,7 @@ public class ProcessService { /** * the first time (when submit the task ) get the resource of the task group * - * @param taskId task id - * @param taskName - * @param groupId - * @param processId - * @param priority - * @return + * @param taskId task id */ public boolean acquireTaskGroup(int taskId, String taskName, int groupId, @@ -2788,9 +2743,6 @@ public class ProcessService { /** * try to get the task group resource(when other task release the resource) - * - * @param taskGroupQueue - * @return */ public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) { TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); @@ -2877,11 +2829,11 @@ public class ProcessService { /** * insert into task group queue * - * @param taskId task id - * @param taskName task name - * @param groupId group id + * @param taskId task id + * @param taskName task name + * @param groupId group id * @param processId process id - * @param priority priority + * @param priority priority * @return result and msg code */ public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index 882ab59b0a..581c73c8f1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -207,6 +207,16 @@ public class TaskExecutionContext { private ResourceParametersHelper resourceParametersHelper; + /** + * endTime + */ + private Date endTime; + + /** + * sql TaskExecutionContext + */ + private SQLTaskExecutionContext sqlTaskExecutionContext; + /** * resources full name and tenant code */ @@ -538,12 +548,61 @@ public class TaskExecutionContext { this.dataQualityTaskExecutionContext = dataQualityTaskExecutionContext; } + public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) { + this.currentExecutionStatus = currentExecutionStatus; + } + public ExecutionStatus getCurrentExecutionStatus() { return currentExecutionStatus; } - public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) { - this.currentExecutionStatus = currentExecutionStatus; + public Date getEndTime() { + return endTime; + } + + public void setEndTime(Date endTime) { + this.endTime = endTime; + } + + @Override + public String toString() { + return "TaskExecutionContext{" + + "taskInstanceId=" + taskInstanceId + + ", taskName='" + taskName + '\'' + + ", currentExecutionStatus=" + currentExecutionStatus + + ", firstSubmitTime=" + firstSubmitTime + + ", startTime=" + startTime + + ", taskType='" + taskType + '\'' + + ", host='" + host + '\'' + + ", executePath='" + executePath + '\'' + + ", logPath='" + logPath + '\'' + + ", taskJson='" + taskJson + '\'' + + ", processId=" + processId + + ", processDefineCode=" + processDefineCode + + ", processDefineVersion=" + processDefineVersion + + ", appIds='" + appIds + '\'' + + ", processInstanceId=" + processInstanceId + + ", scheduleTime=" + scheduleTime + + ", globalParams='" + globalParams + '\'' + + ", executorId=" + executorId + + ", cmdTypeIfComplement=" + cmdTypeIfComplement + + ", tenantCode='" + tenantCode + '\'' + + ", queue='" + queue + '\'' + + ", projectCode=" + projectCode + + ", taskParams='" + taskParams + '\'' + + ", envFile='" + envFile + '\'' + + ", dryRun='" + dryRun + '\'' + + ", definedParams=" + definedParams + + ", taskAppId='" + taskAppId + '\'' + + ", taskTimeoutStrategy=" + taskTimeoutStrategy + + ", taskTimeout=" + taskTimeout + + ", workerGroup='" + workerGroup + '\'' + + ", environmentConfig='" + environmentConfig + '\'' + + ", delayTime=" + delayTime + + ", resources=" + resources + + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext + + ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext + + '}'; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java index 7295f430e1..8d2774c531 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java @@ -62,7 +62,9 @@ public enum ExecutionStatus { FORCED_SUCCESS(13, "forced success"), SERIAL_WAIT(14, "serial wait"), READY_BLOCK(15, "ready block"), - BLOCK(16, "block"); + BLOCK(16, "block"), + DISPATCH(17, "dispatch"), + ; ExecutionStatus(int code, String descp) { this.code = code; diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 4311612802..45401b62b0 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -28,10 +28,10 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor; -import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor; import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; @@ -111,10 +111,10 @@ public class WorkerServer implements IStoppable { private TaskKillProcessor taskKillProcessor; @Autowired - private DBTaskAckProcessor dbTaskAckProcessor; + private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor; @Autowired - private DBTaskResponseProcessor dbTaskResponseProcessor; + private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor; @Autowired private HostUpdateProcessor hostUpdateProcessor; @@ -143,8 +143,8 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, dbTaskAckProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, dbTaskResponseProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor); // logger server diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java index aaf557be51..f28990b152 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java @@ -30,28 +30,30 @@ public class ResponseCache { private static final ResponseCache instance = new ResponseCache(); - private ResponseCache(){} + private ResponseCache() { + } public static ResponseCache get() { return instance; } - private Map ackCache = new ConcurrentHashMap<>(); - private Map responseCache = new ConcurrentHashMap<>(); + private Map runningCache = new ConcurrentHashMap<>(); + private Map responseCache = new ConcurrentHashMap<>(); /** * cache response + * * @param taskInstanceId taskInstanceId * @param command command * @param event event ACK/RESULT */ public void cache(Integer taskInstanceId, Command command, Event event) { switch (event) { - case ACK: - ackCache.put(taskInstanceId,command); + case RUNNING: + runningCache.put(taskInstanceId, command); break; case RESULT: - responseCache.put(taskInstanceId,command); + responseCache.put(taskInstanceId, command); break; default: throw new IllegalArgumentException("invalid event type : " + event); @@ -59,15 +61,17 @@ public class ResponseCache { } /** - * remove ack cache + * remove running cache + * * @param taskInstanceId taskInstanceId */ - public void removeAckCache(Integer taskInstanceId) { - ackCache.remove(taskInstanceId); + public void removeRunningCache(Integer taskInstanceId) { + runningCache.remove(taskInstanceId); } /** - * remove reponse cache + * remove response cache + * * @param taskInstanceId taskInstanceId */ public void removeResponseCache(Integer taskInstanceId) { @@ -75,18 +79,20 @@ public class ResponseCache { } /** - * getAckCache + * get running cache + * * @return getAckCache */ - public Map getAckCache() { - return ackCache; + public Map getRunningCache() { + return runningCache; } /** * getResponseCache + * * @return getResponseCache */ - public Map getResponseCache() { + public Map getResponseCache() { return responseCache; } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 09b2c3aa67..9a0e8943d6 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -19,16 +19,25 @@ package org.apache.dolphinscheduler.server.worker.processor; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; +import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; +import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; +import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import io.netty.channel.Channel; @@ -45,6 +54,12 @@ public class TaskCallbackService { private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class); private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200}; + @Autowired + private TaskExecuteResponseAckProcessor taskExecuteRunningProcessor; + + @Autowired + private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor; + /** * remote channels */ @@ -58,15 +73,15 @@ public class TaskCallbackService { public TaskCallbackService() { final NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); - this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); - this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor); } /** * add callback channel * * @param taskInstanceId taskInstanceId - * @param channel channel + * @param channel channel */ public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) { REMOTE_CHANNELS.put(taskInstanceId, channel); @@ -128,26 +143,13 @@ public class TaskCallbackService { REMOTE_CHANNELS.remove(taskInstanceId); } - /** - * send ack - * - * @param taskInstanceId taskInstanceId - * @param command command - */ - public void sendAck(int taskInstanceId, Command command) { - NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); - if (nettyRemoteChannel != null) { - nettyRemoteChannel.writeAndFlush(command); - } - } - /** * send result * * @param taskInstanceId taskInstanceId - * @param command command + * @param command command */ - public void sendResult(int taskInstanceId, Command command) { + public void send(int taskInstanceId, Command command) { NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); if (nettyRemoteChannel != null) { nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() { @@ -161,6 +163,99 @@ public class TaskCallbackService { } }); } + } + + /** + * build task execute running command + * + * @param taskExecutionContext taskExecutionContext + * @return TaskExecuteAckCommand + */ + private TaskExecuteRunningCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) { + TaskExecuteRunningCommand command = new TaskExecuteRunningCommand(); + command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); + command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); + command.setLogPath(taskExecutionContext.getLogPath()); + command.setHost(taskExecutionContext.getHost()); + command.setStartTime(taskExecutionContext.getStartTime()); + command.setExecutePath(taskExecutionContext.getExecutePath()); + return command; + } + + /** + * build task execute response command + * + * @param taskExecutionContext taskExecutionContext + * @return TaskExecuteResponseCommand + */ + private TaskExecuteResponseCommand buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) { + TaskExecuteResponseCommand command = new TaskExecuteResponseCommand(); + command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); + command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); + command.setLogPath(taskExecutionContext.getLogPath()); + command.setExecutePath(taskExecutionContext.getExecutePath()); + command.setAppIds(taskExecutionContext.getAppIds()); + command.setProcessId(taskExecutionContext.getProcessId()); + command.setHost(taskExecutionContext.getHost()); + command.setStartTime(taskExecutionContext.getStartTime()); + command.setEndTime(taskExecutionContext.getEndTime()); + command.setVarPool(taskExecutionContext.getVarPool()); + command.setExecutePath(taskExecutionContext.getExecutePath()); + return command; + } + + /** + * build TaskKillResponseCommand + * + * @param taskExecutionContext taskExecutionContext + * @return build TaskKillResponseCommand + */ + private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) { + TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand(); + taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); + taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA))); + taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskKillResponseCommand.setHost(taskExecutionContext.getHost()); + taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); + return taskKillResponseCommand; + } + + /** + * send task execute running command + * todo unified callback command + */ + public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) { + TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext); + // add response cache + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RUNNING); + send(taskExecutionContext.getTaskInstanceId(), command.convert2Command()); + } + + /** + * send task execute delay command + * todo unified callback command + */ + public void sendTaskExecuteDelayCommand(TaskExecutionContext taskExecutionContext) { + TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext); + send(taskExecutionContext.getTaskInstanceId(), command.convert2Command()); + } + + /** + * send task execute response command + * todo unified callback command + */ + public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) { + TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext); + // add response cache + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RESULT); + send(taskExecutionContext.getTaskInstanceId(), command.convert2Command()); + } + public void sendTaskKillResponseCommand(TaskExecutionContext taskExecutionContext) { + TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext); + send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); + TaskCallbackService.remove(taskExecutionContext.getTaskInstanceId()); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 88021e122d..29da668b9d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; @@ -30,12 +29,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.utils.LogUtils; -import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; @@ -128,6 +125,21 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { + if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { + OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); + } + + // check if the OS user exists + if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) { + logger.error("tenantCode: {} does not exist, taskInstanceId: {}", + taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId()); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); + taskExecutionContext.setEndTime(new Date()); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + return; + } + // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext); logger.info("task instance local execute path : {}", execLocalPath); @@ -135,12 +147,13 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { try { FileUtils.createWorkDirIfAbsent(execLocalPath); - if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { - OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); - } } catch (Throwable ex) { - logger.error("create execLocalPath: {}", execLocalPath, ex); + logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId()); + logger.error("create executeLocalPath fail", ex); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + return; } } @@ -153,48 +166,19 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); taskExecutionContext.setStartTime(null); - } else { - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); - taskExecutionContext.setStartTime(new Date()); + taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext); } - this.doAck(taskExecutionContext); - // submit task to manager - if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager))) { - logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getDelayQueueSize()); + boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager)); + if (!offer) { + logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", + workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); } } - private void doAck(TaskExecutionContext taskExecutionContext) { - // tell master that task is in executing - TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext); - ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK); - taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command()); - } - - /** - * build ack command - * - * @param taskExecutionContext taskExecutionContext - * @return TaskExecuteAckCommand - */ - private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) { - TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); - ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); - ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); - ackCommand.setHost(taskExecutionContext.getHost()); - ackCommand.setStartTime(taskExecutionContext.getStartTime()); - - ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); - - taskExecutionContext.setLogPath(ackCommand.getLogPath()); - ackCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); - - return ackCommand; - } - /** * get execute local path * diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java similarity index 60% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java index bedc5a68d6..e080842c34 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; @@ -34,31 +34,31 @@ import com.google.common.base.Preconditions; import io.netty.channel.Channel; /** - * db task response processor + * task execute running ack, from master to worker */ @Component -public class DBTaskResponseProcessor implements NettyRequestProcessor { +public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor { - private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class); + private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseAckProcessor.class); @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(), + Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); - DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject( - command.getBody(), DBTaskResponseCommand.class); + TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = JSONUtils.parseObject( + command.getBody(), TaskExecuteResponseAckCommand.class); - if (taskResponseCommand == null) { - logger.error("dBTask Response command is null"); + if (taskExecuteResponseAckCommand == null) { + logger.error("task execute response ack command is null"); return; } - logger.info("dBTask Response command : {}", taskResponseCommand); + logger.info("task execute response ack command : {}", taskExecuteResponseAckCommand); - if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { - ResponseCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId()); - TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId()); - logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId()); + if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { + ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId()); + TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId()); + logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteResponseAckCommand.getTaskInstanceId()); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java similarity index 64% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java index 7a80f84401..9d74dc5dcc 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; @@ -34,29 +34,29 @@ import com.google.common.base.Preconditions; import io.netty.channel.Channel; /** - * db task ack processor + * task execute running ack processor */ @Component -public class DBTaskAckProcessor implements NettyRequestProcessor { +public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor { - private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class); + private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningAckProcessor.class); @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(), + Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); - DBTaskAckCommand taskAckCommand = JSONUtils.parseObject( - command.getBody(), DBTaskAckCommand.class); + TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject( + command.getBody(), TaskExecuteRunningAckCommand.class); - if (taskAckCommand == null) { - logger.error("dBTask ACK request command is null"); + if (runningAckCommand == null) { + logger.error("task execute running ack command is null"); return; } - logger.info("dBTask ACK request command : {}", taskAckCommand); + logger.info("task execute running ack command : {}", runningAckCommand); - if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { - ResponseCache.get().removeAckCache(taskAckCommand.getTaskInstanceId()); + if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { + ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId()); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 3ec0e9ab30..1c8fe55de6 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; @@ -90,10 +91,17 @@ public class TaskKillProcessor implements NettyRequestProcessor { taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); - TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result); - taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId()); - TaskCallbackService.remove(killCommand.getTaskInstanceId()); + TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); + if (taskExecutionContext == null) { + logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId()); + return; + } + taskExecutionContext.setCurrentExecutionStatus(result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE); + taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight())); + + taskCallbackService.sendTaskKillResponseCommand(taskExecutionContext); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId()); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java index 42013730f0..fc737ca1de 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -47,7 +47,7 @@ public class RetryReportTaskStatusThread implements Runnable { private TaskCallbackService taskCallbackService; public void start() { - Thread thread = new Thread(this,"RetryReportTaskStatusThread"); + Thread thread = new Thread(this, "RetryReportTaskStatusThread"); thread.setDaemon(true); thread.start(); } @@ -65,21 +65,21 @@ public class RetryReportTaskStatusThread implements Runnable { ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL); try { - if (!instance.getAckCache().isEmpty()) { - Map ackCache = instance.getAckCache(); - for (Map.Entry entry : ackCache.entrySet()) { + if (!instance.getRunningCache().isEmpty()) { + Map runningCache = instance.getRunningCache(); + for (Map.Entry entry : runningCache.entrySet()) { Integer taskInstanceId = entry.getKey(); - Command ackCommand = entry.getValue(); - taskCallbackService.sendAck(taskInstanceId,ackCommand); + Command runningCommand = entry.getValue(); + taskCallbackService.send(taskInstanceId, runningCommand); } } if (!instance.getResponseCache().isEmpty()) { - Map responseCache = instance.getResponseCache(); + Map responseCache = instance.getResponseCache(); for (Map.Entry entry : responseCache.entrySet()) { Integer taskInstanceId = entry.getKey(); Command responseCommand = entry.getValue(); - taskCallbackService.sendResult(taskInstanceId,responseCommand); + taskCallbackService.send(taskInstanceId, responseCommand); } } } catch (Exception e) { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 8f3dc3af0d..7670d49b0a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,19 +17,15 @@ package org.apache.dolphinscheduler.server.worker.runner; -import com.github.rholder.retry.RetryException; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; + import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -37,17 +33,14 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.utils.ProcessUtils; -import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.task.TaskPluginManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; import java.io.File; import java.io.IOException; @@ -57,11 +50,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * task scheduler thread @@ -131,35 +124,26 @@ public class TaskExecuteThread implements Runnable, Delayed { @Override public void run() { - TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId()); if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { - responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode()); - responseCommand.setEndTime(new Date()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS); + taskExecutionContext.setStartTime(new Date()); + taskExecutionContext.setEndTime(new Date()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); - taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); return; } try { logger.info("script path : {}", taskExecutionContext.getExecutePath()); - // check if the OS user exists - if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) { - String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode()); - logger.error(errorLog); - responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); - responseCommand.setEndTime(new Date()); - return; - } - if (taskExecutionContext.getStartTime() == null) { taskExecutionContext.setStartTime(new Date()); } - if (taskExecutionContext.getCurrentExecutionStatus() != ExecutionStatus.RUNNING_EXECUTION) { - changeTaskExecutionStatusToRunning(); - } logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId()); + // callback task execute running + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext); + // copy hdfs/minio file to local downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger); @@ -197,29 +181,27 @@ public class TaskExecuteThread implements Runnable, Delayed { // task handle this.task.handle(); - responseCommand.setStatus(this.task.getExitStatus().getCode()); - // task result process if (this.task.getNeedAlert()) { - sendAlert(this.task.getTaskAlertInfo(), responseCommand.getStatus()); + sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode()); } - responseCommand.setEndTime(new Date()); - responseCommand.setProcessId(this.task.getProcessId()); - responseCommand.setAppIds(this.task.getAppIds()); - responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool())); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode())); + taskExecutionContext.setEndTime(DateUtils.getCurrentDate()); + taskExecutionContext.setProcessId(this.task.getProcessId()); + taskExecutionContext.setAppIds(this.task.getAppIds()); + taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool())); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus()); } catch (Throwable e) { logger.error("task scheduler failure", e); kill(); - responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); - responseCommand.setEndTime(new Date()); - responseCommand.setProcessId(task.getProcessId()); - responseCommand.setAppIds(task.getAppIds()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); + taskExecutionContext.setEndTime(DateUtils.getCurrentDate()); + taskExecutionContext.setProcessId(this.task.getProcessId()); + taskExecutionContext.setAppIds(this.task.getAppIds()); } finally { TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); - taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); clearTaskExecPath(); } } @@ -312,7 +294,7 @@ public class TaskExecuteThread implements Runnable, Delayed { // query the tenant code of the resource according to the name of the resource String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName); logger.info("get resource file from hdfs :{}", resHdfsPath); - storageOperate.download(tenantCode,resHdfsPath, execLocalPath + File.separator + fullName, false, true); + storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true); } catch (Exception e) { logger.error(e.getMessage(), e); throw new ServiceException(e.getMessage()); @@ -323,40 +305,6 @@ public class TaskExecuteThread implements Runnable, Delayed { } } - /** - * send an ack to change the status of the task. - */ - private void changeTaskExecutionStatusToRunning() { - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); - Command ackCommand = buildAckCommand().convert2Command(); - try { - RetryerUtils.retryCall(() -> { - taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); - return Boolean.TRUE; - }); - } catch (ExecutionException | RetryException e) { - logger.error(e.getMessage(), e); - } - } - - /** - * build ack command. - * - * @return TaskExecuteAckCommand - */ - private TaskExecuteAckCommand buildAckCommand() { - TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); - ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); - ackCommand.setStartTime(taskExecutionContext.getStartTime()); - ackCommand.setLogPath(taskExecutionContext.getLogPath()); - ackCommand.setHost(taskExecutionContext.getHost()); - - ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); - - return ackCommand; - } - /** * get current TaskExecutionContext * diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 9fd7bafc19..0dd28e5f04 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -108,7 +108,7 @@ public class WorkerManagerThread implements Runnable { TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId()); responseCommand.setStatus(ExecutionStatus.KILL.getCode()); ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); - taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); } /** diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java index fba6c0e030..2bb23cad6a 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java @@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -53,7 +53,7 @@ import org.slf4j.Logger; */ @RunWith(PowerMockRunner.class) @PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class, - JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class}) + JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class}) @Ignore public class TaskExecuteProcessorTest { @@ -84,7 +84,7 @@ public class TaskExecuteProcessorTest { workerConfig.setListenPort(1234); command = new Command(); command.setType(CommandType.TASK_EXECUTE_REQUEST); - ackCommand = new TaskExecuteAckCommand().convert2Command(); + ackCommand = new TaskExecuteRunningCommand().convert2Command(); taskRequestCommand = new TaskExecuteRequestCommand(); alertClientService = PowerMockito.mock(AlertClientService.class); workerExecService = PowerMockito.mock(ExecutorService.class); @@ -95,7 +95,7 @@ public class TaskExecuteProcessorTest { PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null); taskCallbackService = PowerMockito.mock(TaskCallbackService.class); - PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + PowerMockito.doNothing().when(taskCallbackService).send(taskExecutionContext.getTaskInstanceId(), ackCommand); PowerMockito.mockStatic(SpringApplicationContext.class); PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)) @@ -125,10 +125,10 @@ public class TaskExecuteProcessorTest { PowerMockito.mockStatic(FileUtils.class); PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId())) + taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())) .thenReturn(taskExecutionContext.getExecutePath()); PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());