From 726706835d41c2dac378e2ef52b8886d8ff77398 Mon Sep 17 00:00:00 2001 From: Tboy Date: Wed, 26 Feb 2020 22:11:14 +0800 Subject: [PATCH] Refactor worker (#10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * Refactor worker (#2018) * Refactor worker (#7) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type Co-authored-by: qiaozhanwei * Refactor worker (#8) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type Co-authored-by: qiaozhanwei * add kill command Co-authored-by: qiaozhanwei * add TaskInstanceCacheManager receive Worker report result,modify master polling db transfrom to cache (#2021) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access Co-authored-by: qiaozhanwei --- .../common/utils/CommonUtils.java | 2 +- .../dao/entity/TaskInstance.java | 22 +- .../command/ExecuteTaskResponseCommand.java | 2 +- .../remote/entity/TaskExecutionContext.java | 15 +- .../builder/TaskExecutionContextBuilder.java | 1 + .../cache/TaskInstanceCacheManager.java | 64 ++++++ .../impl/TaskInstanceCacheManagerImpl.java | 115 ++++++++++ .../master/processor/TaskAckProcessor.java | 13 +- .../processor/TaskResponseProcessor.java | 18 +- .../runner/MasterBaseTaskExecThread.java | 79 +++++-- .../master/runner/MasterExecThread.java | 4 +- .../master/runner/MasterTaskExecThread.java | 26 ++- .../processor/TaskExecuteProcessor.java | 1 + .../worker/processor/TaskKillProcessor.java | 20 +- .../worker/runner/TaskExecuteThread.java | 7 +- .../worker/task/AbstractCommandExecutor.java | 214 +++++++++--------- .../server/worker/task/AbstractTask.java | 30 ++- .../server/worker/task/AbstractYarnTask.java | 20 +- .../worker/task/CommandExecuteResult.java | 69 ++++++ .../worker/task/PythonCommandExecutor.java | 13 +- .../worker/task/ShellCommandExecutor.java | 17 +- .../server/worker/task/TaskProps.java | 84 ++++--- .../server/worker/task/datax/DataxTask.java | 20 +- .../worker/task/dependent/DependentTask.java | 2 +- .../server/worker/task/flink/FlinkTask.java | 2 +- .../server/worker/task/http/HttpTask.java | 2 +- .../server/worker/task/python/PythonTask.java | 21 +- .../server/worker/task/shell/ShellTask.java | 16 +- .../server/worker/task/sql/SqlTask.java | 8 +- .../shell/ShellCommandExecutorTest.java | 4 +- .../server/worker/sql/SqlExecutorTest.java | 6 +- .../worker/task/datax/DataxTaskTest.java | 12 +- .../task/dependent/DependentTaskTest.java | 2 +- .../home/pages/dag/_source/formModel/log.vue | 4 +- 34 files changed, 678 insertions(+), 257 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java index b4b89bfe26..d15ede2a27 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java @@ -26,7 +26,7 @@ import java.io.File; /** * common utils */ -public class CommonUtils { +public class CommonUtils { private CommonUtils() { throw new IllegalStateException("CommonUtils class"); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 428f5d4cad..92cb3af220 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -462,6 +462,14 @@ public class TaskInstance implements Serializable { this.workerGroupId = workerGroupId; } + public String getDependentResult() { + return dependentResult; + } + + public void setDependentResult(String dependentResult) { + this.dependentResult = dependentResult; + } + @Override public String toString() { return "TaskInstance{" + @@ -481,27 +489,19 @@ public class TaskInstance implements Serializable { ", logPath='" + logPath + '\'' + ", retryTimes=" + retryTimes + ", alertFlag=" + alertFlag + - ", flag=" + flag + ", processInstance=" + processInstance + ", processDefine=" + processDefine + ", pid=" + pid + ", appLink='" + appLink + '\'' + ", flag=" + flag + - ", dependency=" + dependency + + ", dependency='" + dependency + '\'' + ", duration=" + duration + ", maxRetryTimes=" + maxRetryTimes + ", retryInterval=" + retryInterval + ", taskInstancePriority=" + taskInstancePriority + ", processInstancePriority=" + processInstancePriority + - ", workGroupId=" + workerGroupId + + ", dependentResult='" + dependentResult + '\'' + + ", workerGroupId=" + workerGroupId + '}'; } - - public String getDependentResult() { - return dependentResult; - } - - public void setDependentResult(String dependentResult) { - this.dependentResult = dependentResult; - } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java index 6bbc2f76a0..d8253a88b9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java index e3da43a670..711e3d8c23 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java @@ -51,12 +51,16 @@ public class TaskExecutionContext implements Serializable{ */ private String executePath; + /** + * log path + */ + private String logPath; + /** * task json */ private String taskJson; - /** * process instance id */ @@ -228,6 +232,14 @@ public class TaskExecutionContext implements Serializable{ this.cmdTypeIfComplement = cmdTypeIfComplement; } + public String getLogPath() { + return logPath; + } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + @Override public String toString() { return "TaskExecutionContext{" + @@ -236,6 +248,7 @@ public class TaskExecutionContext implements Serializable{ ", startTime=" + startTime + ", taskType='" + taskType + '\'' + ", executePath='" + executePath + '\'' + + ", logPath='" + logPath + '\'' + ", taskJson='" + taskJson + '\'' + ", processInstanceId=" + processInstanceId + ", scheduleTime=" + scheduleTime + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index a3ddd29e9f..8cdd13ef7f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -44,6 +44,7 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setTaskName(taskInstance.getName()); taskExecutionContext.setStartTime(taskInstance.getStartTime()); taskExecutionContext.setTaskType(taskInstance.getTaskType()); + taskExecutionContext.setLogPath(taskInstance.getLogPath()); taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); return this; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java new file mode 100644 index 0000000000..98d2a24726 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cache; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; + +/** + * task instance state manager + */ +public interface TaskInstanceCacheManager { + + /** + * get taskInstance by taskInstance id + * + * @param taskInstanceId taskInstanceId + * @return taskInstance + */ + TaskInstance getByTaskInstanceId(Integer taskInstanceId); + + /** + * cache taskInstance + * + * @param taskExecutionContext taskExecutionContext + */ + void cacheTaskInstance(TaskExecutionContext taskExecutionContext); + + /** + * cache taskInstance + * + * @param taskAckCommand taskAckCommand + */ + void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand); + + /** + * cache taskInstance + * + * @param executeTaskResponseCommand executeTaskResponseCommand + */ + void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand); + + /** + * remove taskInstance by taskInstanceId + * @param taskInstanceId taskInstanceId + */ + void removeByTaskInstanceId(Integer taskInstanceId); +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java new file mode 100644 index 0000000000..634a6a991a --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.cache.impl; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * taskInstance state manager + */ +@Component +public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { + + /** + * taskInstance caceh + */ + private Map taskInstanceCache = new ConcurrentHashMap<>(); + + + /** + * get taskInstance by taskInstance id + * + * @param taskInstanceId taskInstanceId + * @return taskInstance + */ + @Override + public TaskInstance getByTaskInstanceId(Integer taskInstanceId) { + return taskInstanceCache.get(taskInstanceId); + } + + /** + * cache taskInstance + * + * @param taskExecutionContext taskExecutionContext + */ + @Override + public void cacheTaskInstance(TaskExecutionContext taskExecutionContext) { + TaskInstance taskInstance = getByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + if (taskInstance == null){ + taskInstance = new TaskInstance(); + } + taskInstance.setId(taskExecutionContext.getTaskInstanceId()); + taskInstance.setName(taskExecutionContext.getTaskName()); + taskInstance.setStartTime(taskExecutionContext.getStartTime()); + taskInstance.setTaskType(taskInstance.getTaskType()); + taskInstance.setExecutePath(taskInstance.getExecutePath()); + taskInstance.setTaskJson(taskInstance.getTaskJson()); + } + + /** + * cache taskInstance + * + * @param taskAckCommand taskAckCommand + */ + @Override + public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) { + TaskInstance taskInstance = getByTaskInstanceId(taskAckCommand.getTaskInstanceId()); + if (taskInstance == null){ + taskInstance = new TaskInstance(); + } + taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus())); + taskInstance.setStartTime(taskAckCommand.getStartTime()); + taskInstance.setHost(taskAckCommand.getHost()); + taskInstance.setExecutePath(taskAckCommand.getExecutePath()); + taskInstance.setLogPath(taskAckCommand.getLogPath()); + } + + /** + * cache taskInstance + * + * @param executeTaskResponseCommand executeTaskResponseCommand + */ + @Override + public void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand) { + TaskInstance taskInstance = getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId()); + if (taskInstance == null){ + taskInstance = new TaskInstance(); + } + taskInstance.setState(ExecutionStatus.of(executeTaskResponseCommand.getStatus())); + taskInstance.setEndTime(executeTaskResponseCommand.getEndTime()); + } + + /** + * remove taskInstance by taskInstanceId + * @param taskInstanceId taskInstanceId + */ + @Override + public void removeByTaskInstanceId(Integer taskInstanceId) { + taskInstanceCache.remove(taskInstanceId); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 83da3b03ee..cf3857995c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; +import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -42,8 +44,14 @@ public class TaskAckProcessor implements NettyRequestProcessor { */ private final ProcessService processService; + /** + * taskInstance cache manager + */ + private final TaskInstanceCacheManager taskInstanceCacheManager; + public TaskAckProcessor(){ this.processService = SpringApplicationContext.getBean(ProcessService.class); + this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } /** @@ -55,7 +63,9 @@ public class TaskAckProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class); - logger.info("taskAckCommand : {}",taskAckCommand); + logger.info("taskAckCommand : {}", taskAckCommand); + + taskInstanceCacheManager.cacheTaskInstance(taskAckCommand); /** * change Task state */ @@ -65,6 +75,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { taskAckCommand.getExecutePath(), taskAckCommand.getLogPath(), taskAckCommand.getTaskInstanceId()); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index bbc710c92c..d6279c625e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; +import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -42,8 +44,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor { */ private final ProcessService processService; + /** + * taskInstance cache manager + */ + private final TaskInstanceCacheManager taskInstanceCacheManager; + public TaskResponseProcessor(){ this.processService = SpringApplicationContext.getBean(ProcessService.class); + this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } /** @@ -56,9 +64,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); - logger.info("received command : {}", command); + ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class); - processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getTaskInstanceId()); + logger.info("received command : {}", responseCommand); + + taskInstanceCacheManager.cacheTaskInstance(responseCommand); + + processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), + responseCommand.getEndTime(), + responseCommand.getTaskInstanceId()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 9bf69ddec8..d2a54396ff 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; +import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE; + /** * master task exec base class */ @@ -163,6 +166,7 @@ public class MasterBaseTaskExecThread implements Callable { String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); + taskInstance.setExecutePath(getExecLocalPath(taskInstance)); return TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) @@ -172,6 +176,19 @@ public class MasterBaseTaskExecThread implements Callable { } + /** + * get execute local path + * + * @return execute local path + */ + private String getExecLocalPath(TaskInstance taskInstance){ + return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId()); + } + + /** * whehter tenant is null * @param tenant tenant @@ -187,19 +204,6 @@ public class MasterBaseTaskExecThread implements Callable { } return false; } - - /** - * get execute local path - * - * @return execute local path - */ - private String getExecLocalPath(TaskInstance taskInstance){ - return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId()); - } - /** * submit master base task exec thread * @return TaskInstance @@ -210,7 +214,7 @@ public class MasterBaseTaskExecThread implements Callable { int retryTimes = 1; boolean submitDB = false; - boolean submitQueue = false; + boolean submitTask = false; TaskInstance task = null; while (retryTimes <= commitRetryTimes){ try { @@ -221,27 +225,60 @@ public class MasterBaseTaskExecThread implements Callable { submitDB = true; } } - if(submitDB && !submitQueue){ - // submit task to queue - submitQueue = dispatch(task); + if(submitDB && !submitTask){ + // dispatcht task + submitTask = dispatchtTask(task); } - if(submitDB && submitQueue){ + if(submitDB && submitTask){ return task; } if(!submitDB){ logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); - }else if(!submitQueue){ - logger.error("task commit to queue failed , taskId {} has already retry {} times, please check the queue", taskInstance.getId(), retryTimes); + }else if(!submitTask){ + logger.error("task commit failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes); } Thread.sleep(commitRetryInterval); } catch (Exception e) { - logger.error("task commit to mysql and queue failed",e); + logger.error("task commit to mysql and dispatcht task failed",e); } retryTimes += 1; } return task; } + + + /** + * dispatcht task + * @param taskInstance taskInstance + * @return whether submit task success + */ + public Boolean dispatchtTask(TaskInstance taskInstance) { + + try{ + if(taskInstance.isSubProcess()){ + return true; + } + if(taskInstance.getState().typeIsFinished()){ + logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); + return true; + } + // task cannot submit when running + if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){ + logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName())); + return true; + } + logger.info("task ready to submit: {}" , taskInstance); + boolean submitTask = dispatch(taskInstance); + logger.info(String.format("master submit success, task : %s", taskInstance.getName()) ); + return submitTask; + }catch (Exception e){ + logger.error("submit task Exception: ", e); + logger.error("task error : %s", JSONUtils.toJson(taskInstance)); + return false; + } + } + /** * submit wait complete * @return true diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index d0f49272bd..576dc76ba7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -404,7 +404,7 @@ public class MasterExecThread implements Runnable { } /** - * submit task to execute + * TODO submit task to execute * @param taskInstance task instance * @return TaskInstance */ @@ -873,7 +873,7 @@ public class MasterExecThread implements Runnable { } logger.info("task :{}, id:{} complete, state is {} ", task.getName(), task.getId(), task.getState().toString()); - // node success , post node submit + //TODO node success , post node submit if(task.getState() == ExecutionStatus.SUCCESS){ completeTaskList.put(task.getName(), task); submitPostNode(task.getName()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 66d1a3f4c2..deac503205 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -26,6 +26,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; +import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +46,12 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { */ private static final Logger logger = LoggerFactory.getLogger(MasterTaskExecThread.class); + + /** + * taskInstance state manager + */ + private TaskInstanceCacheManager taskInstanceCacheManager; + /** * constructor of MasterTaskExecThread * @param taskInstance task instance @@ -50,6 +59,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { */ public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ super(taskInstance, processInstance); + this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } /** @@ -67,7 +77,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { private Boolean alreadyKilled = false; /** - * submit task instance and wait complete + * TODO submit task instance and wait complete * @return true is task quit is true */ @Override @@ -89,12 +99,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } /** - * wait task quit + * TODO 在这里轮询数据库 + * TODO wait task quit * @return true if task quit success */ public Boolean waitTaskQuit(){ // query new state - taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId()); + if (taskInstance == null){ + taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + } logger.info("wait task: process id: {}, task id:{}, task name:{} complete", this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); // task time out @@ -119,6 +133,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } // task instance finished if (taskInstance.getState().typeIsFinished()){ + // if task is final result , then remove taskInstance from cache + taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId()); break; } if(checkTimeout){ @@ -133,7 +149,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } } // updateProcessInstance task instance - taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId()); processInstance = processService.findProcessInstanceById(processInstance.getId()); Thread.sleep(Constants.SLEEP_TIME_MILLIS); } catch (Exception e) { @@ -149,6 +165,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { /** + * TODO Kill 任务 * task instance add queue , waiting worker to kill */ private void cancelTaskInstance(){ @@ -162,6 +179,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } String queueValue = String.format("%s-%d", host, taskInstance.getId()); + // TODO 这里写 taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue); logger.info("master add kill task :{} id:{} to kill queue", diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 818e223d3d..37dcc2c00b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -155,6 +155,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { }else{ ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); } + taskExecutionContext.setLogPath(ackCommand.getLogPath()); return ackCommand; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 29853743fa..ce54eff944 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -42,6 +42,12 @@ public class TaskKillProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class); + /** + * task kill process + * + * @param channel channel + * @param command command + */ @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); @@ -50,7 +56,11 @@ public class TaskKillProcessor implements NettyRequestProcessor { doKill(killCommand); } - + /** + * kill task logic + * + * @param killCommand killCommand + */ private void doKill(KillTaskRequestCommand killCommand){ try { if(killCommand.getProcessId() == 0 ){ @@ -71,6 +81,14 @@ public class TaskKillProcessor implements NettyRequestProcessor { } } + /** + * kill yarn job + * + * @param host host + * @param logPath logPath + * @param executePath executePath + * @param tenantCode tenantCode + */ public void killYarnJob(String host, String logPath, String executePath, String tenantCode) { try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index be89401961..46e838aae5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -103,7 +103,6 @@ public class TaskExecuteThread implements Runnable { // set task props TaskProps taskProps = new TaskProps(taskNode.getParams(), - taskExecutionContext.getExecutePath(), taskExecutionContext.getScheduleTime(), taskExecutionContext.getTaskName(), taskExecutionContext.getTaskType(), @@ -114,7 +113,10 @@ public class TaskExecuteThread implements Runnable { taskExecutionContext.getStartTime(), getGlobalParamsMap(), null, - CommandType.of(taskExecutionContext.getCmdTypeIfComplement())); + CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), + OSUtils.getHost(), + taskExecutionContext.getLogPath(), + taskExecutionContext.getExecutePath()); // set task timeout setTaskTimeout(taskProps, taskNode); @@ -142,7 +144,6 @@ public class TaskExecuteThread implements Runnable { // task result process task.after(); - // responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setEndTime(new Date()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index c473f3a2aa..c2e9e28fa3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -18,13 +18,17 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; + import org.slf4j.Logger; import java.io.*; @@ -37,6 +41,8 @@ import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.dolphinscheduler.common.Constants.*; + /** * abstract command executor */ @@ -69,7 +75,7 @@ public abstract class AbstractCommandExecutor { /** * task appId */ - protected final int taskInstId; + protected final int taskInstanceId; /** * tenant code , execute task linux user @@ -101,92 +107,36 @@ public abstract class AbstractCommandExecutor { */ protected final List logBuffer; + /** + * log path + */ + private String logPath; + + /** + * execute path + */ + private String executePath; public AbstractCommandExecutor(Consumer> logHandler, - String taskDir, String taskAppId,int taskInstId,String tenantCode, String envFile, - Date startTime, int timeout, Logger logger){ + String taskDir, + String taskAppId, + Integer taskInstanceId, + String tenantCode, String envFile, + Date startTime, int timeout, String logPath,String executePath,Logger logger){ this.logHandler = logHandler; this.taskDir = taskDir; this.taskAppId = taskAppId; - this.taskInstId = taskInstId; + this.taskInstanceId = taskInstanceId; this.tenantCode = tenantCode; this.envFile = envFile; this.startTime = startTime; this.timeout = timeout; + this.logPath = logPath; + this.executePath = executePath; this.logger = logger; this.logBuffer = Collections.synchronizedList(new ArrayList<>()); } - /** - * task specific execution logic - * - * @param execCommand exec command - * @param processService process dao - * @return exit status code - */ - public int run(String execCommand, ProcessService processService) { - int exitStatusCode; - - try { - if (StringUtils.isEmpty(execCommand)) { - exitStatusCode = 0; - return exitStatusCode; - } - - String commandFilePath = buildCommandFilePath(); - - // create command file if not exists - createCommandFileIfNotExists(execCommand, commandFilePath); - - //build process - buildProcess(commandFilePath); - - // parse process output - parseProcessOutput(process); - - // get process id - int pid = getProcessId(process); - - processService.updatePidByTaskInstId(taskInstId, pid, ""); - - logger.info("process start, process id is: {}", pid); - - // if timeout occurs, exit directly - long remainTime = getRemaintime(); - - // waiting for the run to finish - boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); - - if (status) { - exitStatusCode = process.exitValue(); - logger.info("process has exited, work dir:{}, pid:{} ,exitStatusCode:{}", taskDir, pid,exitStatusCode); - //update process state to db - exitStatusCode = updateState(processService, exitStatusCode, pid, taskInstId); - - } else { - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); - if (taskInstance == null) { - logger.error("task instance id:{} not exist", taskInstId); - } else { - ProcessUtils.kill(taskInstance); - } - exitStatusCode = -1; - logger.warn("process timeout, work dir:{}, pid:{}", taskDir, pid); - } - - } catch (InterruptedException e) { - exitStatusCode = -1; - logger.error(String.format("interrupt exception: {}, task may be cancelled or killed",e.getMessage()), e); - throw new RuntimeException("interrupt exception. exitCode is : " + exitStatusCode); - } catch (Exception e) { - exitStatusCode = -1; - logger.error(e.getMessage(), e); - throw new RuntimeException("process error . exitCode is : " + exitStatusCode); - } - - return exitStatusCode; - } - /** * build process * @@ -217,35 +167,80 @@ public abstract class AbstractCommandExecutor { } /** - * update process state to db - * - * @param processService process dao - * @param exitStatusCode exit status code - * @param pid process id - * @param taskInstId task instance id - * @return exit status code + * task specific execution logic + * @param execCommand execCommand + * @return CommandExecuteResult + * @throws Exception */ - private int updateState(ProcessService processService, int exitStatusCode, int pid, int taskInstId) { - //get yarn state by log - if (exitStatusCode == 0) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); - logger.info("process id is {}", pid); - - List appIds = getAppLinks(taskInstance.getLogPath()); - if (appIds.size() > 0) { - String appUrl = String.join(Constants.COMMA, appIds); - logger.info("yarn log url:{}",appUrl); - processService.updatePidByTaskInstId(taskInstId, pid, appUrl); - } + public CommandExecuteResult run(String execCommand) throws Exception{ - // check if all operations are completed - if (!isSuccessOfYarnState(appIds)) { - exitStatusCode = -1; - } + CommandExecuteResult result = new CommandExecuteResult(); + + + if (StringUtils.isEmpty(execCommand)) { + return result; + } + + String commandFilePath = buildCommandFilePath(); + + // create command file if not exists + createCommandFileIfNotExists(execCommand, commandFilePath); + + //build process + buildProcess(commandFilePath); + + // parse process output + parseProcessOutput(process); + + + Integer processId = getProcessId(process); + + result.setProcessId(processId); + + // print process id + logger.info("process start, process id is: {}", processId); + + // if timeout occurs, exit directly + long remainTime = getRemaintime(); + + // waiting for the run to finish + boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); + + // SHELL task state + result.setExitStatusCode(process.exitValue()); + + logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}", + taskDir, + processId + , result.getExitStatusCode()); + + // if SHELL task exit + if (status) { + // set appIds + List appIds = getAppIds(logPath); + result.setAppIds(String.join(Constants.COMMA, appIds)); + + // if yarn task , yarn state is final state + result.setExitStatusCode(isSuccessOfYarnState(appIds) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE); + } else { + logger.error("process has failure , exitStatusCode : {} , ready to kill ...", result.getExitStatusCode()); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setPid(processId); + taskInstance.setHost(OSUtils.getHost()); + taskInstance.setLogPath(logPath); + taskInstance.setExecutePath(executePath); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setTenantCode(tenantCode); + taskInstance.setProcessInstance(processInstance); + + ProcessUtils.kill(taskInstance); + result.setExitStatusCode(EXIT_CODE_FAILURE); } - return exitStatusCode; + return result; } + /** * cancel application * @throws Exception exception @@ -378,10 +373,6 @@ public abstract class AbstractCommandExecutor { parseProcessOutputExecutorService.shutdown(); } - public int getPid() { - return getProcessId(process); - } - /** * check yarn state * @@ -389,11 +380,10 @@ public abstract class AbstractCommandExecutor { * @return is success of yarn task state */ public boolean isSuccessOfYarnState(List appIds) { - boolean result = true; try { for (String appId : appIds) { - while(true){ + while(Stopper.isRunning()){ ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId); logger.info("appId:{}, final state:{}",appId,applicationStatus.name()); if (applicationStatus.equals(ExecutionStatus.FAILURE) || @@ -406,7 +396,7 @@ public abstract class AbstractCommandExecutor { } Thread.sleep(Constants.SLEEP_TIME_MILLIS); } - } + } } catch (Exception e) { logger.error(String.format("yarn applications: %s status failed ", appIds.toString()),e); result = false; @@ -415,15 +405,20 @@ public abstract class AbstractCommandExecutor { } + public int getProcessId() { + return getProcessId(process); + } + /** * get app links - * @param fileName file name + * + * @param logPath log path * @return app id list */ - private List getAppLinks(String fileName) { - List logs = convertFile2List(fileName); + private List getAppIds(String logPath) { + List logs = convertFile2List(logPath); - List appIds = new ArrayList(); + List appIds = new ArrayList<>(); /** * analysis log,get submited yarn application id */ @@ -565,6 +560,5 @@ public abstract class AbstractCommandExecutor { } protected abstract String buildCommandFilePath(); protected abstract String commandInterpreter(); - protected abstract boolean checkFindApp(String line); protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index f2772d0747..e7c4e69110 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -55,6 +55,17 @@ public abstract class AbstractTask { protected Logger logger; + /** + * SHELL process pid + */ + protected Integer processId; + + /** + * other resource manager appId , for example : YARN etc + */ + protected String appIds; + + /** * cancel */ @@ -119,6 +130,22 @@ public abstract class AbstractTask { this.exitStatusCode = exitStatusCode; } + public String getAppIds() { + return appIds; + } + + public void setAppIds(String appIds) { + this.appIds = appIds; + } + + public Integer getProcessId() { + return processId; + } + + public void setProcessId(Integer processId) { + this.processId = processId; + } + /** * get task parameters * @return AbstractParameters @@ -126,6 +153,7 @@ public abstract class AbstractTask { public abstract AbstractParameters getParameters(); + /** * result processing */ @@ -146,7 +174,7 @@ public abstract class AbstractTask { && paramsMap.containsKey("v_proc_date")){ String vProcDate = paramsMap.get("v_proc_date").getValue(); if (!StringUtils.isEmpty(vProcDate)){ - TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskProps.getNodeName(), vProcDate); + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskProps.getTaskName(), vProcDate); logger.info("task record status : {}",taskRecordState); if (taskRecordState == TaskRecordStatus.FAILURE){ setExitStatusCode(Constants.EXIT_CODE_FAILURE); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java index 39f4dfbb97..7a6aca9c3d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java @@ -26,11 +26,6 @@ import org.slf4j.Logger; * abstract yarn task */ public abstract class AbstractYarnTask extends AbstractTask { - - /** - * process instance - */ - /** * process task */ @@ -50,21 +45,26 @@ public abstract class AbstractYarnTask extends AbstractTask { super(taskProps, logger); this.processService = SpringApplicationContext.getBean(ProcessService.class); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskProps.getTaskDir(), + taskProps.getExecutePath(), taskProps.getTaskAppId(), - taskProps.getTaskInstId(), + taskProps.getTaskInstanceId(), taskProps.getTenantCode(), taskProps.getEnvFile(), taskProps.getTaskStartTime(), taskProps.getTaskTimeout(), + taskProps.getLogPath(), + taskProps.getExecutePath(), logger); } @Override public void handle() throws Exception { try { - // construct process - exitStatusCode = shellCommandExecutor.run(buildCommand(), processService); + // SHELL task exit code + CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand()); + setExitStatusCode(commandExecuteResult.getExitStatusCode()); + setAppIds(commandExecuteResult.getAppIds()); + setProcessId(commandExecuteResult.getProcessId()); } catch (Exception e) { logger.error("yarn process failure", e); exitStatusCode = -1; @@ -82,7 +82,7 @@ public abstract class AbstractYarnTask extends AbstractTask { cancel = true; // cancel process shellCommandExecutor.cancelApplication(); - TaskInstance taskInstance = processService.findTaskInstanceById(taskProps.getTaskInstId()); + TaskInstance taskInstance = processService.findTaskInstanceById(taskProps.getTaskInstanceId()); if (status && taskInstance != null){ ProcessUtils.killYarnJob(taskInstance); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java new file mode 100644 index 0000000000..5d1afe5ebd --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/CommandExecuteResult.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task; + +/** + * command execute result + */ +public class CommandExecuteResult { + + /** + * command exit code + */ + private Integer exitStatusCode; + + /** + * appIds + */ + private String appIds; + + /** + * process id + */ + private Integer processId; + + + public CommandExecuteResult(){ + this.exitStatusCode = 0; + } + + + public Integer getExitStatusCode() { + return exitStatusCode; + } + + public void setExitStatusCode(Integer exitStatusCode) { + this.exitStatusCode = exitStatusCode; + } + + public String getAppIds() { + return appIds; + } + + public void setAppIds(String appIds) { + this.appIds = appIds; + } + + public Integer getProcessId() { + return processId; + } + + public void setProcessId(Integer processId) { + this.processId = processId; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index a673134488..544a355a6b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -67,8 +67,10 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { String envFile, Date startTime, int timeout, + String logPath, + String executePath, Logger logger) { - super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger); + super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout,logPath,executePath,logger); } @@ -132,15 +134,6 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { return pythonHome; } - /** - * check find yarn application id - * @param line line - * @return boolean - */ - @Override - protected boolean checkFindApp(String line) { - return true; - } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index db46d0d856..7985253e38 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -53,13 +53,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { public ShellCommandExecutor(Consumer> logHandler, String taskDir, String taskAppId, - int taskInstId, + Integer taskInstId, String tenantCode, String envFile, Date startTime, - int timeout, + Integer timeout, + String logPath, + String executePath, Logger logger) { - super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger); + super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout,logPath,executePath,logger); } @@ -78,15 +80,6 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { return SH; } - /** - * check find yarn application id - * @param line line - * @return true if line contains task app id - */ - @Override - protected boolean checkFindApp(String line) { - return line.contains(taskAppId); - } /** * create command file if not exists diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java index 8e5644ed9c..a7b66bb398 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java @@ -35,12 +35,12 @@ public class TaskProps { /** * task node name **/ - private String nodeName; + private String taskName; /** * task instance id **/ - private int taskInstId; + private int taskInstanceId; /** * tenant code , execute task linux user @@ -57,11 +57,6 @@ public class TaskProps { **/ private String taskParams; - /** - * task dir - **/ - private String taskDir; - /** * queue **/ @@ -111,6 +106,22 @@ public class TaskProps { */ private CommandType cmdTypeIfComplement; + + /** + * host + */ + private String host; + + /** + * log path + */ + private String logPath; + + /** + * execute path + */ + private String executePath; + /** * constructor */ @@ -123,7 +134,7 @@ public class TaskProps { * @param scheduleTime schedule time * @param nodeName node name * @param taskType task type - * @param taskInstId task instance id + * @param taskInstanceId task instance id * @param envFile env file * @param tenantCode tenant code * @param queue queue @@ -133,24 +144,25 @@ public class TaskProps { * @param cmdTypeIfComplement cmd type if complement */ public TaskProps(String taskParams, - String taskDir, Date scheduleTime, String nodeName, String taskType, - int taskInstId, + int taskInstanceId, String envFile, String tenantCode, String queue, Date taskStartTime, Map definedParams, String dependence, - CommandType cmdTypeIfComplement){ + CommandType cmdTypeIfComplement, + String host, + String logPath, + String executePath){ this.taskParams = taskParams; - this.taskDir = taskDir; this.scheduleTime = scheduleTime; - this.nodeName = nodeName; + this.taskName = nodeName; this.taskType = taskType; - this.taskInstId = taskInstId; + this.taskInstanceId = taskInstanceId; this.envFile = envFile; this.tenantCode = tenantCode; this.queue = queue; @@ -158,7 +170,9 @@ public class TaskProps { this.definedParams = definedParams; this.dependence = dependence; this.cmdTypeIfComplement = cmdTypeIfComplement; - + this.host = host; + this.logPath = logPath; + this.executePath = executePath; } public String getTenantCode() { @@ -177,12 +191,12 @@ public class TaskProps { this.taskParams = taskParams; } - public String getTaskDir() { - return taskDir; + public String getExecutePath() { + return executePath; } - public void setTaskDir(String taskDir) { - this.taskDir = taskDir; + public void setExecutePath(String executePath) { + this.executePath = executePath; } public Map getDefinedParams() { @@ -202,20 +216,20 @@ public class TaskProps { } - public String getNodeName() { - return nodeName; + public String getTaskName() { + return taskName; } - public void setNodeName(String nodeName) { - this.nodeName = nodeName; + public void setTaskName(String taskName) { + this.taskName = taskName; } - public int getTaskInstId() { - return taskInstId; + public int getTaskInstanceId() { + return taskInstanceId; } - public void setTaskInstId(int taskInstId) { - this.taskInstId = taskInstId; + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; } public String getQueue() { @@ -291,6 +305,22 @@ public class TaskProps { this.cmdTypeIfComplement = cmdTypeIfComplement; } + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getLogPath() { + return logPath; + } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + /** * get parameters map * @return user defined params map diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index ef941cd062..7c867f1144 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -52,6 +52,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.server.utils.DataxUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -121,12 +122,12 @@ public class DataxTask extends AbstractTask { public DataxTask(TaskProps props, Logger logger) { super(props, logger); - this.taskDir = props.getTaskDir(); + this.taskDir = props.getExecutePath(); logger.info("task dir : {}", taskDir); - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, props.getTaskDir(), props.getTaskAppId(), - props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(), - props.getTaskTimeout(), logger); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, props.getExecutePath(), props.getTaskAppId(), + props.getTaskInstanceId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(), + props.getTaskTimeout(), props.getLogPath(),props.getExecutePath(),logger); this.processService = SpringApplicationContext.getBean(ProcessService.class); } @@ -160,10 +161,15 @@ public class DataxTask extends AbstractTask { // run datax process String jsonFilePath = buildDataxJsonFile(); String shellCommandFilePath = buildShellCommandFile(jsonFilePath); - exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, processService); + CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); + + setExitStatusCode(commandExecuteResult.getExitStatusCode()); + setAppIds(commandExecuteResult.getAppIds()); + setProcessId(commandExecuteResult.getProcessId()); } catch (Exception e) { - exitStatusCode = -1; + logger.error("datax task failure", e); + setExitStatusCode(Constants.EXIT_CODE_FAILURE); throw e; } } @@ -355,7 +361,7 @@ public class DataxTask extends AbstractTask { String dataxCommand = sbr.toString(); // find process instance by task id - ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId()); // combining local and global parameters Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java index f074d57e6c..22cd60e4a0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java @@ -107,7 +107,7 @@ public class DependentTask extends AbstractTask { try{ TaskInstance taskInstance = null; while(Stopper.isRunning()){ - taskInstance = processService.findTaskInstanceById(this.taskProps.getTaskInstId()); + taskInstance = processService.findTaskInstanceById(this.taskProps.getTaskInstanceId()); if(taskInstance == null){ exitStatusCode = -1; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index c562fbe4dd..ead61f08ca 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -68,7 +68,7 @@ public class FlinkTask extends AbstractYarnTask { if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { String args = flinkParameters.getMainArgs(); // get process instance by task instance id - ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId()); /** * combining local and global parameters diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index c925f90b9e..c1d1ed8cc5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -138,7 +138,7 @@ public class HttpTask extends AbstractTask { */ protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException { RequestBuilder builder = createRequestBuilder(); - ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId()); Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), taskProps.getDefinedParams(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index fc212f866b..cae532411b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.task.python; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters; @@ -24,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -65,16 +67,18 @@ public class PythonTask extends AbstractTask { public PythonTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); - this.taskDir = taskProps.getTaskDir(); + this.taskDir = taskProps.getExecutePath(); this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle, - taskProps.getTaskDir(), + taskProps.getExecutePath(), taskProps.getTaskAppId(), - taskProps.getTaskInstId(), + taskProps.getTaskInstanceId(), taskProps.getTenantCode(), taskProps.getEnvFile(), taskProps.getTaskStartTime(), taskProps.getTaskTimeout(), + taskProps.getLogPath(), + taskProps.getExecutePath(), logger); this.processService = SpringApplicationContext.getBean(ProcessService.class); } @@ -94,10 +98,15 @@ public class PythonTask extends AbstractTask { public void handle() throws Exception { try { // construct process - exitStatusCode = pythonCommandExecutor.run(buildCommand(), processService); - } catch (Exception e) { + CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand()); + + setExitStatusCode(commandExecuteResult.getExitStatusCode()); + setAppIds(commandExecuteResult.getAppIds()); + setProcessId(commandExecuteResult.getProcessId()); + } + catch (Exception e) { logger.error("python task failure", e); - exitStatusCode = -1; + setExitStatusCode(Constants.EXIT_CODE_FAILURE); throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 5704c8052e..b703a440a2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -74,15 +75,17 @@ public class ShellTask extends AbstractTask { public ShellTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); - this.taskDir = taskProps.getTaskDir(); + this.taskDir = taskProps.getExecutePath(); - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(), + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getExecutePath(), taskProps.getTaskAppId(), - taskProps.getTaskInstId(), + taskProps.getTaskInstanceId(), taskProps.getTenantCode(), taskProps.getEnvFile(), taskProps.getTaskStartTime(), taskProps.getTaskTimeout(), + taskProps.getLogPath(), + taskProps.getExecutePath(), logger); this.processService = SpringApplicationContext.getBean(ProcessService.class); } @@ -102,10 +105,13 @@ public class ShellTask extends AbstractTask { public void handle() throws Exception { try { // construct process - exitStatusCode = shellCommandExecutor.run(buildCommand(), processService); + CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand()); + setExitStatusCode(commandExecuteResult.getExitStatusCode()); + setAppIds(commandExecuteResult.getAppIds()); + setProcessId(commandExecuteResult.getProcessId()); } catch (Exception e) { logger.error("shell task failure", e); - exitStatusCode = -1; + setExitStatusCode(Constants.EXIT_CODE_FAILURE); throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index aae11f5530..e3a4cf75a7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -316,7 +316,7 @@ public class SqlTask extends AbstractTask { sendAttachment(sqlParameters.getTitle(), JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); }else{ - sendAttachment(taskProps.getNodeName() + " query resultsets ", + sendAttachment(taskProps.getTaskName() + " query resultsets ", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); } } @@ -384,7 +384,7 @@ public class SqlTask extends AbstractTask { public void sendAttachment(String title,String content){ // process instance - ProcessInstance instance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + ProcessInstance instance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId()); List users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId()); @@ -471,7 +471,7 @@ public class SqlTask extends AbstractTask { */ private void checkUdfPermission(Integer[] udfFunIds) throws Exception{ // process instance - ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId()); int userId = processInstance.getExecutorId(); PermissionCheck permissionCheckUdf = new PermissionCheck(AuthorizationType.UDF, processService,udfFunIds,userId,logger); @@ -485,7 +485,7 @@ public class SqlTask extends AbstractTask { */ private void checkDataSourcePermission(int dataSourceId) throws Exception{ // process instance - ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId()); int userId = processInstance.getExecutorId(); PermissionCheck permissionCheckDataSource = new PermissionCheck(AuthorizationType.DATASOURCE, processService,new Integer[]{dataSourceId},userId,logger); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java index 5d4263644b..1a8a4ff6a8 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -55,13 +55,13 @@ public class ShellCommandExecutorTest { TaskProps taskProps = new TaskProps(); // processDefineId_processInstanceId_taskInstanceId - taskProps.setTaskDir("/opt/soft/program/tmp/dolphinscheduler/exec/flow/5/36/2864/7657"); + taskProps.setExecutePath("/opt/soft/program/tmp/dolphinscheduler/exec/flow/5/36/2864/7657"); taskProps.setTaskAppId("36_2864_7657"); // set tenant -> task execute linux user taskProps.setTenantCode("hdfs"); taskProps.setTaskStartTime(new Date()); taskProps.setTaskTimeout(360000); - taskProps.setTaskInstId(7657); + taskProps.setTaskInstanceId(7657); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java index c395eabe51..32e0b2f11c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java @@ -97,15 +97,15 @@ public class SqlExecutorTest { */ private void sharedTestSqlTask(String nodeName, String taskAppId, String tenantCode, int taskInstId) throws Exception { TaskProps taskProps = new TaskProps(); - taskProps.setTaskDir(""); + taskProps.setExecutePath(""); // processDefineId_processInstanceId_taskInstanceId taskProps.setTaskAppId(taskAppId); // set tenant -> task execute linux user taskProps.setTenantCode(tenantCode); taskProps.setTaskStartTime(new Date()); taskProps.setTaskTimeout(360000); - taskProps.setTaskInstId(taskInstId); - taskProps.setNodeName(nodeName); + taskProps.setTaskInstanceId(taskInstId); + taskProps.setTaskName(nodeName); taskProps.setCmdTypeIfComplement(CommandType.START_PROCESS); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java index bd7f27530a..d1b82f226d 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java @@ -71,9 +71,9 @@ public class DataxTaskTest { Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); TaskProps props = new TaskProps(); - props.setTaskDir("/tmp"); + props.setExecutePath("/tmp"); props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTaskInstId(1); + props.setTaskInstanceId(1); props.setTenantCode("1"); props.setEnvFile(".dolphinscheduler_env.sh"); props.setTaskStartTime(new Date()); @@ -87,8 +87,8 @@ public class DataxTaskTest { Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); Mockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); - String fileName = String.format("%s/%s_node.sh", props.getTaskDir(), props.getTaskAppId()); - Mockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0); + String fileName = String.format("%s/%s_node.sh", props.getExecutePath(), props.getTaskAppId()); + Mockito.when(shellCommandExecutor.run(fileName)).thenReturn(null); } private DataSource getDataSource() { @@ -118,9 +118,9 @@ public class DataxTaskTest { public void testDataxTask() throws Exception { TaskProps props = new TaskProps(); - props.setTaskDir("/tmp"); + props.setExecutePath("/tmp"); props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTaskInstId(1); + props.setTaskInstanceId(1); props.setTenantCode("1"); Assert.assertNotNull(new DataxTask(props, logger)); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java index 272fb546da..a6a2587f00 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java @@ -50,7 +50,7 @@ public class DependentTaskTest { "\"relation\":\"OR\"\n" + "}"; - taskProps.setTaskInstId(252612); + taskProps.setTaskInstanceId(252612); taskProps.setDependence(dependString); DependentTask dependentTask = new DependentTask(taskProps, logger); dependentTask.init(); diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue index dbe3e1d6b1..7874b53885 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue @@ -170,7 +170,7 @@ */ _downloadLog () { downloadFile('/dolphinscheduler/log/download-log', { - taskInstId: this.stateId || this.logId + taskInstanceId: this.stateId || this.logId }) }, /** @@ -256,7 +256,7 @@ computed: { _rtParam () { return { - taskInstId: this.stateId || this.logId, + taskInstanceId: this.stateId || this.logId, skipLineNum: parseInt(`${this.loadingIndex ? this.loadingIndex + '000' : 0}`), limit: parseInt(`${this.loadingIndex ? this.loadingIndex + 1 : 1}000`) }