From e7c32affc6e40b16801722e31489cf618494233a Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 20 Feb 2020 09:29:27 +0800 Subject: [PATCH] master/worker basic communication --- .../command/ExecuteTaskRequestCommand.java | 2 +- .../runner/MasterBaseTaskExecThread.java | 35 +++++++++++++++- .../master/runner/MasterExecThread.java | 15 +------ .../server/worker/WorkerServer.java | 6 +-- .../server/worker/runner/FetchTaskThread.java | 2 +- .../worker/runner/TaskScheduleThread.java | 41 +++++++++++++++++++ .../server/master/MasterExecThreadTest.java | 2 +- 7 files changed, 82 insertions(+), 21 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java index adfcb1d10e..9881a2965f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index f8fcb1456d..c732e976d5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -16,10 +16,17 @@ */ package org.apache.dolphinscheduler.server.master.runner; +import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.BeanContext; +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Address; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -75,6 +82,12 @@ public class MasterBaseTaskExecThread implements Callable { */ private MasterConfig masterConfig; + + /** + * netty remoting client + */ + private NettyRemotingClient nettyRemotingClient; + /** * constructor of MasterBaseTaskExecThread * @param taskInstance task instance @@ -88,6 +101,9 @@ public class MasterBaseTaskExecThread implements Callable { this.cancel = false; this.taskInstance = taskInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + + NettyClientConfig clientConfig = new NettyClientConfig(); + this.nettyRemotingClient = new NettyRemotingClient(clientConfig); } /** @@ -105,6 +121,23 @@ public class MasterBaseTaskExecThread implements Callable { this.cancel = true; } + + //TODO + /**端口,默认是123456 + * 需要构造ExecuteTaskRequestCommand,里面就是TaskInstance的属性。 + */ + public void sendToWorker(String taskInstanceJson){ + final Address address = new Address("192.168.220.247", 12346); + ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(); + try { + Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000); + logger.info("已发送任务到Worker上,Worker需要执行任务"); + //结果可能为空,所以不用管,能发过去,就行。 + } catch (InterruptedException | RemotingException ex) { + logger.error(String.format("send command to : %s error", address), ex); + } + } + /** * submit master base task exec thread * @return TaskInstance @@ -128,7 +161,7 @@ public class MasterBaseTaskExecThread implements Callable { } if(submitDB && !submitQueue){ // submit task to queue - submitQueue = processService.submitTaskToQueue(task); + sendToWorker(JSONObject.toJSONString(task)); } if(submitDB && submitQueue){ return task; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index be832174a5..b8bf1c9074 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -160,20 +160,7 @@ public class MasterExecThread implements Runnable { this.nettyRemotingClient = nettyRemotingClient; } - //TODO - /**端口,默认是123456 - * 需要构造ExecuteTaskRequestCommand,里面就是TaskInstance的属性。 - */ - private void sendToWorker(){ - final Address address = new Address("localhost", 12346); - ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(); - try { - Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000); - //结果可能为空,所以不用管,能发过去,就行。 - } catch (InterruptedException | RemotingException ex) { - logger.error(String.format("send command to : %s error", address), ex); - } - } + @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index d014f1ea4e..af5402ad0a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -172,9 +172,9 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService)); this.nettyRemotingServer.start(); - //worker registry - this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); - this.workerRegistry.registry(); + // TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry +// this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); +// this.workerRegistry.registry(); this.zkWorkerClient.init(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java index 013db83761..9e0c452a80 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java @@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{ logger.info("task : {} ready to submit to task scheduler thread",taskInstId); // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, processService)); +// workerExecService.submit(new TaskScheduleThread(taskInstance, processService)); // remove node from zk removeNodeFromTaskQueue(taskQueueStr); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 96cb0c2246..1bba60cd1e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -97,6 +97,9 @@ public class TaskScheduleThread implements Runnable { @Override public void run() { + // TODO 需要去掉,暂时保留 + updateTaskState(taskInstance.getTaskType()); + ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); try { @@ -169,6 +172,12 @@ public class TaskScheduleThread implements Runnable { }catch (Exception e){ logger.error("task scheduler failure", e); kill(); + + //TODO 需要去掉,暂时保留 update task instance state + processService.changeTaskState(ExecutionStatus.FAILURE, + new Date(), + taskInstance.getId()); + responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); @@ -176,6 +185,14 @@ public class TaskScheduleThread implements Runnable { taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); } + logger.info("task instance id : {},task final status : {}", + taskInstance.getId(), + task.getExitStatus()); + // update task instance state + processService.changeTaskState(task.getExitStatus(), + new Date(), + taskInstance.getId()); + } /** @@ -342,4 +359,28 @@ public class TaskScheduleThread implements Runnable { PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); permissionCheck.checkPermission(); } + + /** + * update task state according to task type + * @param taskType + */ + private void updateTaskState(String taskType) { + // update task status is running + if(taskType.equals(TaskType.SQL.name()) || + taskType.equals(TaskType.PROCEDURE.name())){ + processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, + taskInstance.getStartTime(), + taskInstance.getHost(), + null, + getTaskLogPath(), + taskInstance.getId()); + }else{ + processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, + taskInstance.getStartTime(), + taskInstance.getHost(), + taskInstance.getExecutePath(), + getTaskLogPath(), + taskInstance.getId()); + } + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index 770ab3cff6..19a96a7be8 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -91,7 +91,7 @@ public class MasterExecThreadTest { processDefinition.setGlobalParamList(Collections.EMPTY_LIST); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); - masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService)); + masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService,null)); // prepareProcess init dag Field dag = MasterExecThread.class.getDeclaredField("dag"); dag.setAccessible(true);