diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java index 9c3bbe9cfd..9dee69be60 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java @@ -115,7 +115,7 @@ public class CheckUtils { * * @param parameter parameter * @param taskType task type - * @return true if taks node parameters are valid, otherwise return false + * @return true if task node parameters are valid, otherwise return false */ public static boolean checkTaskNodeParameters(String parameter, String taskType) { AbstractParameters abstractParameters = TaskParametersUtils.getParameters(taskType, parameter); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java index 196d0a73ae..1adf5a80ca 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java @@ -34,7 +34,7 @@ public class TaskInfo implements Serializable{ /** - * taks name + * task name */ private String taskName; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 7b4c721422..3fc65c1853 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -39,7 +39,7 @@ public class TaskExecutionContext implements Serializable{ /** - * taks name + * task name */ private String taskName; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java index cbb8972a33..b1b67affcc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java @@ -34,7 +34,7 @@ public class NettyRemoteChannel { private final Channel channel; /** - * equest unique identification + * request unique identification */ private final long opaque; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 1e8bf9d0e7..0fe75240c0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.processor; - import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -33,14 +32,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; - import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; /** - * taks callback service + * task callback service */ @Service public class TaskCallbackService { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 50f8989b0e..0af84b100f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -51,7 +51,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); - /** * thread executor service */ @@ -83,9 +82,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { logger.info("received command : {}", taskRequestCommand); - String contextJson = taskRequestCommand.getTaskExecutionContext(); + if(taskRequestCommand == null){ + logger.error("task execute request command is null"); + return; + } + String contextJson = taskRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); + if(taskExecutionContext == null){ + logger.error("task execution context is null"); + return; + } + taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); // local execute path @@ -102,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { // tell master that task is in executing final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command(); - + try { RetryerUtils.retryCall(() -> { taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 78ba3a6b44..e064f4cebe 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -18,14 +18,17 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; @@ -52,10 +55,23 @@ import java.util.Date; * test task call back service */ @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class, - ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class, - TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class}) +@ContextConfiguration(classes={ + TaskCallbackServiceTestConfig.class, + SpringZKServer.class, + SpringApplicationContext.class, + MasterRegistry.class, + WorkerRegistry.class, + ZookeeperRegistryCenter.class, + MasterConfig.class, + WorkerConfig.class, + ZookeeperCachedOperator.class, + ZookeeperConfig.class, + ZookeeperNodeManager.class, + TaskCallbackService.class, + TaskResponseService.class, + TaskAckProcessor.class, + TaskResponseProcessor.class, + TaskExecuteProcessor.class}) public class TaskCallbackServiceTest { @Autowired @@ -70,6 +86,9 @@ public class TaskCallbackServiceTest { @Autowired private TaskResponseProcessor taskResponseProcessor; + @Autowired + private TaskExecuteProcessor taskExecuteProcessor; + /** * send ack test * @throws Exception @@ -176,6 +195,35 @@ public class TaskCallbackServiceTest { } } + @Test + public void testTaskExecuteProcessor() throws Exception{ + final NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(30000); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor); + nettyRemotingServer.start(); + + final NettyClientConfig clientConfig = new NettyClientConfig(); + NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); + + TaskExecuteRequestCommand taskExecuteRequestCommand = new TaskExecuteRequestCommand(); + + nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command()); + + taskExecuteRequestCommand.setTaskExecutionContext(JSONUtils.toJsonString(new TaskExecutionContext())); + + nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command()); + + Thread.sleep(5000); + + Stopper.stop(); + + Thread.sleep(5000); + + nettyRemotingServer.close(); + nettyRemotingClient.close(); + } + // @Test(expected = IllegalStateException.class) // public void testSendAckWithIllegalStateException2(){ // masterRegistry.registry();