diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java index 12702527f0..1c336c89a1 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java @@ -128,4 +128,13 @@ public enum ExecutionStatus { public String getDescp() { return descp; } + + public static ExecutionStatus of(int status){ + for(ExecutionStatus es : values()){ + if(es.getCode() == status){ + return es; + } + } + throw new IllegalArgumentException("invalid status : " + status); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 6b5063cba4..0a153e7041 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -23,7 +23,11 @@ import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.remote.NettyRemotingServer; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -90,6 +94,8 @@ public class MasterServer implements IStoppable { @Autowired private SpringApplicationContext springApplicationContext; + private NettyRemotingServer nettyRemotingServer; + /** * master server startup @@ -108,6 +114,15 @@ public class MasterServer implements IStoppable { */ @PostConstruct public void run(){ + + // + //init remoting server + NettyServerConfig serverConfig = new NettyServerConfig(); + this.nettyRemotingServer = new NettyRemotingServer(serverConfig); + this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService)); + this.nettyRemotingServer.start(); + + // zkMasterClient.init(); masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java new file mode 100644 index 0000000000..32fb55facf --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.future; + + +import org.apache.dolphinscheduler.remote.command.Command; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class TaskFuture { + + private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class); + + private final static ConcurrentHashMap FUTURE_TABLE = new ConcurrentHashMap<>(256); + + /** + * request unique identification + */ + private final long opaque; + + /** + * timeout + */ + private final long timeoutMillis; + + private final CountDownLatch latch = new CountDownLatch(1); + + private final long beginTimestamp = System.currentTimeMillis(); + + /** + * response command + */ + private volatile Command responseCommand; + + private volatile boolean sendOk = true; + + private volatile Throwable cause; + + public TaskFuture(long opaque, long timeoutMillis) { + this.opaque = opaque; + this.timeoutMillis = timeoutMillis; + FUTURE_TABLE.put(opaque, this); + } + + /** + * wait for response + * @return command + * @throws InterruptedException + */ + public Command waitResponse() throws InterruptedException { + this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + return this.responseCommand; + } + + /** + * put response + * + * @param responseCommand responseCommand + */ + public void putResponse(final Command responseCommand) { + this.responseCommand = responseCommand; + this.latch.countDown(); + FUTURE_TABLE.remove(opaque); + } + + /** + * whether timeout + * @return timeout + */ + public boolean isTimeout() { + long diff = System.currentTimeMillis() - this.beginTimestamp; + return diff > this.timeoutMillis; + } + + public static void notify(final Command responseCommand){ + TaskFuture taskFuture = FUTURE_TABLE.remove(responseCommand.getOpaque()); + if(taskFuture != null){ + taskFuture.putResponse(responseCommand); + } + } + + + public boolean isSendOK() { + return sendOk; + } + + public void setSendOk(boolean sendOk) { + this.sendOk = sendOk; + } + + public void setCause(Throwable cause) { + this.cause = cause; + } + + public Throwable getCause() { + return cause; + } + + public long getOpaque() { + return opaque; + } + + public long getTimeoutMillis() { + return timeoutMillis; + } + + public long getBeginTimestamp() { + return beginTimestamp; + } + + public Command getResponseCommand() { + return responseCommand; + } + + public void setResponseCommand(Command responseCommand) { + this.responseCommand = responseCommand; + } + + + @Override + public String toString() { + return "ResponseFuture{" + + "opaque=" + opaque + + ", timeoutMillis=" + timeoutMillis + + ", latch=" + latch + + ", beginTimestamp=" + beginTimestamp + + ", responseCommand=" + responseCommand + + ", sendOk=" + sendOk + + ", cause=" + cause + + '}'; + } + + /** + * scan future table + */ + public static void scanFutureTable(){ + final List futureList = new LinkedList<>(); + Iterator> it = FUTURE_TABLE.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + TaskFuture future = next.getValue(); + if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { + futureList.add(future); + it.remove(); + LOGGER.warn("remove timeout request : {}", future); + } + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java new file mode 100644 index 0000000000..0dd45f091e --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.master.future.TaskFuture; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * task response processor + */ +public class TaskResponseProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class); + + /** + * process service + */ + private final ProcessService processService; + + public TaskResponseProcessor(ProcessService processService){ + this.processService = processService; + } + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); + logger.info("received command : {}", command); + ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class); + processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getTaskInstanceId()); + TaskFuture.notify(command); + } + + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 8104c42bd0..a261b3430a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -16,14 +16,15 @@ */ package org.apache.dolphinscheduler.server.master.runner; -import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.BeanContext; import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.*; -import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Address; @@ -121,31 +122,20 @@ public class MasterBaseTaskExecThread implements Callable { // TODO send task to worker - public void sendToWorker(String taskInstanceJson){ + public void sendToWorker(TaskInstance taskInstance){ final Address address = new Address("127.0.0.1", 12346); - ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(taskInstanceJson); + ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(FastJsonSerializer.serializeToString(taskInstance)); try { - Command responseCommand = nettyRemotingClient.sendSync(address, - taskRequestCommand.convert2Command(), Integer.MAX_VALUE); - - logger.info("receive command : {}", responseCommand); - - final CommandType commandType = responseCommand.getType(); - switch (commandType){ - case EXECUTE_TASK_ACK: - ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize( - responseCommand.getBody(), ExecuteTaskAckCommand.class); - logger.info("taskAckCommand : {}",taskAckCommand); - break; - case EXECUTE_TASK_RESPONSE: - ExecuteTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize( - responseCommand.getBody(), ExecuteTaskResponseCommand.class); - logger.info("taskResponseCommand : {}",taskResponseCommand); - break; - default: - throw new IllegalArgumentException("unknown commandType"); - } - logger.info("response result : {}",responseCommand); + Command responseCommand = nettyRemotingClient.sendSync(address, taskRequestCommand.convert2Command(), Integer.MAX_VALUE); + ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(responseCommand.getBody(), ExecuteTaskAckCommand.class); + logger.info("taskAckCommand : {}",taskAckCommand); + processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()), + taskAckCommand.getStartTime(), + taskAckCommand.getHost(), + taskAckCommand.getExecutePath(), + taskAckCommand.getLogPath(), + taskInstance.getId()); + } catch (InterruptedException | RemotingException ex) { logger.error(String.format("send command to : %s error", address), ex); } @@ -174,7 +164,7 @@ public class MasterBaseTaskExecThread implements Callable { } if(submitDB && !submitQueue){ // submit task to queue - sendToWorker(JSONObject.toJSONString(task)); + sendToWorker(task); submitQueue = true; } if(submitDB && submitQueue){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 340a11adf2..2625d68c1e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -171,9 +171,8 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService)); this.nettyRemotingServer.start(); - // TODO ,because there is a heartbeat, you can reuse the heartbeat logic,worker registry -// this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); -// this.workerRegistry.registry(); + this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); + this.workerRegistry.registry(); this.zkWorkerClient.init(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index bb233b559e..349e762616 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -96,9 +96,6 @@ public class TaskScheduleThread implements Runnable { @Override public void run() { - // TODO Need to be removed and kept temporarily update task instance state - updateTaskState(taskInstance.getTaskType()); - ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); try { @@ -167,31 +164,14 @@ public class TaskScheduleThread implements Runnable { responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setEndTime(new Date()); logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus()); - }catch (Exception e){ logger.error("task scheduler failure", e); kill(); - - //TODO Need to be removed and kept temporarily update task instance state - processService.changeTaskState(ExecutionStatus.FAILURE, - new Date(), - taskInstance.getId()); - responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); - } finally { taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); } - - logger.info("task instance id : {},task final status : {}", - taskInstance.getId(), - task.getExitStatus()); - // update task instance state - processService.changeTaskState(task.getExitStatus(), - new Date(), - taskInstance.getId()); - } /**