Browse Source

[3.0.1-prepare]cherry-pick [Bug] [Worker] Optimize the getAppId method to avoid worker OOM when kill task (#11994)

* cherry-pick [Bug] [Worker] Optimize the getAppId method to avoid worker OOM when kill task
Co-authored-by: Wenjun Ruan <wenjun@apache.org>
3.0.1-release
Kerwin 2 years ago committed by GitHub
parent
commit
c4c943277c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 47
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
  2. 12
      dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
  3. 27
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  4. 44
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java
  5. 44
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java
  6. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java
  7. 33
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  8. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
  9. 31
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
  10. 22
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  11. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
  12. 75
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
  13. 36
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
  14. 29
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt
  15. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  16. 59
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  17. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java

47
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java

@ -17,24 +17,29 @@
package org.apache.dolphinscheduler.common.utils;
import lombok.experimental.UtilityClass;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import lombok.experimental.UtilityClass;
import java.util.stream.Stream;
/**
* logger utils
@ -44,11 +49,6 @@ public class LoggerUtils {
private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class);
/**
* rules for extracting application ID
*/
private static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX);
/**
* build job id
*
@ -65,31 +65,6 @@ public class LoggerUtils {
TaskConstants.TASK_APPID_LOG_FORMAT, TaskConstants.TASK_LOGGER_INFO_PREFIX, firstSubmitTimeStr, processDefineCode, processDefineVersion, processInstId, taskId);
}
/**
* processing log
* get yarn application id list
*
* @param log log content
* @param logger logger
* @return app id list
*/
public static List<String> getAppIds(String log, Logger logger) {
List<String> appIds = new ArrayList<>();
Matcher matcher = APPLICATION_REGEX.matcher(log);
// analyse logs to get all submit yarn application id
while (matcher.find()) {
String appId = matcher.group();
if (!appIds.contains(appId)) {
logger.info("find app id: {}", appId);
appIds.add(appId);
}
}
return appIds;
}
/**
* read whole file content
*

12
dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.server.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.GetAppIdRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.GetAppIdResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
@ -157,6 +160,15 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
RemoveTaskLogResponseCommand removeTaskLogResponse = new RemoveTaskLogResponseCommand(status);
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
break;
case GET_APP_ID_REQUEST:
GetAppIdRequestCommand getAppIdRequestCommand = JSONUtils.parseObject(command.getBody(), GetAppIdRequestCommand.class);
String logPath = getAppIdRequestCommand.getLogPath();
if (!checkPathSecurity(logPath)) {
throw new IllegalArgumentException("Illegal path");
}
List<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
channel.writeAndFlush(new GetAppIdResponseCommand(appIds).convert2Command(command.getOpaque()));
break;
default:
throw new IllegalArgumentException("unknown commandType: " + commandType);
}

27
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -19,44 +19,23 @@ package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
GET_APP_ID_REQUEST,
GET_APP_ID_RESPONSE,
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,

44
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequestCommand.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import java.io.Serializable;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class GetAppIdRequestCommand implements Serializable {
private String logPath;
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.GET_APP_ID_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
}

44
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdResponseCommand.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import java.io.Serializable;
import java.util.List;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class GetAppIdResponseCommand implements Serializable {
private List<String> appIds;
public Command convert2Command(long opaque) {
Command command = new Command(opaque);
command.setType(CommandType.GET_APP_ID_RESPONSE);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
}

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/rpc/remote/NettyClient.java

@ -93,7 +93,7 @@ public class NettyClient {
/**
* channels
*/
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128);
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
/**
* get channel

33
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -17,11 +17,14 @@
package org.apache.dolphinscheduler.server.utils;
import lombok.NonNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -29,9 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.charset.StandardCharsets;
@ -41,11 +43,6 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.NonNull;
/**
* mainly used to get the start command line of a process.
*/
@ -190,12 +187,12 @@ public class ProcessUtils {
}
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
String log;
List<String> appIds;
try (LogClientService logClient = new LogClientService()) {
Host host = Host.of(taskExecutionContext.getHost());
log = logClient.viewLog(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
}
if (!StringUtils.isEmpty(log)) {
if (CollectionUtils.isNotEmpty(appIds)) {
if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
taskExecutionContext.setExecutePath(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
@ -204,15 +201,13 @@ public class ProcessUtils {
taskExecutionContext.getTaskInstanceId()));
}
FileUtils.createWorkDirIfAbsent(taskExecutionContext.getExecutePath());
List<String> appIds = LoggerUtils.getAppIds(log, logger);
if (CollectionUtils.isNotEmpty(appIds)) {
cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
return appIds;
}
cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
return appIds;
} else {
logger.info("The current appId is empty, don't need to kill the yarn job, taskInstanceId: {}", taskExecutionContext.getTaskInstanceId());
}
} catch (Exception e) {
logger.error("kill yarn job failure", e);
logger.error("Kill yarn job failure, taskInstanceId: {}", taskExecutionContext.getTaskInstanceId(), e);
}
return Collections.emptyList();
}

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java

@ -19,14 +19,13 @@ package org.apache.dolphinscheduler.server.utils;
import static org.powermock.api.mockito.PowerMockito.when;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.commons.lang.SystemUtils;
import java.util.ArrayList;
import java.util.List;

