diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java index 24ab68f260..7f7da0e445 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task request command */ public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task request command */ public class ExecuteTaskAckCommand implements Serializable { private int taskInstanceId; private Date startTime; private String host; private int status; private String logPath; private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java index adfcb1d10e..4d01142c97 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task instance json */ private String taskInstanceJson; public String getTaskInstanceJson() { return taskInstanceJson; } public void setTaskInstanceJson(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskInstanceJson) { this.taskInstanceJson = taskInstanceJson; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskInstanceJson='" + taskInstanceJson + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java index 3e6d5c117e..8193c9d96d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } /** * package response command * * @param opaque request unique identification * @return command */ public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java index 4e4404ea1c..818b453a1b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java @@ -59,14 +59,14 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { */ final CommandType commandType = command.getType(); switch (commandType){ - case GET_LOG_BYTES_REQUEST: - GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize( - command.getBody(), GetLogBytesRequestCommand.class); - byte[] bytes = getFileContentBytes(getLogRequest.getPath()); - GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); - channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); - break; - case VIEW_WHOLE_LOG_REQUEST: + case GET_LOG_BYTES_REQUEST: + GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize( + command.getBody(), GetLogBytesRequestCommand.class); + byte[] bytes = getFileContentBytes(getLogRequest.getPath()); + GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); + channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); + break; + case VIEW_WHOLE_LOG_REQUEST: ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize( command.getBody(), ViewLogRequestCommand.class); String msg = readWholeFileContent(viewLogRequest.getPath()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index f8fcb1456d..8104c42bd0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -16,10 +16,18 @@ */ package org.apache.dolphinscheduler.server.master.runner; +import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.BeanContext; +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.*; +import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Address; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -75,6 +83,12 @@ public class MasterBaseTaskExecThread implements Callable { */ private MasterConfig masterConfig; + + /** + * netty remoting client + */ + private static final NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig()); + /** * constructor of MasterBaseTaskExecThread * @param taskInstance task instance @@ -105,6 +119,38 @@ public class MasterBaseTaskExecThread implements Callable { this.cancel = true; } + + // TODO send task to worker + public void sendToWorker(String taskInstanceJson){ + final Address address = new Address("127.0.0.1", 12346); + ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(taskInstanceJson); + try { + Command responseCommand = nettyRemotingClient.sendSync(address, + taskRequestCommand.convert2Command(), Integer.MAX_VALUE); + + logger.info("receive command : {}", responseCommand); + + final CommandType commandType = responseCommand.getType(); + switch (commandType){ + case EXECUTE_TASK_ACK: + ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize( + responseCommand.getBody(), ExecuteTaskAckCommand.class); + logger.info("taskAckCommand : {}",taskAckCommand); + break; + case EXECUTE_TASK_RESPONSE: + ExecuteTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize( + responseCommand.getBody(), ExecuteTaskResponseCommand.class); + logger.info("taskResponseCommand : {}",taskResponseCommand); + break; + default: + throw new IllegalArgumentException("unknown commandType"); + } + logger.info("response result : {}",responseCommand); + } catch (InterruptedException | RemotingException ex) { + logger.error(String.format("send command to : %s error", address), ex); + } + } + /** * submit master base task exec thread * @return TaskInstance @@ -128,7 +174,8 @@ public class MasterBaseTaskExecThread implements Callable { } if(submitDB && !submitQueue){ // submit task to queue - submitQueue = processService.submitTaskToQueue(task); + sendToWorker(JSONObject.toJSONString(task)); + submitQueue = true; } if(submitDB && submitQueue){ return task; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index be832174a5..b8bf1c9074 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -160,20 +160,7 @@ public class MasterExecThread implements Runnable { this.nettyRemotingClient = nettyRemotingClient; } - //TODO - /**端口,默认是123456 - * 需要构造ExecuteTaskRequestCommand,里面就是TaskInstance的属性。 - */ - private void sendToWorker(){ - final Address address = new Address("localhost", 12346); - ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(); - try { - Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000); - //结果可能为空,所以不用管,能发过去,就行。 - } catch (InterruptedException | RemotingException ex) { - logger.error(String.format("send command to : %s error", address), ex); - } - } + @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index d014f1ea4e..340a11adf2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -35,9 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.processor.WorkerNettyRequestProcessor; +import org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; -import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread; import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -169,12 +168,12 @@ public class WorkerServer implements IStoppable { //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService)); + this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService)); this.nettyRemotingServer.start(); - //worker registry - this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); - this.workerRegistry.registry(); + // TODO ,because there is a heartbeat, you can reuse the heartbeat logic,worker registry +// this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); +// this.workerRegistry.registry(); this.zkWorkerClient.init(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java index 95345c0b95..e3d893fb7b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java @@ -19,11 +19,19 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; - +/** + * callback channel + */ public class CallbackChannel { + /** + * channel + */ private Channel channel; + /** + * equest unique identification + */ private long opaque; public CallbackChannel(Channel channel, long opaque) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java similarity index 77% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 0480d94716..cd62e98a9b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -26,14 +26,30 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import java.util.concurrent.ConcurrentHashMap; -public class TaskInstanceCallbackService { +/** + * taks callback service + */ +public class TaskCallbackService { + /** + * callback channels + */ private static final ConcurrentHashMap CALL_BACK_CHANNELS = new ConcurrentHashMap<>(); + /** + * add callback channel + * @param taskInstanceId taskInstanceId + * @param channel channel + */ public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){ CALL_BACK_CHANNELS.put(taskInstanceId, channel); } + /** + * get callback channel + * @param taskInstanceId taskInstanceId + * @return callback channel + */ public CallbackChannel getCallbackChannel(int taskInstanceId){ CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId); if(callbackChannel.getChannel().isActive()){ @@ -45,18 +61,34 @@ public class TaskInstanceCallbackService { return callbackChannel; } + /** + * remove callback channels + * @param taskInstanceId taskInstanceId + */ public void remove(int taskInstanceId){ CALL_BACK_CHANNELS.remove(taskInstanceId); } + /** + * send ack + * @param taskInstanceId taskInstanceId + * @param ackCommand ackCommand + */ public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){ CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque())); } + /** + * send result + * + * @param taskInstanceId taskInstanceId + * @param responseCommand responseCommand + */ public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); - callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){ + callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command( + callbackChannel.getOpaque())).addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -68,7 +100,7 @@ public class TaskInstanceCallbackService { }); } - //TODO + // TODO private Channel createChannel(){ return null; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java similarity index 73% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index 2e5ea99b63..8ea9ccbe5b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.processor; +import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -28,6 +29,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -40,32 +43,56 @@ import org.slf4j.LoggerFactory; import java.util.Date; import java.util.concurrent.ExecutorService; +/** + * worker request processor + */ +public class WorkerRequestProcessor implements NettyRequestProcessor { -public class WorkerNettyRequestProcessor implements NettyRequestProcessor { - - private final Logger logger = LoggerFactory.getLogger(WorkerNettyRequestProcessor.class); + private final Logger logger = LoggerFactory.getLogger(WorkerRequestProcessor.class); + /** + * process service + */ private final ProcessService processService; + /** + * thread executor service + */ private final ExecutorService workerExecService; + /** + * worker config + */ private final WorkerConfig workerConfig; - private final TaskInstanceCallbackService taskInstanceCallbackService; + /** + * task callback service + */ + private final TaskCallbackService taskCallbackService; - public WorkerNettyRequestProcessor(ProcessService processService){ + public WorkerRequestProcessor(ProcessService processService){ this.processService = processService; - this.taskInstanceCallbackService = new TaskInstanceCallbackService(); + this.taskCallbackService = new TaskCallbackService(); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); } @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); - logger.debug("received command : {}", command); - TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class); - //TODO 需要干掉,然后移到master里面。 + Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), + String.format("invalid command type : %s", command.getType())); + logger.info("received command : {}", command); + ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( + command.getBody(), ExecuteTaskRequestCommand.class); + + String taskInstanceJson = taskRequestCommand.getTaskInstanceJson(); + + TaskInstance taskInstance = JSONObject.parseObject(taskInstanceJson, TaskInstance.class); + + taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); + + + //TODO this logic need add to master int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); // verify tenant is null @@ -77,7 +104,8 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); - //TODO 到这里。 + //TODO end + // local execute path String execLocalPath = getExecLocalPath(taskInstance); logger.info("task instance local execute path : {} ", execLocalPath); @@ -88,11 +116,21 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { } catch (Exception ex){ logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); } + + taskCallbackService.addCallbackChannel(taskInstance.getId(), + new CallbackChannel(channel, command.getOpaque())); + // submit task - taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque())); - workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService)); + workerExecService.submit(new TaskScheduleThread(taskInstance, + processService, taskCallbackService)); } + /** + * whehter tenant is null + * @param tenant tenant + * @param taskInstance taskInstance + * @return result + */ private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { if(tenant == null){ logger.error("tenant not exists,process instance id : {},task instance id : {}", @@ -103,6 +141,11 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { return false; } + /** + * get execute local path + * @param taskInstance taskInstance + * @return execute local path + */ private String getExecLocalPath(TaskInstance taskInstance){ return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), taskInstance.getProcessDefine().getId(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 396b009c01..fc81638705 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -24,20 +24,36 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * worker registry + */ public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); + /** + * zookeeper registry center + */ private final ZookeeperRegistryCenter zookeeperRegistryCenter; + /** + * port + */ private final int port; + /** + * construct + * @param zookeeperRegistryCenter zookeeperRegistryCenter + * @param port port + */ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){ this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.port = port; } + /** + * registry + */ public void registry() { String address = Constants.LOCAL_ADDRESS; String localNodePath = getWorkerPath(); @@ -58,6 +74,9 @@ public class WorkerRegistry { logger.info("scheduler node : {} registry to ZK successfully.", address); } + /** + * remove registry info + */ public void unRegistry() { String address = getLocalAddress(); String localNodePath = getWorkerPath(); @@ -65,12 +84,20 @@ public class WorkerRegistry { logger.info("worker node : {} unRegistry to ZK.", address); } + /** + * get worker path + * @return + */ private String getWorkerPath() { String address = getLocalAddress(); String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address; return localNodePath; } + /** + * get local address + * @return + */ private String getLocalAddress(){ return Constants.LOCAL_ADDRESS + ":" + port; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java index 013db83761..9e0c452a80 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java @@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{ logger.info("task : {} ready to submit to task scheduler thread",taskInstId); // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, processService)); +// workerExecService.submit(new TaskScheduleThread(taskInstance, processService)); // remove node from zk removeNodeFromTaskQueue(taskQueueStr); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 96cb0c2246..bb233b559e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -35,10 +35,9 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.server.worker.processor.TaskInstanceCallbackService; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -80,7 +79,7 @@ public class TaskScheduleThread implements Runnable { /** * task instance callback service */ - private TaskInstanceCallbackService taskInstanceCallbackService; + private TaskCallbackService taskInstanceCallbackService; /** * constructor @@ -88,7 +87,7 @@ public class TaskScheduleThread implements Runnable { * @param taskInstance task instance * @param processService process dao */ - public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskInstanceCallbackService taskInstanceCallbackService){ + public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ this.processService = processService; this.taskInstance = taskInstance; this.taskInstanceCallbackService = taskInstanceCallbackService; @@ -97,6 +96,9 @@ public class TaskScheduleThread implements Runnable { @Override public void run() { + // TODO Need to be removed and kept temporarily update task instance state + updateTaskState(taskInstance.getTaskType()); + ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); try { @@ -169,6 +171,12 @@ public class TaskScheduleThread implements Runnable { }catch (Exception e){ logger.error("task scheduler failure", e); kill(); + + //TODO Need to be removed and kept temporarily update task instance state + processService.changeTaskState(ExecutionStatus.FAILURE, + new Date(), + taskInstance.getId()); + responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); @@ -176,6 +184,14 @@ public class TaskScheduleThread implements Runnable { taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); } + logger.info("task instance id : {},task final status : {}", + taskInstance.getId(), + task.getExitStatus()); + // update task instance state + processService.changeTaskState(task.getExitStatus(), + new Date(), + taskInstance.getId()); + } /** @@ -342,4 +358,28 @@ public class TaskScheduleThread implements Runnable { PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); permissionCheck.checkPermission(); } + + /** + * update task state according to task type + * @param taskType + */ + private void updateTaskState(String taskType) { + // update task status is running + if(taskType.equals(TaskType.SQL.name()) || + taskType.equals(TaskType.PROCEDURE.name())){ + processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, + taskInstance.getStartTime(), + taskInstance.getHost(), + null, + getTaskLogPath(), + taskInstance.getId()); + }else{ + processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, + taskInstance.getStartTime(), + taskInstance.getHost(), + taskInstance.getExecutePath(), + getTaskLogPath(), + taskInstance.getId()); + } + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index 770ab3cff6..19a96a7be8 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -91,7 +91,7 @@ public class MasterExecThreadTest { processDefinition.setGlobalParamList(Collections.EMPTY_LIST); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); - masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService)); + masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService,null)); // prepareProcess init dag Field dag = MasterExecThread.class.getDeclaredField("dag"); dag.setAccessible(true);