diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 50c851c483..b2cf53a575 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -53,6 +53,8 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * TaskUpdateQueue consumer */ @@ -68,7 +70,7 @@ public class TaskPriorityQueueConsumer extends Thread{ * taskUpdateQueue */ @Autowired - private TaskPriorityQueue taskUpdateQueue; + private TaskPriorityQueue taskPriorityQueue; /** * processService @@ -93,7 +95,7 @@ public class TaskPriorityQueueConsumer extends Thread{ while (Stopper.isRunning()){ try { // if not task , blocking here - String taskPriorityInfo = taskUpdateQueue.take(); + String taskPriorityInfo = taskPriorityQueue.take(); TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); @@ -114,13 +116,22 @@ public class TaskPriorityQueueConsumer extends Thread{ private Boolean dispatch(int taskInstanceId){ TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); - try { - return dispatcher.dispatch(executionContext); - } catch (ExecuteException e) { - logger.error("execute exception", e); - return false; - } + Boolean result = false; + while (Stopper.isRunning()){ + try { + result = dispatcher.dispatch(executionContext); + } catch (ExecuteException e) { + logger.error("dispatch error",e); + try { + Thread.sleep(SLEEP_TIME_MILLIS); + } catch (InterruptedException e1) {} + } + if (result){ + break; + } + } + return result; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 1eb40db152..7af9cdc2cc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -31,9 +33,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * task ack processor */ @@ -51,9 +56,16 @@ public class TaskAckProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; + + /** + * processService + */ + private ProcessService processService; + public TaskAckProcessor(){ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); + this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -71,8 +83,10 @@ public class TaskAckProcessor implements NettyRequestProcessor { String workerAddress = ChannelUtils.toAddress(channel).getAddress(); + ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus()); + // TaskResponseEvent - TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()), + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus, taskAckCommand.getStartTime(), workerAddress, taskAckCommand.getExecutePath(), @@ -81,6 +95,18 @@ public class TaskAckProcessor implements NettyRequestProcessor { taskResponseService.addResponse(taskResponseEvent); + while (Stopper.isRunning()){ + TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId()); + + if (taskInstance != null && ackStatus.typeIsRunning()){ + break; + } + + try { + Thread.sleep(SLEEP_TIME_MILLIS); + } catch (InterruptedException e) {} + } + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 36b382313b..ecb8646ad0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -30,9 +32,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * task response processor */ @@ -50,9 +55,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; + /** + * processService + */ + private ProcessService processService; + public TaskResponseProcessor(){ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); + this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -71,6 +82,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(responseCommand); + ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus()); + // TaskResponseEvent TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), @@ -79,6 +92,18 @@ public class TaskResponseProcessor implements NettyRequestProcessor { responseCommand.getTaskInstanceId()); taskResponseService.addResponse(taskResponseEvent); + + while (Stopper.isRunning()){ + TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); + + if (taskInstance != null && responseStatus.typeIsFinished()){ + break; + } + + try { + Thread.sleep(SLEEP_TIME_MILLIS); + } catch (InterruptedException e) {} + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index f966591df4..7cd25cba65 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -18,9 +18,11 @@ package org.apache.dolphinscheduler.server.worker.processor; +import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; @@ -93,8 +95,17 @@ public class TaskCallbackService { } logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost()); Set masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly(); - if(CollectionUtils.isEmpty(masterNodes)){ - throw new IllegalStateException("no available master node exception"); + while (Stopper.isRunning()) { + if (CollectionUtils.isEmpty(masterNodes)) { + logger.error("no available master node"); + try { + Thread.sleep(1000); + }catch (Exception e){ + + } + }else { + break; + } } for(String masterNode : masterNodes){ newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 5f44e1cee2..a0fee7c36e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -17,21 +17,26 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Test; @@ -47,9 +52,10 @@ import java.util.Date; * test task call back service */ @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class, +@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class, ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class}) + ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class, + TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class}) public class TaskCallbackServiceTest { @Autowired @@ -58,12 +64,22 @@ public class TaskCallbackServiceTest { @Autowired private MasterRegistry masterRegistry; + @Autowired + private TaskAckProcessor taskAckProcessor; + + @Autowired + private TaskResponseProcessor taskResponseProcessor; + + /** + * send ack test + * @throws Exception + */ @Test - public void testSendAck(){ + public void testSendAck() throws Exception{ final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); nettyRemotingServer.start(); final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -75,22 +91,64 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); taskCallbackService.sendAck(1, ackCommand.convert2Command()); + Thread.sleep(5000); + + Stopper.stop(); + + Thread.sleep(5000); + nettyRemotingServer.close(); nettyRemotingClient.close(); } + /** + * send result test + * @throws Exception + */ + @Test + public void testSendResult() throws Exception{ + final NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(30000); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); + nettyRemotingServer.start(); + + final NettyClientConfig clientConfig = new NettyClientConfig(); + NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); + Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); + taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); + responseCommand.setTaskInstanceId(1); + responseCommand.setEndTime(new Date()); + + taskCallbackService.sendResult(1, responseCommand.convert2Command()); + + Thread.sleep(5000); + + Stopper.stop(); + + Thread.sleep(5000); + + nettyRemotingServer.close(); + nettyRemotingClient.close(); + } + + + @Test(expected = IllegalArgumentException.class) public void testSendAckWithIllegalArgumentException(){ TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class); taskCallbackService.sendAck(1, ackCommand.convert2Command()); + Stopper.stop(); } @Test(expected = IllegalStateException.class) public void testSendAckWithIllegalStateException1(){ + masterRegistry.registry(); final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); nettyRemotingServer.start(); final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -103,7 +161,21 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); nettyRemotingServer.close(); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Stopper.stop(); + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } @Test(expected = IllegalStateException.class) @@ -112,7 +184,7 @@ public class TaskCallbackServiceTest { final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); nettyRemotingServer.start(); final NettyClientConfig clientConfig = new NettyClientConfig(); @@ -125,6 +197,20 @@ public class TaskCallbackServiceTest { ackCommand.setStartTime(new Date()); nettyRemotingServer.close(); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Stopper.stop(); + + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } }