diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java new file mode 100644 index 0000000000..5c6740f50f --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.manager; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; + +import java.util.Date; + +/** + * task event + */ +public class TaskEvent { + + public static final String ACK = "ack"; + public static final String RESPONSE = "response"; + + /** + * taskInstanceId + */ + private int taskInstanceId; + + /** + * worker address + */ + private String workerAddress; + + /** + * state + */ + private ExecutionStatus state; + + /** + * start time + */ + private Date startTime; + + /** + * end time + */ + private Date endTime; + + /** + * execute path + */ + private String executePath; + + /** + * log path + */ + private String logPath; + + /** + * processId + */ + private int processId; + + /** + * appIds + */ + private String appIds; + + /** + * ack / response + */ + private String type; + + + /** + * receive ack info + * @param state state + * @param startTime startTime + * @param workerAddress workerAddress + * @param executePath executePath + * @param logPath logPath + * @param taskInstanceId taskInstanceId + * @param type type + */ + public void receiveAck(ExecutionStatus state, + Date startTime, + String workerAddress, + String executePath, + String logPath, + int taskInstanceId, + String type){ + this.state = state; + this.startTime = startTime; + this.workerAddress = workerAddress; + this.executePath = executePath; + this.logPath = logPath; + this.taskInstanceId = taskInstanceId; + this.type = type; + } + + /** + * receive response info + * @param state state + * @param endTime endTime + * @param processId processId + * @param appIds appIds + * @param taskInstanceId taskInstanceId + * @param type type + */ + public void receiveResponse(ExecutionStatus state, + Date endTime, + int processId, + String appIds, + int taskInstanceId, + String type){ + this.state = state; + this.endTime = endTime; + this.processId = processId; + this.appIds = appIds; + this.taskInstanceId = taskInstanceId; + this.type = type; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + public String getWorkerAddress() { + return workerAddress; + } + + public void setWorkerAddress(String workerAddress) { + this.workerAddress = workerAddress; + } + + public ExecutionStatus getState() { + return state; + } + + public void setState(ExecutionStatus state) { + this.state = state; + } + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public Date getEndTime() { + return endTime; + } + + public void setEndTime(Date endTime) { + this.endTime = endTime; + } + + public String getExecutePath() { + return executePath; + } + + public void setExecutePath(String executePath) { + this.executePath = executePath; + } + + public String getLogPath() { + return logPath; + } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + + public int getProcessId() { + return processId; + } + + public void setProcessId(int processId) { + this.processId = processId; + } + + public String getAppIds() { + return appIds; + } + + public void setAppIds(String appIds) { + this.appIds = appIds; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @Override + public String toString() { + return "TaskEvent{" + + "taskInstanceId=" + taskInstanceId + + ", workerAddress='" + workerAddress + '\'' + + ", state=" + state + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", executePath='" + executePath + '\'' + + ", logPath='" + logPath + '\'' + + ", processId=" + processId + + ", appIds='" + appIds + '\'' + + ", type='" + type + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java new file mode 100644 index 0000000000..a2710ee48f --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.manager; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * task manager + */ +@Component +public class TaskManager { + + /** + * logger + */ + private static final Logger logger = LoggerFactory.getLogger(TaskManager.class); + + /** + * attemptQueue + */ + private final BlockingQueue attemptQueue = new LinkedBlockingQueue<>(5000); + + + /** + * process service + */ + @Autowired + private ProcessService processService; + + + @PostConstruct + public void init(){ + TaskWorker taskWorker = new TaskWorker(); + taskWorker.start(); + } + + /** + * put task to attemptQueue + * + * @param taskEvent taskEvent + */ + public void putTask(TaskEvent taskEvent){ + try { + attemptQueue.put(taskEvent); + } catch (InterruptedException e) { + logger.error("put task : {} error :{}",taskEvent,e); + } + } + + + /** + * task worker thread + */ + class TaskWorker extends Thread { + + @Override + public void run() { + + while (Stopper.isRunning()){ + try { + if (attemptQueue.size() == 0){ + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + continue; + } + TaskEvent taskEvent = attemptQueue.take(); + + persist(taskEvent); + + }catch (Exception e){ + logger.error("persist task error",e); + } + } + } + + /** + * persist taskEvent + * @param taskEvent taskEvent + */ + private void persist(TaskEvent taskEvent){ + if (TaskEvent.ACK.equals(taskEvent.getType())){ + processService.changeTaskState(taskEvent.getState(), + taskEvent.getStartTime(), + taskEvent.getWorkerAddress(), + taskEvent.getExecutePath(), + taskEvent.getLogPath(), + taskEvent.getTaskInstanceId()); + }else if (TaskEvent.RESPONSE.equals(taskEvent.getType())){ + processService.changeTaskState(taskEvent.getState(), + taskEvent.getEndTime(), + taskEvent.getProcessId(), + taskEvent.getAppIds(), + taskEvent.getTaskInstanceId()); + } + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 8f0b731e05..a678caddf8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.manager.TaskEvent; +import org.apache.dolphinscheduler.server.master.manager.TaskManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -43,7 +45,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { /** * process service */ - private final ProcessService processService; + private final TaskManager taskManager; /** * taskInstance cache manager @@ -51,7 +53,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { private final TaskInstanceCacheManager taskInstanceCacheManager; public TaskAckProcessor(){ - this.processService = SpringApplicationContext.getBean(ProcessService.class); + this.taskManager = SpringApplicationContext.getBean(TaskManager.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } @@ -69,15 +71,18 @@ public class TaskAckProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(taskAckCommand); String workerAddress = ChannelUtils.toAddress(channel).getAddress(); - /** - * change Task state - */ - processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()), + + // TaskEvent + TaskEvent taskEvent = new TaskEvent(); + taskEvent.receiveAck(ExecutionStatus.of(taskAckCommand.getStatus()), taskAckCommand.getStartTime(), workerAddress, taskAckCommand.getExecutePath(), taskAckCommand.getLogPath(), - taskAckCommand.getTaskInstanceId()); + taskAckCommand.getTaskInstanceId(), + TaskEvent.ACK); + + taskManager.putTask(taskEvent); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 93ca4abd6d..ffc5d7293c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.manager.TaskEvent; +import org.apache.dolphinscheduler.server.master.manager.TaskManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -42,7 +44,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { /** * process service */ - private final ProcessService processService; + private final TaskManager taskManager; /** * taskInstance cache manager @@ -50,7 +52,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { private final TaskInstanceCacheManager taskInstanceCacheManager; public TaskResponseProcessor(){ - this.processService = SpringApplicationContext.getBean(ProcessService.class); + this.taskManager = SpringApplicationContext.getBean(TaskManager.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } @@ -70,11 +72,16 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(responseCommand); - processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), + // TaskEvent + TaskEvent taskEvent = new TaskEvent(); + taskEvent.receiveResponse(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getProcessId(), responseCommand.getAppIds(), - responseCommand.getTaskInstanceId()); + responseCommand.getTaskInstanceId(), + TaskEvent.RESPONSE); + + taskManager.putTask(taskEvent); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 80ba649fc8..ed476133ca 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -24,6 +24,7 @@ import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; @@ -86,6 +87,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { String contextJson = taskRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class); + taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext);