diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java index 7b61a1eaec..e8e31faa6d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java @@ -39,6 +39,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -117,33 +118,45 @@ public final class ProcessUtils { * @throws Exception exception */ public static String getPidsStr(int processId) throws Exception { - StringBuilder sb = new StringBuilder(); - Matcher mat = null; + + String rawPidStr; + // pstree pid get sub pids if (SystemUtils.IS_OS_MAC) { - String pids = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId)); - if (StringUtils.isNotEmpty(pids)) { - mat = MACPATTERN.matcher(pids); + rawPidStr = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId)); + } else if (SystemUtils.IS_OS_LINUX) { + rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); + } else { + rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); + } + + return parsePidStr(rawPidStr); + } + + public static String parsePidStr(String rawPidStr) { + + log.info("prepare to parse pid, raw pid string: {}", rawPidStr); + ArrayList allPidList = new ArrayList<>(); + Matcher mat = null; + if (SystemUtils.IS_OS_MAC) { + if (StringUtils.isNotEmpty(rawPidStr)) { + mat = MACPATTERN.matcher(rawPidStr); } } else if (SystemUtils.IS_OS_LINUX) { - String pids = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); - if (StringUtils.isNotEmpty(pids)) { - mat = LINUXPATTERN.matcher(pids); + if (StringUtils.isNotEmpty(rawPidStr)) { + mat = LINUXPATTERN.matcher(rawPidStr); } } else { - String pids = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); - if (StringUtils.isNotEmpty(pids)) { - mat = WINDOWSPATTERN.matcher(pids); + if (StringUtils.isNotEmpty(rawPidStr)) { + mat = WINDOWSPATTERN.matcher(rawPidStr); } } - if (null != mat) { while (mat.find()) { - sb.append(mat.group(1)).append(" "); + allPidList.add(mat.group(1)); } } - - return sb.toString().trim(); + return String.join(" ", allPidList).trim(); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java index c590fa9e44..334ebb6195 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java @@ -17,12 +17,16 @@ package org.apache.dolphinscheduler.plugin.task.remoteshell; +import static org.apache.dolphinscheduler.plugin.task.remoteshell.RemoteExecutor.COMMAND.PSTREE_COMMAND; + import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils; import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser; +import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.sshd.client.SshClient; import org.apache.sshd.client.channel.ChannelExec; import org.apache.sshd.client.channel.ClientChannelEvent; @@ -50,7 +54,6 @@ public class RemoteExecutor implements AutoCloseable { static final int TRACK_INTERVAL = 5000; protected Map taskOutputParams = new HashMap<>(); - private SshClient sshClient; private ClientSession session; private SSHConnectionParam sshConnectionParam; @@ -154,11 +157,45 @@ public class RemoteExecutor implements AutoCloseable { public void kill(String taskId) throws IOException { String pid = getTaskPid(taskId); - String killCommand = String.format(COMMAND.KILL_COMMAND, pid); + + if (StringUtils.isEmpty(pid)) { + log.warn("query remote-shell task remote process id with empty"); + return; + } + if (!NumberUtils.isParsable(pid)) { + log.error("query remote-shell task remote process id error, pid {} can not parse to number", pid); + return; + } + + // query all pid + String remotePidStr = getAllRemotePidStr(pid); + String killCommand = String.format(COMMAND.KILL_COMMAND, remotePidStr); + log.info("prepare to execute kill command in host: {}, kill cmd: {}", sshConnectionParam.getHost(), + killCommand); runRemote(killCommand); cleanData(taskId); } + protected String getAllRemotePidStr(String pid) { + + String remoteProcessIdStr = ""; + String cmd = String.format(PSTREE_COMMAND, pid); + log.info("query all process id cmd: {}", cmd); + + try { + String rawPidStr = runRemote(cmd); + remoteProcessIdStr = ProcessUtils.parsePidStr(rawPidStr); + if (!remoteProcessIdStr.startsWith(pid)) { + log.error("query remote process id error, [{}] first pid not equal [{}]", remoteProcessIdStr, pid); + remoteProcessIdStr = pid; + } + } catch (Exception e) { + log.error("query remote all process id error", e); + remoteProcessIdStr = pid; + } + return remoteProcessIdStr; + } + public String getTaskPid(String taskId) throws IOException { String pidCommand = String.format(COMMAND.GET_PID_COMMAND, taskId); return runRemote(pidCommand).trim(); @@ -238,6 +275,9 @@ public class RemoteExecutor implements AutoCloseable { static final String ADD_STATUS_COMMAND = "\necho %s$?"; static final String CAT_FINAL_SCRIPT = "cat %s%s.sh"; + + static final String PSTREE_COMMAND = "pstree -p %s"; + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java index 975f059695..3cc9757ce1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.remoteshell; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -135,4 +136,22 @@ public class RemoteExecutorTest { doReturn("DOLPHINSCHEDULER-REMOTE-SHELL-TASK-STATUS-1").when(remoteExecutor).runRemote(trackCommand); Assertions.assertEquals(1, remoteExecutor.getTaskExitCode(taskId)); } + + @Test + void getAllRemotePidStr() throws IOException { + + RemoteExecutor remoteExecutor = spy(new RemoteExecutor(sshConnectionParam)); + doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString()); + String allPidStr = remoteExecutor.getAllRemotePidStr("9527"); + Assertions.assertEquals("9527 9528", allPidStr); + + doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString()); + allPidStr = remoteExecutor.getAllRemotePidStr("9527"); + Assertions.assertEquals("9527", allPidStr); + + doThrow(new TaskException()).when(remoteExecutor).runRemote(anyString()); + allPidStr = remoteExecutor.getAllRemotePidStr("9527"); + Assertions.assertEquals("9527", allPidStr); + + } }