From e7c32affc6e40b16801722e31489cf618494233a Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 20 Feb 2020 09:29:27 +0800 Subject: [PATCH 1/7] master/worker basic communication --- .../command/ExecuteTaskRequestCommand.java | 2 +- .../runner/MasterBaseTaskExecThread.java | 35 +++++++++++++++- .../master/runner/MasterExecThread.java | 15 +------ .../server/worker/WorkerServer.java | 6 +-- .../server/worker/runner/FetchTaskThread.java | 2 +- .../worker/runner/TaskScheduleThread.java | 41 +++++++++++++++++++ .../server/master/MasterExecThreadTest.java | 2 +- 7 files changed, 82 insertions(+), 21 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java index adfcb1d10e..9881a2965f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index f8fcb1456d..c732e976d5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -16,10 +16,17 @@ */ package org.apache.dolphinscheduler.server.master.runner; +import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.BeanContext; +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Address; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -75,6 +82,12 @@ public class MasterBaseTaskExecThread implements Callable { */ private MasterConfig masterConfig; + + /** + * netty remoting client + */ + private NettyRemotingClient nettyRemotingClient; + /** * constructor of MasterBaseTaskExecThread * @param taskInstance task instance @@ -88,6 +101,9 @@ public class MasterBaseTaskExecThread implements Callable { this.cancel = false; this.taskInstance = taskInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + + NettyClientConfig clientConfig = new NettyClientConfig(); + this.nettyRemotingClient = new NettyRemotingClient(clientConfig); } /** @@ -105,6 +121,23 @@ public class MasterBaseTaskExecThread implements Callable { this.cancel = true; } + + //TODO + /**端口,默认是123456 + * 需要构造ExecuteTaskRequestCommand,里面就是TaskInstance的属性。 + */ + public void sendToWorker(String taskInstanceJson){ + final Address address = new Address("192.168.220.247", 12346); + ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(); + try { + Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000); + logger.info("已发送任务到Worker上,Worker需要执行任务"); + //结果可能为空,所以不用管,能发过去,就行。 + } catch (InterruptedException | RemotingException ex) { + logger.error(String.format("send command to : %s error", address), ex); + } + } + /** * submit master base task exec thread * @return TaskInstance @@ -128,7 +161,7 @@ public class MasterBaseTaskExecThread implements Callable { } if(submitDB && !submitQueue){ // submit task to queue - submitQueue = processService.submitTaskToQueue(task); + sendToWorker(JSONObject.toJSONString(task)); } if(submitDB && submitQueue){ return task; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index be832174a5..b8bf1c9074 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -160,20 +160,7 @@ public class MasterExecThread implements Runnable { this.nettyRemotingClient = nettyRemotingClient; } - //TODO - /**端口,默认是123456 - * 需要构造ExecuteTaskRequestCommand,里面就是TaskInstance的属性。 - */ - private void sendToWorker(){ - final Address address = new Address("localhost", 12346); - ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(); - try { - Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000); - //结果可能为空,所以不用管,能发过去,就行。 - } catch (InterruptedException | RemotingException ex) { - logger.error(String.format("send command to : %s error", address), ex); - } - } + @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index d014f1ea4e..af5402ad0a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -172,9 +172,9 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService)); this.nettyRemotingServer.start(); - //worker registry - this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); - this.workerRegistry.registry(); + // TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry +// this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); +// this.workerRegistry.registry(); this.zkWorkerClient.init(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java index 013db83761..9e0c452a80 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java @@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{ logger.info("task : {} ready to submit to task scheduler thread",taskInstId); // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, processService)); +// workerExecService.submit(new TaskScheduleThread(taskInstance, processService)); // remove node from zk removeNodeFromTaskQueue(taskQueueStr); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 96cb0c2246..1bba60cd1e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -97,6 +97,9 @@ public class TaskScheduleThread implements Runnable { @Override public void run() { + // TODO 需要去掉,暂时保留 + updateTaskState(taskInstance.getTaskType()); + ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); try { @@ -169,6 +172,12 @@ public class TaskScheduleThread implements Runnable { }catch (Exception e){ logger.error("task scheduler failure", e); kill(); + + //TODO 需要去掉,暂时保留 update task instance state + processService.changeTaskState(ExecutionStatus.FAILURE, + new Date(), + taskInstance.getId()); + responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); @@ -176,6 +185,14 @@ public class TaskScheduleThread implements Runnable { taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); } + logger.info("task instance id : {},task final status : {}", + taskInstance.getId(), + task.getExitStatus()); + // update task instance state + processService.changeTaskState(task.getExitStatus(), + new Date(), + taskInstance.getId()); + } /** @@ -342,4 +359,28 @@ public class TaskScheduleThread implements Runnable { PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); permissionCheck.checkPermission(); } + + /** + * update task state according to task type + * @param taskType + */ + private void updateTaskState(String taskType) { + // update task status is running + if(taskType.equals(TaskType.SQL.name()) || + taskType.equals(TaskType.PROCEDURE.name())){ + processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, + taskInstance.getStartTime(), + taskInstance.getHost(), + null, + getTaskLogPath(), + taskInstance.getId()); + }else{ + processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, + taskInstance.getStartTime(), + taskInstance.getHost(), + taskInstance.getExecutePath(), + getTaskLogPath(), + taskInstance.getId()); + } + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index 770ab3cff6..19a96a7be8 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -91,7 +91,7 @@ public class MasterExecThreadTest { processDefinition.setGlobalParamList(Collections.EMPTY_LIST); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); - masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService)); + masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService,null)); // prepareProcess init dag Field dag = MasterExecThread.class.getDeclaredField("dag"); dag.setAccessible(true); From 910401fb2d7e75390276af3058469158b4a56ab7 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 20 Feb 2020 11:51:51 +0800 Subject: [PATCH 2/7] master/worker basic communication --- .../remote/command/ExecuteTaskRequestCommand.java | 2 +- .../master/runner/MasterBaseTaskExecThread.java | 15 ++++----------- .../processor/WorkerNettyRequestProcessor.java | 12 +++++++++++- .../server/worker/runner/TaskScheduleThread.java | 4 ++-- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java index 9881a2965f..4d01142c97 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInstanceJson='" + taskInstanceJson + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index c732e976d5..25447af010 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -86,7 +86,7 @@ public class MasterBaseTaskExecThread implements Callable { /** * netty remoting client */ - private NettyRemotingClient nettyRemotingClient; + private static final NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig()); /** * constructor of MasterBaseTaskExecThread @@ -101,9 +101,6 @@ public class MasterBaseTaskExecThread implements Callable { this.cancel = false; this.taskInstance = taskInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); - - NettyClientConfig clientConfig = new NettyClientConfig(); - this.nettyRemotingClient = new NettyRemotingClient(clientConfig); } /** @@ -122,17 +119,13 @@ public class MasterBaseTaskExecThread implements Callable { } - //TODO - /**端口,默认是123456 - * 需要构造ExecuteTaskRequestCommand,里面就是TaskInstance的属性。 - */ + // TODO send task to worker public void sendToWorker(String taskInstanceJson){ final Address address = new Address("192.168.220.247", 12346); - ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(); + ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson); try { Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000); - logger.info("已发送任务到Worker上,Worker需要执行任务"); - //结果可能为空,所以不用管,能发过去,就行。 + logger.info("response result : {}",response); } catch (InterruptedException | RemotingException ex) { logger.error(String.format("send command to : %s error", address), ex); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java index 2e5ea99b63..b1f5da0916 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.processor; +import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -28,6 +29,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -64,7 +66,15 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); logger.debug("received command : {}", command); - TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class); + ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskRequestCommand.class); + + String taskInstanceJson = taskRequestCommand.getTaskInstanceJson(); + + TaskInstance taskInstance = JSONObject.parseObject(taskInstanceJson, TaskInstance.class); + + taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); + + //TODO 需要干掉,然后移到master里面。 int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 1bba60cd1e..cb777c6bac 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -105,7 +105,7 @@ public class TaskScheduleThread implements Runnable { try { // tell master that task is in executing ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType()); - taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand); +// taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand); logger.info("script path : {}", taskInstance.getExecutePath()); // task node @@ -182,7 +182,7 @@ public class TaskScheduleThread implements Runnable { responseCommand.setEndTime(new Date()); } finally { - taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); +// taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); } logger.info("task instance id : {},task final status : {}", From f6a2130ba1cbe34558207725c3148ecd72f8e1d3 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 20 Feb 2020 12:11:13 +0800 Subject: [PATCH 3/7] master/worker basic communication --- .../master/runner/MasterBaseTaskExecThread.java | 2 +- .../worker/processor/WorkerNettyRequestProcessor.java | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 25447af010..3b578f72e2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -124,7 +124,7 @@ public class MasterBaseTaskExecThread implements Callable { final Address address = new Address("192.168.220.247", 12346); ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson); try { - Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000); + Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), Integer.MAX_VALUE); logger.info("response result : {}",response); } catch (InterruptedException | RemotingException ex) { logger.error(String.format("send command to : %s error", address), ex); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java index b1f5da0916..d8a288d805 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java @@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -64,9 +66,11 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); + Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), + String.format("invalid command type : %s", command.getType())); logger.debug("received command : {}", command); - ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskRequestCommand.class); + ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( + command.getBody(), ExecuteTaskRequestCommand.class); String taskInstanceJson = taskRequestCommand.getTaskInstanceJson(); @@ -101,6 +105,9 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { // submit task taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque())); workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService)); + + ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); + channel.writeAndFlush(executeTaskResponseCommand.convert2Command()); } private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { From c39e511262bbe62ff8f8f98163eecf9f55cb16f2 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 20 Feb 2020 14:04:14 +0800 Subject: [PATCH 4/7] master/worker basic communication --- .../remote/command/ExecuteTaskResponseCommand.java | 2 +- .../server/worker/processor/WorkerNettyRequestProcessor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java index 3e6d5c117e..8193c9d96d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } /** * package response command * * @param opaque request unique identification * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java index d8a288d805..61df9fb66c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java @@ -107,7 +107,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService)); ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); - channel.writeAndFlush(executeTaskResponseCommand.convert2Command()); + channel.writeAndFlush(executeTaskResponseCommand.convert2Command(command.getOpaque())); } private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { From 06927c2cf46143f7111c4f831d6ceaaa1a046bca Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 20 Feb 2020 14:26:14 +0800 Subject: [PATCH 5/7] master/worker basic communication --- .../runner/MasterBaseTaskExecThread.java | 3 +- .../server/worker/WorkerServer.java | 5 +-- .../worker/processor/CallbackChannel.java | 10 ++++- ...kService.java => TaskCallbackService.java} | 4 +- ...essor.java => WorkerRequestProcessor.java} | 42 +++++++++++++------ .../worker/runner/TaskScheduleThread.java | 11 +++-- 6 files changed, 50 insertions(+), 25 deletions(-) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/{TaskInstanceCallbackService.java => TaskCallbackService.java} (96%) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/{WorkerNettyRequestProcessor.java => WorkerRequestProcessor.java} (87%) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 3b578f72e2..27d527a59a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -121,7 +121,7 @@ public class MasterBaseTaskExecThread implements Callable { // TODO send task to worker public void sendToWorker(String taskInstanceJson){ - final Address address = new Address("192.168.220.247", 12346); + final Address address = new Address("127.0.0.1", 12346); ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson); try { Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), Integer.MAX_VALUE); @@ -155,6 +155,7 @@ public class MasterBaseTaskExecThread implements Callable { if(submitDB && !submitQueue){ // submit task to queue sendToWorker(JSONObject.toJSONString(task)); + submitQueue = true; } if(submitDB && submitQueue){ return task; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index af5402ad0a..19cb417e30 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -35,9 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.processor.WorkerNettyRequestProcessor; +import org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; -import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread; import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -169,7 +168,7 @@ public class WorkerServer implements IStoppable { //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService)); + this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService)); this.nettyRemotingServer.start(); // TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java index 95345c0b95..e3d893fb7b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java @@ -19,11 +19,19 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; - +/** + * callback channel + */ public class CallbackChannel { + /** + * channel + */ private Channel channel; + /** + * equest unique identification + */ private long opaque; public CallbackChannel(Channel channel, long opaque) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java similarity index 96% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 0480d94716..762ee18c45 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import java.util.concurrent.ConcurrentHashMap; -public class TaskInstanceCallbackService { +public class TaskCallbackService { private static final ConcurrentHashMap CALL_BACK_CHANNELS = new ConcurrentHashMap<>(); @@ -56,7 +56,7 @@ public class TaskInstanceCallbackService { public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); - callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){ + callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(0)).addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java similarity index 87% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index 61df9fb66c..be6b95e489 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -44,22 +43,36 @@ import org.slf4j.LoggerFactory; import java.util.Date; import java.util.concurrent.ExecutorService; +/** + * worker request processor + */ +public class WorkerRequestProcessor implements NettyRequestProcessor { -public class WorkerNettyRequestProcessor implements NettyRequestProcessor { - - private final Logger logger = LoggerFactory.getLogger(WorkerNettyRequestProcessor.class); + private final Logger logger = LoggerFactory.getLogger(WorkerRequestProcessor.class); + /** + * process service + */ private final ProcessService processService; + /** + * thread executor service + */ private final ExecutorService workerExecService; + /** + * worker config + */ private final WorkerConfig workerConfig; - private final TaskInstanceCallbackService taskInstanceCallbackService; + /** + * task callback service + */ + private final TaskCallbackService taskCallbackService; - public WorkerNettyRequestProcessor(ProcessService processService){ + public WorkerRequestProcessor(ProcessService processService){ this.processService = processService; - this.taskInstanceCallbackService = new TaskInstanceCallbackService(); + this.taskCallbackService = new TaskCallbackService(); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); } @@ -68,7 +81,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); - logger.debug("received command : {}", command); + logger.info("received command : {}", command); ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( command.getBody(), ExecuteTaskRequestCommand.class); @@ -79,7 +92,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); - //TODO 需要干掉,然后移到master里面。 + //TODO this logic need add to master int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); // verify tenant is null @@ -91,7 +104,8 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); - //TODO 到这里。 + //TODO end + // local execute path String execLocalPath = getExecLocalPath(taskInstance); logger.info("task instance local execute path : {} ", execLocalPath); @@ -102,9 +116,13 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { } catch (Exception ex){ logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); } + + taskCallbackService.addCallbackChannel(taskInstance.getId(), + new CallbackChannel(channel, command.getOpaque())); + // submit task - taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque())); - workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService)); + workerExecService.submit(new TaskScheduleThread(taskInstance, + processService, taskCallbackService)); ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); channel.writeAndFlush(executeTaskResponseCommand.convert2Command(command.getOpaque())); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index cb777c6bac..3488fe0031 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -35,10 +35,9 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.server.worker.processor.TaskInstanceCallbackService; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -80,7 +79,7 @@ public class TaskScheduleThread implements Runnable { /** * task instance callback service */ - private TaskInstanceCallbackService taskInstanceCallbackService; + private TaskCallbackService taskInstanceCallbackService; /** * constructor @@ -88,7 +87,7 @@ public class TaskScheduleThread implements Runnable { * @param taskInstance task instance * @param processService process dao */ - public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskInstanceCallbackService taskInstanceCallbackService){ + public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ this.processService = processService; this.taskInstance = taskInstance; this.taskInstanceCallbackService = taskInstanceCallbackService; @@ -105,7 +104,7 @@ public class TaskScheduleThread implements Runnable { try { // tell master that task is in executing ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType()); -// taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand); + taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand); logger.info("script path : {}", taskInstance.getExecutePath()); // task node @@ -182,7 +181,7 @@ public class TaskScheduleThread implements Runnable { responseCommand.setEndTime(new Date()); } finally { -// taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); + taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); } logger.info("task instance id : {},task final status : {}", From 2bb2d0ce7c86c8a77bd637cec68cea18a21c062a Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 20 Feb 2020 14:55:53 +0800 Subject: [PATCH 6/7] master/worker basic communication --- .../remote/command/ExecuteTaskAckCommand.java | 2 +- .../server/log/LoggerRequestProcessor.java | 16 +++++----- .../runner/MasterBaseTaskExecThread.java | 30 +++++++++++++++---- .../worker/processor/TaskCallbackService.java | 5 ++-- .../processor/WorkerRequestProcessor.java | 3 -- .../worker/runner/TaskScheduleThread.java | 4 +-- 6 files changed, 39 insertions(+), 21 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java index 24ab68f260..7f7da0e445 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task request command */ public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task request command */ public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java index 4e4404ea1c..818b453a1b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java @@ -59,14 +59,14 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { */ final CommandType commandType = command.getType(); switch (commandType){ - case GET_LOG_BYTES_REQUEST: - GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize( - command.getBody(), GetLogBytesRequestCommand.class); - byte[] bytes = getFileContentBytes(getLogRequest.getPath()); - GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); - channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); - break; - case VIEW_WHOLE_LOG_REQUEST: + case GET_LOG_BYTES_REQUEST: + GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize( + command.getBody(), GetLogBytesRequestCommand.class); + byte[] bytes = getFileContentBytes(getLogRequest.getPath()); + GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); + channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); + break; + case VIEW_WHOLE_LOG_REQUEST: ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize( command.getBody(), ViewLogRequestCommand.class); String msg = readWholeFileContent(viewLogRequest.getPath()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 27d527a59a..8104c42bd0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -22,11 +22,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.BeanContext; import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.command.*; +import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Address; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -122,10 +123,29 @@ public class MasterBaseTaskExecThread implements Callable { // TODO send task to worker public void sendToWorker(String taskInstanceJson){ final Address address = new Address("127.0.0.1", 12346); - ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson); + ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(taskInstanceJson); try { - Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), Integer.MAX_VALUE); - logger.info("response result : {}",response); + Command responseCommand = nettyRemotingClient.sendSync(address, + taskRequestCommand.convert2Command(), Integer.MAX_VALUE); + + logger.info("receive command : {}", responseCommand); + + final CommandType commandType = responseCommand.getType(); + switch (commandType){ + case EXECUTE_TASK_ACK: + ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize( + responseCommand.getBody(), ExecuteTaskAckCommand.class); + logger.info("taskAckCommand : {}",taskAckCommand); + break; + case EXECUTE_TASK_RESPONSE: + ExecuteTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize( + responseCommand.getBody(), ExecuteTaskResponseCommand.class); + logger.info("taskResponseCommand : {}",taskResponseCommand); + break; + default: + throw new IllegalArgumentException("unknown commandType"); + } + logger.info("response result : {}",responseCommand); } catch (InterruptedException | RemotingException ex) { logger.error(String.format("send command to : %s error", address), ex); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 762ee18c45..2b6ed957d0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -56,7 +56,8 @@ public class TaskCallbackService { public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); - callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(0)).addListener(new ChannelFutureListener(){ + callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command( + callbackChannel.getOpaque())).addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -68,7 +69,7 @@ public class TaskCallbackService { }); } - //TODO + // TODO private Channel createChannel(){ return null; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index be6b95e489..0a1b7c4837 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -123,9 +123,6 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { // submit task workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskCallbackService)); - - ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); - channel.writeAndFlush(executeTaskResponseCommand.convert2Command(command.getOpaque())); } private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 3488fe0031..bb233b559e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -96,7 +96,7 @@ public class TaskScheduleThread implements Runnable { @Override public void run() { - // TODO 需要去掉,暂时保留 + // TODO Need to be removed and kept temporarily update task instance state updateTaskState(taskInstance.getTaskType()); ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); @@ -172,7 +172,7 @@ public class TaskScheduleThread implements Runnable { logger.error("task scheduler failure", e); kill(); - //TODO 需要去掉,暂时保留 update task instance state + //TODO Need to be removed and kept temporarily update task instance state processService.changeTaskState(ExecutionStatus.FAILURE, new Date(), taskInstance.getId()); From d16a8aa370c9738ff7534d98b7702e5adfd4995d Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 20 Feb 2020 15:28:03 +0800 Subject: [PATCH 7/7] add comment --- .../server/worker/WorkerServer.java | 2 +- .../worker/processor/TaskCallbackService.java | 31 +++++++++++++++++++ .../processor/WorkerRequestProcessor.java | 11 +++++++ .../worker/registry/WorkerRegistry.java | 29 ++++++++++++++++- 4 files changed, 71 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 19cb417e30..340a11adf2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -171,7 +171,7 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService)); this.nettyRemotingServer.start(); - // TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry + // TODO ,because there is a heartbeat, you can reuse the heartbeat logic,worker registry // this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); // this.workerRegistry.registry(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 2b6ed957d0..cd62e98a9b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -26,14 +26,30 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import java.util.concurrent.ConcurrentHashMap; +/** + * taks callback service + */ public class TaskCallbackService { + /** + * callback channels + */ private static final ConcurrentHashMap CALL_BACK_CHANNELS = new ConcurrentHashMap<>(); + /** + * add callback channel + * @param taskInstanceId taskInstanceId + * @param channel channel + */ public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){ CALL_BACK_CHANNELS.put(taskInstanceId, channel); } + /** + * get callback channel + * @param taskInstanceId taskInstanceId + * @return callback channel + */ public CallbackChannel getCallbackChannel(int taskInstanceId){ CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId); if(callbackChannel.getChannel().isActive()){ @@ -45,15 +61,30 @@ public class TaskCallbackService { return callbackChannel; } + /** + * remove callback channels + * @param taskInstanceId taskInstanceId + */ public void remove(int taskInstanceId){ CALL_BACK_CHANNELS.remove(taskInstanceId); } + /** + * send ack + * @param taskInstanceId taskInstanceId + * @param ackCommand ackCommand + */ public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){ CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque())); } + /** + * send result + * + * @param taskInstanceId taskInstanceId + * @param responseCommand responseCommand + */ public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command( diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index 0a1b7c4837..8ea9ccbe5b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -125,6 +125,12 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { processService, taskCallbackService)); } + /** + * whehter tenant is null + * @param tenant tenant + * @param taskInstance taskInstance + * @return result + */ private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { if(tenant == null){ logger.error("tenant not exists,process instance id : {},task instance id : {}", @@ -135,6 +141,11 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { return false; } + /** + * get execute local path + * @param taskInstance taskInstance + * @return execute local path + */ private String getExecLocalPath(TaskInstance taskInstance){ return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), taskInstance.getProcessDefine().getId(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 396b009c01..fc81638705 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -24,20 +24,36 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * worker registry + */ public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); + /** + * zookeeper registry center + */ private final ZookeeperRegistryCenter zookeeperRegistryCenter; + /** + * port + */ private final int port; + /** + * construct + * @param zookeeperRegistryCenter zookeeperRegistryCenter + * @param port port + */ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){ this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.port = port; } + /** + * registry + */ public void registry() { String address = Constants.LOCAL_ADDRESS; String localNodePath = getWorkerPath(); @@ -58,6 +74,9 @@ public class WorkerRegistry { logger.info("scheduler node : {} registry to ZK successfully.", address); } + /** + * remove registry info + */ public void unRegistry() { String address = getLocalAddress(); String localNodePath = getWorkerPath(); @@ -65,12 +84,20 @@ public class WorkerRegistry { logger.info("worker node : {} unRegistry to ZK.", address); } + /** + * get worker path + * @return + */ private String getWorkerPath() { String address = getLocalAddress(); String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address; return localNodePath; } + /** + * get local address + * @return + */ private String getLocalAddress(){ return Constants.LOCAL_ADDRESS + ":" + port; }