diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java index 8c749c53fc..231d84877b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java @@ -17,24 +17,29 @@ package org.apache.dolphinscheduler.common.utils; +import lombok.experimental.UtilityClass; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import java.io.BufferedReader; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; - -import lombok.experimental.UtilityClass; +import java.util.stream.Stream; /** * logger utils @@ -44,11 +49,6 @@ public class LoggerUtils { private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class); - /** - * rules for extracting application ID - */ - private static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); - /** * build job id * @@ -65,31 +65,6 @@ public class LoggerUtils { TaskConstants.TASK_APPID_LOG_FORMAT, TaskConstants.TASK_LOGGER_INFO_PREFIX, firstSubmitTimeStr, processDefineCode, processDefineVersion, processInstId, taskId); } - /** - * processing log - * get yarn application id list - * - * @param log log content - * @param logger logger - * @return 
app id list - */ - public static List getAppIds(String log, Logger logger) { - - List appIds = new ArrayList<>(); - - Matcher matcher = APPLICATION_REGEX.matcher(log); - - // analyse logs to get all submit yarn application id - while (matcher.find()) { - String appId = matcher.group(); - if (!appIds.contains(appId)) { - logger.info("find app id: {}", appId); - appIds.add(appId); - } - } - return appIds; - } - /** * read whole file content * diff --git a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java index b5cae3562d..ea60fe918c 100644 --- a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java +++ b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java @@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.server.log; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.log.GetAppIdRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.GetAppIdResponseCommand; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand; import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand; @@ -157,6 +160,15 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { RemoveTaskLogResponseCommand removeTaskLogResponse = new RemoveTaskLogResponseCommand(status); channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque())); 
break; + case GET_APP_ID_REQUEST: + GetAppIdRequestCommand getAppIdRequestCommand = JSONUtils.parseObject(command.getBody(), GetAppIdRequestCommand.class); + String logPath = getAppIdRequestCommand.getLogPath(); + if (!checkPathSecurity(logPath)) { + throw new IllegalArgumentException("Illegal path"); + } + List appIds = LogUtils.getAppIdsFromLogFile(logPath); + channel.writeAndFlush(new GetAppIdResponseCommand(appIds).convert2Command(command.getOpaque())); + break; default: throw new IllegalArgumentException("unknown commandType: " + commandType); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 6e26973a76..b83c9ec53c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -19,44 +19,23 @@ package org.apache.dolphinscheduler.remote.command; public enum CommandType { - /** - * remove task log request, - */ + GET_APP_ID_REQUEST, + GET_APP_ID_RESPONSE, + REMOVE_TAK_LOG_REQUEST, - /** - * remove task log response - */ REMOVE_TAK_LOG_RESPONSE, - /** - * roll view log request - */ ROLL_VIEW_LOG_REQUEST, - /** - * roll view log response - */ ROLL_VIEW_LOG_RESPONSE, - /** - * view whole log request - */ VIEW_WHOLE_LOG_REQUEST, - /** - * view whole log response - */ VIEW_WHOLE_LOG_RESPONSE, - /** - * get log bytes request - */ GET_LOG_BYTES_REQUEST, - /** - * get log bytes response - */ GET_LOG_BYTES_RESPONSE, diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java new file mode 100644 index 0000000000..26412b8283 --- /dev/null +++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command.log; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class GetAppIdRequestCommand implements Serializable { + + private String logPath; + + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.GET_APP_ID_REQUEST); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java new file mode 100644 index 0000000000..2e54a9008d --- /dev/null +++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command.log; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class GetAppIdResponseCommand implements Serializable { + + private List appIds; + + public Command convert2Command(long opaque) { + Command command = new Command(opaque); + command.setType(CommandType.GET_APP_ID_RESPONSE); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java index 7b34cae867..1c8e81a79e 100644 --- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java @@ -94,7 +94,7 @@ public class NettyClient { /** * channels */ - private final ConcurrentHashMap channels = new ConcurrentHashMap(128); + private final ConcurrentHashMap channels = new ConcurrentHashMap<>(128); /** * get channel diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index 36888fef45..4392d6e14a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -17,21 +17,22 @@ package org.apache.dolphinscheduler.server.utils; +import lombok.NonNull; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.SystemUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.SystemUtils; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; import java.io.File; import java.nio.charset.StandardCharsets; @@ -41,11 +42,6 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import lombok.NonNull; - /** * mainly used to get the start command line of a process. */ @@ -196,12 +192,12 @@ public class ProcessUtils { } try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); - String log; + List appIds; try (LogClientService logClient = new LogClientService()) { Host host = Host.of(taskExecutionContext.getHost()); - log = logClient.viewLog(host.getIp(), host.getPort(), taskExecutionContext.getLogPath()); + appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath()); } - if (!StringUtils.isEmpty(log)) { + if (CollectionUtils.isNotEmpty(appIds)) { if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) { taskExecutionContext .setExecutePath(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), @@ -211,16 +207,13 @@ public class ProcessUtils { taskExecutionContext.getTaskInstanceId())); } FileUtils.createWorkDirIfAbsent(taskExecutionContext.getExecutePath()); - List appIds = LoggerUtils.getAppIds(log, logger); - if (CollectionUtils.isNotEmpty(appIds)) { - cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), - taskExecutionContext.getExecutePath()); - return appIds; - } + cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath()); + return appIds; + } else { + logger.info("The current appId is empty, don't need to kill the yarn job, taskInstanceId: {}", taskExecutionContext.getTaskInstanceId()); } - } catch (Exception e) { - logger.error("kill yarn job failure", e); + logger.error("Kill yarn job failure, taskInstanceId: {}", taskExecutionContext.getTaskInstanceId(), e); } return Collections.emptyList(); } diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java index f682780ef7..3088724ed4 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java @@ -17,11 +17,15 @@ package org.apache.dolphinscheduler.service.log; +import lombok.NonNull; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.log.GetAppIdRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.GetAppIdResponseCommand; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand; import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand; @@ -31,11 +35,14 @@ import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Host; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import java.util.List; + /** * log client */ @@ -195,6 +202,28 @@ public class LogClientService implements AutoCloseable { return 
result; } + public @Nullable List getAppIds(@NonNull String host, int port, @NonNull String taskLogFilePath) throws RemotingException, InterruptedException { + logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}", host, port, taskLogFilePath); + final Host workerAddress = new Host(host, port); + List appIds = null; + try { + if (NetUtils.getHost().equals(host)) { + appIds = LogUtils.getAppIdsFromLogFile(taskLogFilePath); + } else { + final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command(); + Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT); + if (response != null) { + GetAppIdResponseCommand responseCommand = JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class); + appIds = responseCommand.getAppIds(); + } + } + } finally { + client.closeChannel(workerAddress); + } + logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}", appIds, host, port, taskLogFilePath); + return appIds; + } + public boolean isRunning() { return isRunning; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 9efd33f6dd..49e224e95d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -17,25 +17,20 @@ package org.apache.dolphinscheduler.plugin.task.api; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; - +import 
com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.lang3.SystemUtils; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants; import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils; import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import org.apache.commons.lang3.SystemUtils; +import org.slf4j.Logger; import java.io.BufferedReader; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.lang.reflect.Field; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -48,15 +43,14 @@ import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.slf4j.Logger; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; /** * abstract command executor */ public abstract class AbstractCommandExecutor { - + /** * rules for extracting Var Pool */ @@ -391,6 +385,7 @@ public abstract class AbstractCommandExecutor { /** * find var pool + * * @param line * @return */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java new file mode 100644 index 0000000000..220d2583d2 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java @@ -0,0 +1,75 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+@Slf4j
+@UtilityClass
+public class LogUtils {
+
+    private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
+
+    public List<String> getAppIdsFromLogFile(@NonNull String logPath) {
+        return getAppIdsFromLogFile(logPath, log); // report through this class's own @Slf4j logger
+    }
+
+    /**
+     * Collects the distinct application ids matched in the log file, in first-seen order; returns
+     * an empty list when logPath is not an existing regular file or an IOException occurs.
+     */
+    public List<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
+        File logFile = new File(logPath);
+        if (!logFile.exists() || !logFile.isFile()) {
+            return Collections.emptyList();
+        }
+        Set<String> appIds = new LinkedHashSet<>(); // de-duplicates while keeping first-seen order
+        try (Stream<String> stream = Files.lines(Paths.get(logPath))) {
+            stream.forEach(line -> {
+                Matcher matcher = APPLICATION_REGEX.matcher(line);
+                while (matcher.find()) { // one log line may mention several applications
+                    String appId = matcher.group();
+                    if (appIds.add(appId)) { // add() is false for an id already seen, so each is logged once
+                        logger.info("Find appId: {} from {}", appId, logPath);
+                    }
+                }
+            });
+            return new ArrayList<>(appIds);
+        } catch (IOException e) {
+            logger.error("Get appId from log file erro, logPath: {}", logPath, e);
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
new file mode 100644
index 0000000000..feb10ad859
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.dolphinscheduler.plugin.task.api.utils; + +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class LogUtilsTest { + + private static final String APP_ID_FILE = LogUtilsTest.class.getResource("/appId.txt") + .getFile(); + + @Test + public void getAppIdsFromLogFile() { + List appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE); + Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"), appIds); + } +} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt new file mode 100644 index 0000000000..7de4480eea --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +19/04/02 11:40:22 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:23 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:24 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:25 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:26 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:27 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:28 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:29 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED) +19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_1234 (state: RUNNING) +19/04/02 11:40:33 INFO yarn.Client: Application report for application_1548381669007_1234 (state: RUNNING) \ No newline at end of file diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index fa1db46121..f50e6f2d03 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -69,7 +69,7 @@ public class WorkerConfig implements Validator { if (workerConfig.getExecThreads() <= 0) { errors.rejectValue("exec-threads", null, "should be a positive 
value"); } - if (workerConfig.getHeartbeatInterval().toMillis() <= 0) { + if (workerConfig.getHeartbeatInterval().getSeconds() <= 0) { errors.rejectValue("heartbeat-interval", null, "shoule be a valid duration"); } if (workerConfig.getMaxCpuLoadAvg() <= 0) { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index fd7e1d726c..3aadb5a5cc 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -22,8 +22,9 @@ import com.google.common.base.Strings; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import lombok.NonNull; +import org.apache.commons.collections4.CollectionUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; @@ -137,11 +138,6 @@ public class TaskKillProcessor implements NettyRequestProcessor { }); } - /** - * do kill - * - * @return kill result - */ private Pair> doKill(TaskExecutionContext taskExecutionContext) { // kill system process boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId()); @@ -204,29 +200,33 @@ public class TaskKillProcessor implements NettyRequestProcessor { /** * kill yarn job * - * @param host host - * @param logPath logPath + * @param host host + * @param logPath logPath * @param executePath executePath - * @param tenantCode tenantCode + * @param tenantCode tenantCode * @return 
Pair> yarn kill result */ - private Pair> killYarnJob(Host host, String logPath, String executePath, String tenantCode) { - try (LogClientService logClient = new LogClientService();) { - logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), logPath, - host.getPort()); - String log = logClient.viewLog(host.getIp(), host.getPort(), logPath); - List appIds = Collections.emptyList(); - if (!Strings.isNullOrEmpty(log)) { - appIds = LoggerUtils.getAppIds(log, logger); - if (Strings.isNullOrEmpty(executePath)) { - logger.error("task instance execute path is empty"); - throw new RuntimeException("task instance execute path is empty"); - } - if (appIds.size() > 0) { - ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath); - } + private Pair> killYarnJob(@NonNull Host host, + String logPath, + String executePath, + String tenantCode) { + if (logPath == null || executePath == null || tenantCode == null) { + logger.error("Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}", + host, logPath, executePath, tenantCode); + return Pair.of(false, Collections.emptyList()); + } + try (LogClientService logClient = new LogClientService()) { + logger.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath); + List appIds = logClient.getAppIds(host.getIp(), host.getPort(), logPath); + if (CollectionUtils.isEmpty(appIds)) { + return Pair.of(true, Collections.emptyList()); } + + ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath); return Pair.of(true, appIds); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("kill yarn job error, the current thread has been interrtpted", e); } catch (Exception e) { logger.error("kill yarn job error", e); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java index 1150b9102b..140e410242 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java @@ -84,6 +84,7 @@ public class WorkerRpcServer implements Closeable { this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_SAVEPOINT_REQUEST, taskSavePointProcessor); // logger server + this.nettyRemotingServer.registerProcessor(CommandType.GET_APP_ID_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);