31
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@ -17,11 +17,15 @@
package org.apache.dolphinscheduler.service.log;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.GetAppIdRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.GetAppIdResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
@ -31,11 +35,14 @@ import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand
import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.List;
/**
* log client
*/
@ -195,6 +202,28 @@ public class LogClientService implements AutoCloseable {
return result;
}
public @Nullable List<String> getAppIds(@NonNull String host, int port, @NonNull String taskLogFilePath) throws RemotingException, InterruptedException {
logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}", host, port, taskLogFilePath);
final Host workerAddress = new Host(host, port);
List<String> appIds = null;
try {
if (NetUtils.getHost().equals(host)) {
appIds = LogUtils.getAppIdsFromLogFile(taskLogFilePath);
} else {
final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command();
Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
GetAppIdResponseCommand responseCommand = JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class);
appIds = responseCommand.getAppIds();
}
}
} finally {
client.closeChannel(workerAddress);
}
logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}", appIds, host, port, taskLogFilePath);
return appIds;
}
public boolean isRunning() {
return isRunning;
}

22
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -17,18 +17,14 @@
package org.apache.dolphinscheduler.plugin.task.api;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import org.slf4j.Logger;
import java.io.*;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -44,9 +40,8 @@ import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
/**
* abstract command executor
@ -55,8 +50,8 @@ public abstract class AbstractCommandExecutor {
/**
* rules for extracting application ID
*/
protected static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.APPLICATION_REGEX);
protected static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
/**
* rules for extracting Var Pool
*/
@ -406,6 +401,7 @@ public abstract class AbstractCommandExecutor {
/**
* find var pool
*
* @param line
* @return
*/

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

@ -26,7 +26,7 @@ public class TaskConstants {
throw new IllegalStateException("Utility class");
}
public static final String APPLICATION_REGEX = "application_\\d+_\\d+";
public static final String YARN_APPLICATION_REGEX = "application_\\d+_\\d+";
public static final String SETVALUE_REGEX = "[\\$#]\\{setValue\\(([^)]*)\\)}";

75
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import lombok.NonNull;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@Slf4j
@UtilityClass
public class LogUtils {
private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
public List<String> getAppIdsFromLogFile(@NonNull String logPath) {
return getAppIdsFromLogFile(logPath, log);
}
public List<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
File logFile = new File(logPath);
if (!logFile.exists() || !logFile.isFile()) {
return Collections.emptyList();
}
Set<String> appIds = new HashSet<>();
try (Stream<String> stream = Files.lines(Paths.get(logPath))) {
stream.filter(line -> {
Matcher matcher = APPLICATION_REGEX.matcher(line);
return matcher.find();
}
).forEach(line -> {
Matcher matcher = APPLICATION_REGEX.matcher(line);
if (matcher.find()) {
String appId = matcher.group();
if (appIds.add(appId)) {
logger.info("Find appId: {} from {}", appId, logPath);
}
}
});
return new ArrayList<>(appIds);
} catch (IOException e) {
logger.error("Get appId from log file erro, logPath: {}", logPath, e);
return Collections.emptyList();
}
}
}

36
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class LogUtilsTest {
private static final String APP_ID_FILE = LogUtilsTest.class.getResource("/appId.txt")
.getFile();
@Test
public void getAppIdsFromLogFile() {
List<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"), appIds);
}
}

29
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/appId.txt

@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
19/04/02 11:40:22 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:23 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:24 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:25 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:26 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:27 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:28 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:29 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_1234 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_1234 (state: RUNNING)
19/04/02 11:40:33 INFO yarn.Client: Application report for application_1548381669007_1234 (state: RUNNING)

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -68,7 +68,7 @@ public class WorkerConfig implements Validator {
if (workerConfig.getExecThreads() <= 0) {
errors.rejectValue("exec-threads", null, "should be a positive value");
}
if (workerConfig.getHeartbeatInterval().toMillis() <= 0) {
if (workerConfig.getHeartbeatInterval().getSeconds() <= 0) {
errors.rejectValue("heartbeat-interval", null, "shoule be a valid duration");
}
if (workerConfig.getMaxCpuLoadAvg() <= 0) {

59
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -17,8 +17,14 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@ -49,12 +55,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
/**
* task kill processor
*/
@ -140,11 +140,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
});
}
/**
* do kill
*
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) {
// kill system process
boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
@ -207,29 +202,33 @@ public class TaskKillProcessor implements NettyRequestProcessor {
/**
* kill yarn job
*
* @param host host
* @param logPath logPath
* @param host host
* @param logPath logPath
* @param executePath executePath
* @param tenantCode tenantCode
* @param tenantCode tenantCode
* @return Pair<Boolean, List < String>> yarn kill result
*/
private Pair<Boolean, List<String>> killYarnJob(Host host, String logPath, String executePath, String tenantCode) {
try (LogClientService logClient = new LogClientService();) {
logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), logPath,
host.getPort());
String log = logClient.viewLog(host.getIp(), host.getPort(), logPath);
List<String> appIds = Collections.emptyList();
if (!StringUtils.isEmpty(log)) {
appIds = LoggerUtils.getAppIds(log, logger);
if (StringUtils.isEmpty(executePath)) {
logger.error("task instance execute path is empty");
throw new RuntimeException("task instance execute path is empty");
}
if (appIds.size() > 0) {
ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
}
private Pair<Boolean, List<String>> killYarnJob(@NonNull Host host,
String logPath,
String executePath,
String tenantCode) {
if (logPath == null || executePath == null || tenantCode == null) {
logger.error("Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}",
host, logPath, executePath, tenantCode);
return Pair.of(false, Collections.emptyList());
}
try (LogClientService logClient = new LogClientService()) {
logger.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath);
List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), logPath);
if (CollectionUtils.isEmpty(appIds)) {
return Pair.of(true, Collections.emptyList());
}
ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
return Pair.of(true, appIds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("kill yarn job error, the current thread has been interrtpted", e);
} catch (Exception e) {
logger.error("kill yarn job error", e);
}

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java

@ -74,6 +74,7 @@ public class WorkerRpcServer implements Closeable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_APP_ID_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);

Loading…
Cancel
Save