From 62f7d21bdaa003eb2a8368db7a86c557b95f452d Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 24 Mar 2020 15:12:48 +0800 Subject: [PATCH] add async queue and new a thread reslove taskResponse is faster than taskAck to db (#2297) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor modify * TaskAckProcessor modify * task prioriry refator * remove ITaskQueue * task prioriry refator * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor * MasterSchedulerService modify * WorkerConfig listen port modify * modify master and worker listen port * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * Encapsulate the parameters required by sqltask * 1,Encapsulate the parameters required by sqltask 2,SQLTask optimization * AbstractTask modify * ProcedureTask optimization * MasterSchedulerService modify * TaskUpdateQueueConsumer modify * test * DataxTask process run debug * DataxTask process run debug * add protobuf dependency,MR、Spark task etc need this * TaskUpdateQueueConsumer modify * TaskExecutionContextBuilder set TaskInstance workgroup * WorkerGroupService queryAllGroup modify query available work group * 1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify * master and worker register ip use OSUtils.getHost() * ProcessInstance host set ip:port format * worker fault tolerance modify * Constants and .env modify * master fault tolerant bug modify * UT add pom.xml * timing online modify * when taskResponse is faster than taskAck to db,task state will error add async queue and new a thread reslove this problem * TaskExecutionContext set host Co-authored-by: qiaozhanwei --- .../server/master/manager/TaskEvent.java | 227 ++++++++++++++++++ .../server/master/manager/TaskManager.java | 121 ++++++++++ .../master/processor/TaskAckProcessor.java | 19 +- .../processor/TaskResponseProcessor.java | 15 +- .../processor/TaskExecuteProcessor.java | 2 + 5 files changed, 373 insertions(+), 11 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java new file mode 100644 index 0000000000..5c6740f50f --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.manager; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; + +import java.util.Date; + +/** + * task event + */ +public class TaskEvent { + + public static final String ACK = "ack"; + public static final String RESPONSE = "response"; + + /** + * taskInstanceId + */ + private int taskInstanceId; + + /** + * worker address + */ + private String workerAddress; + + /** + * state + */ + private ExecutionStatus state; + + /** + * start time + */ + private Date startTime; + + /** + * end time + */ + private Date endTime; + + /** + * execute path + */ + private String executePath; + + /** + * log path + */ + private String logPath; + + /** + * processId + */ + private int processId; + + /** + * appIds + */ + private String appIds; + + /** + * ack / response + */ + private String type; + + + /** + * receive ack info + * @param state state + * @param startTime startTime + * @param workerAddress workerAddress + * @param executePath executePath + * @param logPath logPath + * @param taskInstanceId taskInstanceId + * @param type type + */ + public void receiveAck(ExecutionStatus state, + Date startTime, + String workerAddress, + String executePath, + String logPath, + int taskInstanceId, + String type){ + this.state = state; + this.startTime = startTime; + this.workerAddress = workerAddress; + this.executePath = executePath; + this.logPath = logPath; + this.taskInstanceId = taskInstanceId; + this.type = type; + } + + /** + * receive response info + * @param state state + * @param endTime endTime + * @param processId processId + * @param appIds appIds + * @param taskInstanceId taskInstanceId + * @param type type + */ + public void receiveResponse(ExecutionStatus state, + Date endTime, + int processId, + String appIds, + int taskInstanceId, + String type){ + this.state = state; + this.endTime = endTime; + this.processId = processId; + this.appIds = appIds; + this.taskInstanceId = taskInstanceId; + this.type = type; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + public String getWorkerAddress() { + return workerAddress; + } + + public void setWorkerAddress(String workerAddress) { + this.workerAddress = workerAddress; + } + + public ExecutionStatus getState() { + return state; + } + + public void setState(ExecutionStatus state) { + this.state = state; + } + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public Date getEndTime() { + return endTime; + } + + public void setEndTime(Date endTime) { + this.endTime = endTime; + } + + public String getExecutePath() { + return executePath; + } + + public void setExecutePath(String executePath) { + this.executePath = executePath; + } + + public String getLogPath() { + return logPath; + } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + + public int getProcessId() { + return processId; + } + + public void setProcessId(int processId) { + this.processId = processId; + } + + public String getAppIds() { + return appIds; + } + + public void setAppIds(String appIds) { + this.appIds = appIds; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @Override + public String toString() { + return "TaskEvent{" + + "taskInstanceId=" + taskInstanceId + + ", workerAddress='" + workerAddress + '\'' + + ", state=" + state + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", executePath='" + executePath + '\'' + + ", logPath='" + logPath + '\'' + + ", processId=" + processId + + ", appIds='" + appIds + '\'' + + ", type='" + type + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java new file mode 100644 index 0000000000..a2710ee48f --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.manager; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * task manager + */ +@Component +public class TaskManager { + + /** + * logger + */ + private static final Logger logger = LoggerFactory.getLogger(TaskManager.class); + + /** + * attemptQueue + */ + private final BlockingQueue attemptQueue = new LinkedBlockingQueue<>(5000); + + + /** + * process service + */ + @Autowired + private ProcessService processService; + + + @PostConstruct + public void init(){ + TaskWorker taskWorker = new TaskWorker(); + taskWorker.start(); + } + + /** + * put task to attemptQueue + * + * @param taskEvent taskEvent + */ + public void putTask(TaskEvent taskEvent){ + try { + attemptQueue.put(taskEvent); + } catch (InterruptedException e) { + logger.error("put task : {} error :{}",taskEvent,e); + } + } + + + /** + * task worker thread + */ + class TaskWorker extends Thread { + + @Override + public void run() { + + while (Stopper.isRunning()){ + try { + if (attemptQueue.size() == 0){ + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + continue; + } + TaskEvent taskEvent = attemptQueue.take(); + + persist(taskEvent); + + }catch (Exception e){ + logger.error("persist task error",e); + } + } + } + + /** + * persist taskEvent + * @param taskEvent taskEvent + */ + private void persist(TaskEvent taskEvent){ + if (TaskEvent.ACK.equals(taskEvent.getType())){ + processService.changeTaskState(taskEvent.getState(), + taskEvent.getStartTime(), + taskEvent.getWorkerAddress(), + taskEvent.getExecutePath(), + taskEvent.getLogPath(), + taskEvent.getTaskInstanceId()); + }else if (TaskEvent.RESPONSE.equals(taskEvent.getType())){ + processService.changeTaskState(taskEvent.getState(), + taskEvent.getEndTime(), + taskEvent.getProcessId(), + taskEvent.getAppIds(), + taskEvent.getTaskInstanceId()); + } + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 8f0b731e05..a678caddf8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.manager.TaskEvent; +import org.apache.dolphinscheduler.server.master.manager.TaskManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -43,7 +45,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { /** * process service */ - private final ProcessService processService; + private final TaskManager taskManager; /** * taskInstance cache manager @@ -51,7 +53,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { private final TaskInstanceCacheManager taskInstanceCacheManager; public TaskAckProcessor(){ - this.processService = SpringApplicationContext.getBean(ProcessService.class); + this.taskManager = SpringApplicationContext.getBean(TaskManager.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } @@ -69,15 +71,18 @@ public class TaskAckProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(taskAckCommand); String workerAddress = ChannelUtils.toAddress(channel).getAddress(); - /** - * change Task state - */ - processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()), + + // TaskEvent + TaskEvent taskEvent = new TaskEvent(); + taskEvent.receiveAck(ExecutionStatus.of(taskAckCommand.getStatus()), taskAckCommand.getStartTime(), workerAddress, taskAckCommand.getExecutePath(), taskAckCommand.getLogPath(), - taskAckCommand.getTaskInstanceId()); + taskAckCommand.getTaskInstanceId(), + TaskEvent.ACK); + + taskManager.putTask(taskEvent); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 93ca4abd6d..ffc5d7293c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.manager.TaskEvent; +import org.apache.dolphinscheduler.server.master.manager.TaskManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -42,7 +44,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { /** * process service */ - private final ProcessService processService; + private final TaskManager taskManager; /** * taskInstance cache manager @@ -50,7 +52,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { private final TaskInstanceCacheManager taskInstanceCacheManager; public TaskResponseProcessor(){ - this.processService = SpringApplicationContext.getBean(ProcessService.class); + this.taskManager = SpringApplicationContext.getBean(TaskManager.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } @@ -70,11 +72,16 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(responseCommand); - processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), + // TaskEvent + TaskEvent taskEvent = new TaskEvent(); + taskEvent.receiveResponse(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getProcessId(), responseCommand.getAppIds(), - responseCommand.getTaskInstanceId()); + responseCommand.getTaskInstanceId(), + TaskEvent.RESPONSE); + + taskManager.putTask(taskEvent); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 80ba649fc8..ed476133ca 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -24,6 +24,7 @@ import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; @@ -86,6 +87,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { String contextJson = taskRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class); + taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext);