From aef5524ee7d44cfed55aab3304dd259772c99ff3 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 9 Mar 2023 20:46:29 +0800 Subject: [PATCH] Clearer task runnable. (#13689) * Clean unused method in AbstractTask * Kill task by process.destroy * wait 5/s after destroy process --- .../task/api/AbstractCommandExecutor.java | 63 ++++--------------- .../plugin/task/api/AbstractTask.java | 18 +----- .../plugin/task/api/TaskChannel.java | 2 + .../task/api/parameters/IParameters.java | 33 +++++----- .../plugin/task/api/utils/ProcessUtils.java | 1 + .../plugin/task/java/JavaTask.java | 16 ----- .../plugin/task/openmldb/OpenmldbTask.java | 6 -- .../plugin/task/python/PythonTask.java | 43 ------------- .../worker/processor/TaskKillProcessor.java | 1 + 9 files changed, 35 insertions(+), 148 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 92297558d7..684db5d414 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.api; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; -import static org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.getPidsStr; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; @@ -191,7 +190,7 @@ public abstract class AbstractCommandExecutor { command.add(String.format("--uid=%s", taskRequest.getTenantCode())); } - public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws IOException, InterruptedException { + public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Exception { TaskResponse result = new TaskResponse(); int taskInstanceId = taskRequest.getTaskInstanceId(); if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { @@ -223,8 +222,8 @@ public abstract class AbstractCommandExecutor { boolean updateTaskExecutionContextStatus = TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest); if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) { - ProcessUtils.kill(taskRequest); result.setExitStatusCode(EXIT_CODE_KILL); + cancelApplication(); return result; } // print process id @@ -262,14 +261,13 @@ public abstract class AbstractCommandExecutor { } else { logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...", taskRequest.getTaskTimeout()); - ProcessUtils.kill(taskRequest); result.setExitStatusCode(EXIT_CODE_FAILURE); + cancelApplication(); } int exitCode = process.exitValue(); String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited."; - logger.info(exitLogMessage - + " execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", - taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode); + logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", + exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode); return result; } @@ -278,53 +276,18 @@ public abstract class AbstractCommandExecutor { return varPool.toString(); } - /** - * cancel application - * - * @throws Exception exception - */ - public void cancelApplication() throws Exception { + public void cancelApplication() throws InterruptedException { if (process == null) { return; } - int processId = getProcessId(process); - logger.info("Begin to kill process process, pid is : {}", processId); - // kill , waiting for completion - boolean alive = softKill(processId); - - if (alive) { - String cmd = String.format("kill -9 %s", getPidsStr(processId)); - cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd); - OSUtils.exeCmd(cmd); - logger.info("Success kill task: {}, pid: {}, cmd: {}", taskRequest.getTaskAppId(), processId, cmd); - } else { - logger.info("The process: {} is not alive, no need to kill", processId); + // soft kill + logger.info("Begin to kill process process, pid is : {}", taskRequest.getProcessId()); + process.destroy(); + if (!process.waitFor(5, TimeUnit.SECONDS)) { + process.destroyForcibly(); } - } - - /** - * soft kill - * - * @param processId process id - * @return process is alive - */ - private boolean softKill(int processId) { - - if (processId != 0 && process.isAlive()) { - try { - // sudo -u user command to run command - String cmd = String.format("kill %d", processId); - cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd); - logger.info("soft kill task:{}, process id:{}, cmd:{}", taskRequest.getTaskAppId(), processId, cmd); - - Runtime.getRuntime().exec(cmd); - } catch (IOException e) { - logger.info("kill attempt failed", e); - } - } - - return process.isAlive(); + logger.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), taskRequest.getProcessId()); } private void printCommand(List commands) { @@ -424,7 +387,7 @@ public abstract class AbstractCommandExecutor { f.setAccessible(true); processId = f.getInt(process); - } catch (Throwable e) { + } catch (Exception e) { logger.error("Get task pid failed", e); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java index 1c51594b9e..1986957c98 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java @@ -55,11 +55,6 @@ public abstract class AbstractTask { */ protected int processId; - /** - * SHELL result string - */ - protected String resultString; - /** * other resource manager appId , for example : YARN etc */ @@ -89,10 +84,7 @@ public abstract class AbstractTask { public void init() { } - public String getPreScript() { - return null; - } - + // todo: return TaskResult rather than store the result in Task public abstract void handle(TaskCallBack taskCallBack) throws TaskException; public abstract void cancel() throws TaskException; @@ -126,14 +118,6 @@ public abstract class AbstractTask { this.processId = processId; } - public String getResultString() { - return resultString; - } - - public void setResultString(String resultString) { - this.resultString = resultString; - } - public String getAppIds() { return appIds; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java index 421a2646c2..77abae5047 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannel.java @@ -25,8 +25,10 @@ public interface TaskChannel { void cancelApplication(boolean status); + // todo: return ITask AbstractTask createTask(TaskExecutionContext taskRequest); + // todo: return IParameters AbstractParameters parseParameters(ParametersNode parametersNode); ResourceParametersHelper getResources(String parameters); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java index bcd914c8be..a23c25dd07 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/IParameters.java @@ -1,19 +1,20 @@ -package org.apache.dolphinscheduler.plugin.task.api.parameters;/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +package org.apache.dolphinscheduler.plugin.task.api.parameters; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java index 9ec8480589..e260277020 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java @@ -79,6 +79,7 @@ public final class ProcessUtils { /** * kill tasks according to different task types. */ + @Deprecated public static boolean kill(@NonNull TaskExecutionContext request) { try { log.info("Begin kill task instance, processId: {}", request.getProcessId()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java index 8628c01ea6..280ede0d7e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java @@ -99,22 +99,6 @@ public class JavaTask extends AbstractTask { log.info("Initialize java task params {}", JSONUtils.toPrettyJsonString(javaParameters)); } - /** - * Gets the Java source file that was initially processed - * - * @return String - **/ - @Override - public String getPreScript() { - String rawJavaScript = javaParameters.getRawScript().replaceAll("\\r\\n", "\n"); - try { - rawJavaScript = convertJavaSourceCodePlaceholders(rawJavaScript); - } catch (StringIndexOutOfBoundsException e) { - log.error("setShareVar field format error, raw java script: {}", rawJavaScript); - } - return rawJavaScript; - } - /** * Execute Java tasks * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java index 75e6b3bce3..88371675ef 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java @@ -70,12 +70,6 @@ public class OpenmldbTask extends PythonTask { } } - @Override - @Deprecated - public String getPreScript() { - return ""; - } - /** * build python command file path * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index cb2cdfdcdc..f2853bab5e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -87,17 +87,6 @@ public class PythonTask extends AbstractTask { } } - @Override - public String getPreScript() { - String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator()); - try { - rawPythonScript = convertPythonScriptPlaceholders(rawPythonScript); - } catch (StringIndexOutOfBoundsException e) { - log.error("setShareVar field format error, raw python script : {}", rawPythonScript); - } - return rawPythonScript; - } - @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { @@ -137,38 +126,6 @@ public class PythonTask extends AbstractTask { return pythonParameters; } - /** - * convertPythonScriptPlaceholders - * - * @param rawScript rawScript - * @return String - * @throws StringIndexOutOfBoundsException if substring index is out of bounds - */ - private static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException { - int len = "${setShareVar(${".length(); - int scriptStart = 0; - while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) { - int start = -1; - int end = rawScript.indexOf('}', scriptStart + len); - String prop = rawScript.substring(scriptStart + len, end); - - start = rawScript.indexOf(',', end); - end = rawScript.indexOf(')', start); - - String value = rawScript.substring(start + 1, end); - - start = rawScript.indexOf('}', start) + 1; - end = rawScript.length(); - - String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value); - - rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end); - - scriptStart += replaceScript.length(); - } - return rawScript; - } - /** * create python command file if not exists * diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 9afc4708c7..6421dc7490 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -186,6 +186,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { * @param processId */ protected boolean killProcess(String tenantCode, Integer processId) { + // todo: directly interrupt the process boolean processFlag = true; if (processId == null || processId.equals(0)) { return true;