diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 0b4d26d1ce..51d068ad08 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -17,14 +17,9 @@ package org.apache.dolphinscheduler.server.master.processor; -import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; - import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -35,7 +30,6 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,16 +53,9 @@ public class TaskAckProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; - - /** - * processService - */ - private ProcessService processService; - - public TaskAckProcessor() { + public TaskAckProcessor(){ 
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); - this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -94,19 +81,10 @@ public class TaskAckProcessor implements NettyRequestProcessor { workerAddress, taskAckCommand.getExecutePath(), taskAckCommand.getLogPath(), - taskAckCommand.getTaskInstanceId()); + taskAckCommand.getTaskInstanceId(), + channel); taskResponseService.addResponse(taskResponseEvent); - - while (Stopper.isRunning()) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId()); - - if (taskInstance != null && ackStatus.typeIsRunning()) { - break; - } - ThreadUtils.sleep(SLEEP_TIME_MILLIS); - } - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index ffe3d5d9a8..080fdd540d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -17,14 +17,9 @@ package org.apache.dolphinscheduler.server.master.processor; -import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; - import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import 
org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -34,7 +29,6 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,15 +52,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; - /** - * processService - */ - private ProcessService processService; - - public TaskResponseProcessor() { + public TaskResponseProcessor(){ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); - this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -85,26 +73,16 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(responseCommand); - ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus()); - // TaskResponseEvent TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getProcessId(), responseCommand.getAppIds(), responseCommand.getTaskInstanceId(), - responseCommand.getVarPool()); - + responseCommand.getVarPool(), + channel); taskResponseService.addResponse(taskResponseEvent); - - while (Stopper.isRunning()) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); - - if (taskInstance != null && 
responseStatus.typeIsFinished()) { - break; - } - ThreadUtils.sleep(SLEEP_TIME_MILLIS); - } } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index ba07be50f3..0ca558a560 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -18,10 +18,14 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import com.fasterxml.jackson.annotation.JsonFormat; + +import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import java.util.Date; +import io.netty.channel.Channel; + /** * task event */ @@ -83,8 +87,19 @@ public class TaskResponseEvent { * varPool */ private String varPool; + + /** + * channel + */ + private Channel channel; - public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId) { + public static TaskResponseEvent newAck(ExecutionStatus state, + Date startTime, + String workerAddress, + String executePath, + String logPath, + int taskInstanceId, + Channel channel) { TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setStartTime(startTime); @@ -93,10 +108,17 @@ public class TaskResponseEvent { event.setLogPath(logPath); event.setTaskInstanceId(taskInstanceId); event.setEvent(Event.ACK); + event.setChannel(channel); return event; } - public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId, String varPool) { + public static TaskResponseEvent newResult(ExecutionStatus state, + Date endTime, 
+ int processId, + String appIds, + int taskInstanceId, + String varPool, + Channel channel) { TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setEndTime(endTime); @@ -105,6 +127,7 @@ public class TaskResponseEvent { event.setTaskInstanceId(taskInstanceId); event.setEvent(Event.RESULT); event.setVarPool(varPool); + event.setChannel(channel); return event; } @@ -196,8 +219,11 @@ public class TaskResponseEvent { this.event = event; } - public enum Event{ - ACK, - RESULT; + public Channel getChannel() { + return channel; + } + + public void setChannel(Channel channel) { + this.channel = channel; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 6434db70e5..65315c7bd9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -17,7 +17,13 @@ package org.apache.dolphinscheduler.server.master.processor.queue; +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; +import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,24 +129,50 @@ public class TaskResponseService { * @param taskResponseEvent taskResponseEvent */ private void persist(TaskResponseEvent taskResponseEvent){ - 
TaskResponseEvent.Event event = taskResponseEvent.getEvent(); + Event event = taskResponseEvent.getEvent(); + Channel channel = taskResponseEvent.getChannel(); switch (event){ case ACK: - processService.changeTaskState(taskResponseEvent.getState(), - taskResponseEvent.getStartTime(), - taskResponseEvent.getWorkerAddress(), - taskResponseEvent.getExecutePath(), - taskResponseEvent.getLogPath(), - taskResponseEvent.getTaskInstanceId()); + try { + TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); + if (taskInstance != null){ + processService.changeTaskState(taskResponseEvent.getState(), + taskResponseEvent.getStartTime(), + taskResponseEvent.getWorkerAddress(), + taskResponseEvent.getExecutePath(), + taskResponseEvent.getLogPath(), + taskResponseEvent.getTaskInstanceId()); + } + // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskAckCommand.convert2Command()); + }catch (Exception e){ + logger.error("worker ack master error",e); + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(),-1); + channel.writeAndFlush(taskAckCommand.convert2Command()); + } break; case RESULT: - processService.changeTaskState(taskResponseEvent.getState(), - taskResponseEvent.getEndTime(), - taskResponseEvent.getProcessId(), - taskResponseEvent.getAppIds(), - taskResponseEvent.getTaskInstanceId(), - taskResponseEvent.getVarPool()); + try { + TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); + if (taskInstance != null){ + processService.changeTaskState(taskResponseEvent.getState(), + taskResponseEvent.getEndTime(), + taskResponseEvent.getProcessId(), + taskResponseEvent.getAppIds(), + taskResponseEvent.getTaskInstanceId(), + taskResponseEvent.getVarPool() + ); + 
} + // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + }catch (Exception e){ + logger.error("worker response master error",e); + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),-1); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + } break; default: throw new IllegalArgumentException("invalid event type : " + event); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index f0833cb7e0..6895de3d4a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -22,9 +22,12 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor; +import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; +import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
@@ -70,6 +73,9 @@ public class WorkerServer { @Autowired private SpringApplicationContext springApplicationContext; + @Autowired + private RetryReportTaskStatusThread retryReportTaskStatusThread; + /** * worker server startup * @@ -95,11 +101,16 @@ public class WorkerServer { this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); this.nettyRemotingServer.start(); // worker registry this.workerRegistry.registry(); + // retry report task status + this.retryReportTaskStatusThread.start(); + /** * register hooks, which are called before the process exits */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java index b1b67affcc..6e2fdeb5d9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java @@ -50,6 +50,12 @@ public class NettyRemoteChannel { this.opaque = opaque; } + public NettyRemoteChannel(Channel channel) { + this.channel = channel; + this.host = ChannelUtils.toAddress(channel); + this.opaque = -1; + } + public Channel getChannel() { return channel; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 4aeb6feffb..7a21f7ece2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; @@ -66,6 +67,8 @@ public class TaskCallbackService { public TaskCallbackService(){ final NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); + this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); } /** @@ -83,16 +86,17 @@ public class TaskCallbackService { * @return callback channel */ private NettyRemoteChannel getRemoteChannel(int taskInstanceId){ + Channel newChannel; NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId); - if(nettyRemoteChannel == null){ - throw new IllegalArgumentException("nettyRemoteChannel is empty, should call addRemoteChannel first"); - } - if(nettyRemoteChannel.isActive()){ - return nettyRemoteChannel; - } - Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost()); - if(newChannel != null){ - return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), 
taskInstanceId); + if(nettyRemoteChannel != null){ + if(nettyRemoteChannel.isActive()){ + return nettyRemoteChannel; + } + newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost()); + if(newChannel != null){ + return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + } + } logger.warn("original master : {} for task : {} is not reachable, random select master", nettyRemoteChannel.getHost(), @@ -116,7 +120,7 @@ public class TaskCallbackService { for (String masterNode : masterNodes) { newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); if (newChannel != null) { - return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + return getRemoteChannel(newChannel,taskInstanceId); } } masterNodes = null; @@ -138,6 +142,12 @@ public class TaskCallbackService { return remoteChannel; } + private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId){ + NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel); + addRemoteChannel(taskInstanceId, remoteChannel); + return remoteChannel; + } + /** * remove callback channels * @param taskInstanceId taskInstanceId diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 13a9dca9d6..34c4cf9f8e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.processor; +import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import 
org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -34,19 +35,18 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.LogUtils; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.rholder.retry.RetryException; import io.netty.channel.Channel; @@ -100,15 +100,16 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { logger.error("task execution context is null"); return; } - - taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort()); - // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); + taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort()); + taskExecutionContext.setStartTime(new Date()); + taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); + // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext); logger.info("task instance local execute path : {} ", execLocalPath); @@ -126,27 +127,17 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new 
NettyRemoteChannel(channel, command.getOpaque())); - if (DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L) > 0) { - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); - taskExecutionContext.setStartTime(null); - } else { - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); - taskExecutionContext.setStartTime(new Date()); - } + this.doAck(taskExecutionContext); - // tell master the status of this task (RUNNING_EXECUTION or DELAY_EXECUTION) - final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command(); + // submit task + workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger)); + } - try { - RetryerUtils.retryCall(() -> { - taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand); - return Boolean.TRUE; - }); - // submit task - workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger)); - } catch (ExecutionException | RetryException e) { - logger.error(e.getMessage(), e); - } + private void doAck(TaskExecutionContext taskExecutionContext){ + // tell master that task is in executing + TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),ackCommand.convert2Command(),Event.ACK); + taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 58f743303c..6baeae9b7b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; @@ -165,14 +167,10 @@ public class TaskExecuteThread implements Runnable { responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); } finally { - try { - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(responseCommand.getStatus())); - taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); - } catch (Exception e) { - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); - taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); - } + taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + 
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT); + taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java index f4805a7561..244797f1ef 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java @@ -91,7 +91,7 @@ public class TaskAckProcessorTest { PowerMockito.when(channel.remoteAddress()).thenReturn(socketAddress); PowerMockito.mockStatic(TaskResponseEvent.class); - PowerMockito.when(TaskResponseEvent.newAck(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyInt())) + PowerMockito.when(TaskResponseEvent.newAck(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyInt(), Mockito.any())) .thenReturn(taskResponseEvent); TaskInstance taskInstance = PowerMockito.mock(TaskInstance.class); PowerMockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index a2b1b4ecc2..a9fdb58520 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -46,7 +46,7 @@ public class TaskResponseServiceTest { @Test public void testAdd(){ TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(), - "", "", "", 1); + "", "", "", 1,null); taskResponseService.addResponse(taskResponseEvent); Assert.assertTrue(taskResponseService.getEventQueue().size() == 1); try { @@ -60,7 +60,7 @@ public class TaskResponseServiceTest { @Test public void testStop(){ TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(), - "", "", "", 1); + "", "", "", 1,null); taskResponseService.addResponse(taskResponseEvent); taskResponseService.stop(); Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);