diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 7d28a77938..a758028233 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -38,7 +38,6 @@ public class TaskExecutionContext implements Serializable { */ private int taskInstanceId; - /** * task name */ @@ -506,22 +505,22 @@ public class TaskExecutionContext implements Serializable { @Override public String toString() { return "TaskExecutionContext{" - + "taskInstanceId=" + taskInstanceId - + ", taskName='" + taskName + '\'' - + ", currentExecutionStatus=" + currentExecutionStatus - + ", firstSubmitTime=" + firstSubmitTime - + ", startTime=" + startTime - + ", taskType='" + taskType + '\'' - + ", host='" + host + '\'' - + ", executePath='" + executePath + '\'' - + ", logPath='" + logPath + '\'' - + ", taskJson='" + taskJson + '\'' - + ", processId=" + processId - + ", appIds='" + appIds + '\'' - + ", processInstanceId=" + processInstanceId - + ", scheduleTime=" + scheduleTime - + ", globalParams='" + globalParams + '\'' - + ", executorId=" + executorId + + "taskInstanceId=" + taskInstanceId + + ", taskName='" + taskName + '\'' + + ", currentExecutionStatus=" + currentExecutionStatus + + ", firstSubmitTime=" + firstSubmitTime + + ", startTime=" + startTime + + ", taskType='" + taskType + '\'' + + ", host='" + host + '\'' + + ", executePath='" + executePath + '\'' + + ", logPath='" + logPath + '\'' + + ", taskJson='" + taskJson + '\'' + + ", processId=" + processId + + ", appIds='" + appIds + '\'' + + ", processInstanceId=" + processInstanceId + + ", scheduleTime=" + scheduleTime + + ", globalParams='" + globalParams + '\'' + + ", executorId=" + executorId + ", cmdTypeIfComplement=" + cmdTypeIfComplement + ", tenantCode='" + tenantCode + '\'' + ", queue='" + queue + '\'' diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java index 7df8e01b3d..71c795b0a3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.cache; - import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; /** @@ -42,7 +41,16 @@ public interface TaskExecutionContextCacheManager { /** * remove taskInstance by taskInstanceId + * * @param taskInstanceId taskInstanceId */ void removeByTaskInstanceId(Integer taskInstanceId); + + /** + * If the value for the specified key is present and non-null,then perform the update,otherwise it will return false + * + * @param taskExecutionContext taskExecutionContext + * @return status + */ + boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java index 9c92fb2d64..5c3f9904b6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java @@ -19,13 +19,14 @@ package org.apache.dolphinscheduler.server.worker.cache.impl; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; -import org.springframework.stereotype.Service; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.springframework.stereotype.Service; + /** - * TaskExecutionContextCache + * TaskExecutionContextCache */ @Service public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContextCacheManager { @@ -34,7 +35,7 @@ public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContex /** * taskInstance cache */ - private Map taskExecutionContextCache = new ConcurrentHashMap<>(); + private Map taskExecutionContextCache = new ConcurrentHashMap<>(); /** * get taskInstance by taskInstance id @@ -54,15 +55,22 @@ public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContex */ @Override public void cacheTaskExecutionContext(TaskExecutionContext taskExecutionContext) { - taskExecutionContextCache.put(taskExecutionContext.getTaskInstanceId(),taskExecutionContext); + taskExecutionContextCache.put(taskExecutionContext.getTaskInstanceId(), taskExecutionContext); } /** * remove taskInstance by taskInstanceId + * * @param taskInstanceId taskInstanceId */ @Override public void removeByTaskInstanceId(Integer taskInstanceId) { taskExecutionContextCache.remove(taskInstanceId); } + + @Override + public boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext) { + taskExecutionContextCache.computeIfPresent(taskExecutionContext.getTaskInstanceId(), (k, v) -> taskExecutionContext); + return taskExecutionContextCache.containsKey(taskExecutionContext.getTaskInstanceId()); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 6080baf3cb..3fe3b6dc53 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -21,13 +21,11 @@ import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; -import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -36,6 +34,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -47,44 +47,60 @@ import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import io.netty.channel.Channel; /** - * worker request processor + * worker request processor */ public class TaskExecuteProcessor implements NettyRequestProcessor { - private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); + private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); /** - * thread executor service + * thread executor service */ private final ExecutorService workerExecService; /** - * worker config + * worker config */ private final WorkerConfig workerConfig; /** - * task callback service + * task callback service */ private final TaskCallbackService taskCallbackService; + /** + * taskExecutionContextCacheManager + */ + private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + public TaskExecuteProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); + this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + } + + /** + * Pre-cache task to avoid extreme situations when kill task. There is no such task in the cache + * + * @param taskExecutionContext task + */ + private void setTaskCache(TaskExecutionContext taskExecutionContext) { + TaskExecutionContext preTaskCache = new TaskExecutionContext(); + preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); } @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), - String.format("invalid command type : %s", command.getType())); + String.format("invalid command type : %s", command.getType())); TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject( - command.getBody(), TaskExecuteRequestCommand.class); + command.getBody(), TaskExecuteRequestCommand.class); logger.info("received command : {}", taskRequestCommand); @@ -100,11 +116,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { logger.error("task execution context is null"); return; } + setTaskCache(taskExecutionContext); // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskExecutionContext.getProcessDefineId(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId())); + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())); taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort()); taskExecutionContext.setStartTime(new Date()); @@ -120,13 +137,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode()); } catch (Throwable ex) { String errorLog = String.format("create execLocalPath : %s", execLocalPath); - LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex); + LoggerUtils.logError(Optional.of(logger), errorLog, ex); LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex); + taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); } FileUtils.taskLoggerThreadLocal.remove(); taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), - new NettyRemoteChannel(channel, command.getOpaque())); + new NettyRemoteChannel(channel, command.getOpaque())); this.doAck(taskExecutionContext); @@ -134,15 +152,16 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger)); } - private void doAck(TaskExecutionContext taskExecutionContext){ + private void doAck(TaskExecutionContext taskExecutionContext) { // tell master that task is in executing TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext); - ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),ackCommand.convert2Command(),Event.ACK); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK); taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command()); } /** * build ack command + * * @param taskExecutionContext taskExecutionContext * @return TaskExecuteAckCommand */ @@ -164,13 +183,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { /** * get execute local path + * * @param taskExecutionContext taskExecutionContext * @return execute local path */ private String getExecLocalPath(TaskExecutionContext taskExecutionContext) { return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), - taskExecutionContext.getProcessDefineId(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 21108d1291..45268e6d86 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -48,19 +48,19 @@ import org.slf4j.LoggerFactory; import io.netty.channel.Channel; /** - * task kill processor + * task kill processor */ public class TaskKillProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class); /** - * worker config + * worker config */ private final WorkerConfig workerConfig; /** - * task callback service + * task callback service */ private final TaskCallbackService taskCallbackService; @@ -90,28 +90,29 @@ public class TaskKillProcessor implements NettyRequestProcessor { Pair> result = doKill(killCommand); taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(), - new NettyRemoteChannel(channel, command.getOpaque())); + new NettyRemoteChannel(channel, command.getOpaque())); - TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result); + TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result); taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); taskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId()); } /** - * do kill + * do kill + * * @param killCommand * @return kill result */ private Pair> doKill(TaskKillRequestCommand killCommand) { - List appIds = Collections.EMPTY_LIST; + List appIds = Collections.emptyList(); try { - TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); - + int taskInstanceId = killCommand.getTaskInstanceId(); + TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); Integer processId = taskExecutionContext.getProcessId(); - - if (processId == null || processId.equals(0)) { - logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId()); - return Pair.of(false, appIds); + if (processId.equals(0)) { + taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); + logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); + return Pair.of(true, appIds); } String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId())); @@ -122,9 +123,9 @@ public class TaskKillProcessor implements NettyRequestProcessor { // find log and kill yarn job appIds = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(), - taskExecutionContext.getLogPath(), - taskExecutionContext.getExecutePath(), - taskExecutionContext.getTenantCode()); + taskExecutionContext.getLogPath(), + taskExecutionContext.getExecutePath(), + taskExecutionContext.getTenantCode()); return Pair.of(true, appIds); } catch (Exception e) { @@ -136,8 +137,8 @@ public class TaskKillProcessor implements NettyRequestProcessor { /** * build TaskKillResponseCommand * - * @param killCommand kill command - * @param result exe result + * @param killCommand kill command + * @param result exe result * @return build TaskKillResponseCommand */ private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand, @@ -155,20 +156,20 @@ public class TaskKillProcessor implements NettyRequestProcessor { } /** - * kill yarn job + * kill yarn job * - * @param host host - * @param logPath logPath + * @param host host + * @param logPath logPath * @param executePath executePath - * @param tenantCode tenantCode + * @param tenantCode tenantCode * @return List appIds */ private List killYarnJob(String host, String logPath, String executePath, String tenantCode) { LogClientService logClient = null; try { logClient = new LogClientService(); - logger.info("view log host : {},logPath : {}", host,logPath); - String log = logClient.viewLog(host, Constants.RPC_PORT, logPath); + logger.info("view log host : {},logPath : {}", host, logPath); + String log = logClient.viewLog(host, Constants.RPC_PORT, logPath); if (StringUtils.isNotEmpty(log)) { List appIds = LoggerUtils.getAppIds(log, logger); @@ -182,7 +183,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { } } } catch (Exception e) { - logger.error("kill yarn job error",e); + logger.error("kill yarn job error", e); } finally { if (logClient != null) { logClient.close(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 89af95278c..da5c0e6980 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -14,36 +14,44 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task; +import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_KILL; +import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.slf4j.Logger; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; -import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS; +import org.slf4j.Logger; /** * abstract command executor @@ -56,22 +64,22 @@ public abstract class AbstractCommandExecutor { protected StringBuilder varPool = new StringBuilder(); /** - * process + * process */ private Process process; /** - * log handler + * log handler */ protected Consumer> logHandler; /** - * logger + * logger */ protected Logger logger; /** - * log list + * log list */ protected final List logBuffer; @@ -86,8 +94,8 @@ public abstract class AbstractCommandExecutor { private TaskExecutionContextCacheManager taskExecutionContextCacheManager; public AbstractCommandExecutor(Consumer> logHandler, - TaskExecutionContext taskExecutionContext , - Logger logger){ + TaskExecutionContext taskExecutionContext, + Logger logger) { this.logHandler = logHandler; this.taskExecutionContext = taskExecutionContext; this.logger = logger; @@ -135,12 +143,18 @@ public abstract class AbstractCommandExecutor { * @return CommandExecuteResult * @throws Exception if error throws Exception */ - public CommandExecuteResult run(String execCommand) throws Exception{ + public CommandExecuteResult run(String execCommand) throws Exception { CommandExecuteResult result = new CommandExecuteResult(); - + int taskInstanceId = taskExecutionContext.getTaskInstanceId(); + // If the task has been killed, then the task in the cache is null + if (null == taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { + result.setExitStatusCode(EXIT_CODE_KILL); + return result; + } if (StringUtils.isEmpty(execCommand)) { + taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); return result; } @@ -155,14 +169,18 @@ public abstract class AbstractCommandExecutor { // parse process output parseProcessOutput(process); - Integer processId = getProcessId(process); result.setProcessId(processId); // cache processId taskExecutionContext.setProcessId(processId); - taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + boolean updateTaskExecutionContextStatus = taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext); + if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) { + ProcessUtils.kill(taskExecutionContext); + result.setExitStatusCode(EXIT_CODE_KILL); + return result; + } // print process id logger.info("process start, process id is: {}", processId); @@ -173,11 +191,10 @@ public abstract class AbstractCommandExecutor { // waiting for the run to finish boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); - logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}", - taskExecutionContext.getExecutePath(), - processId - , result.getExitStatusCode()); + taskExecutionContext.getExecutePath(), + processId + , result.getExitStatusCode()); // if SHELL task exit if (status) { @@ -189,7 +206,7 @@ public abstract class AbstractCommandExecutor { result.setExitStatusCode(process.exitValue()); // if yarn task , yarn state is final state - if (process.exitValue() == 0){ + if (process.exitValue() == 0) { result.setExitStatusCode(isSuccessOfYarnState(appIds) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE); } } else { @@ -198,7 +215,6 @@ public abstract class AbstractCommandExecutor { result.setExitStatusCode(EXIT_CODE_FAILURE); } - return result; } @@ -208,6 +224,7 @@ public abstract class AbstractCommandExecutor { /** * cancel application + * * @throws Exception exception */ public void cancelApplication() throws Exception { @@ -238,6 +255,7 @@ public abstract class AbstractCommandExecutor { /** * soft kill + * * @param processId process id * @return process is alive * @throws InterruptedException interrupted exception @@ -262,6 +280,7 @@ public abstract class AbstractCommandExecutor { /** * hard kill + * * @param processId process id */ private void hardKill(int processId) { @@ -280,6 +299,7 @@ public abstract class AbstractCommandExecutor { /** * print command + * * @param commands process builder */ private void printCommand(List commands) { @@ -311,12 +331,13 @@ public abstract class AbstractCommandExecutor { /** * get the standard output of the process + * * @param process process */ private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); - parseProcessOutputExecutorService.submit(new Runnable(){ + parseProcessOutputExecutorService.submit(new Runnable() { @Override public void run() { BufferedReader inReader = null; @@ -337,7 +358,7 @@ public abstract class AbstractCommandExecutor { } } } catch (Exception e) { - logger.error(e.getMessage(),e); + logger.error(e.getMessage(), e); } finally { clear(); close(inReader); @@ -357,22 +378,22 @@ public abstract class AbstractCommandExecutor { boolean result = true; try { for (String appId : appIds) { - while(Stopper.isRunning()){ + while (Stopper.isRunning()) { ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId); - logger.info("appId:{}, final state:{}",appId,applicationStatus.name()); - if (applicationStatus.equals(ExecutionStatus.FAILURE) || - applicationStatus.equals(ExecutionStatus.KILL)) { + logger.info("appId:{}, final state:{}", appId, applicationStatus.name()); + if (applicationStatus.equals(ExecutionStatus.FAILURE) + || applicationStatus.equals(ExecutionStatus.KILL)) { return false; } - if (applicationStatus.equals(ExecutionStatus.SUCCESS)){ + if (applicationStatus.equals(ExecutionStatus.SUCCESS)) { break; } Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } } catch (Exception e) { - logger.error(String.format("yarn applications: %s status failed ", appIds.toString()),e); + logger.error(String.format("yarn applications: %s status failed ", appIds.toString()), e); result = false; } return result; @@ -408,14 +429,15 @@ public abstract class AbstractCommandExecutor { /** * convert file to list + * * @param filename file name * @return line list */ private List convertFile2List(String filename) { List lineList = new ArrayList(100); - File file=new File(filename); + File file = new File(filename); - if (!file.exists()){ + if (!file.exists()) { return lineList; } @@ -427,13 +449,13 @@ public abstract class AbstractCommandExecutor { lineList.add(line); } } catch (Exception e) { - logger.error(String.format("read file: %s failed : ",filename),e); + logger.error(String.format("read file: %s failed : ", filename), e); } finally { - if(br != null){ + if (br != null) { try { br.close(); } catch (IOException e) { - logger.error(e.getMessage(),e); + logger.error(e.getMessage(), e); } } @@ -443,6 +465,7 @@ public abstract class AbstractCommandExecutor { /** * find app id + * * @param line line * @return appid */ @@ -454,7 +477,6 @@ public abstract class AbstractCommandExecutor { return null; } - /** * get remain time(s) * @@ -495,7 +517,7 @@ public abstract class AbstractCommandExecutor { /** * when log buffer siz or flush time reach condition , then flush * - * @param lastFlushTime last flush time + * @param lastFlushTime last flush time * @return last flush time */ private long flush(long lastFlushTime) { @@ -532,7 +554,10 @@ public abstract class AbstractCommandExecutor { protected List commandOptions() { return Collections.emptyList(); } + protected abstract String buildCommandFilePath(); + protected abstract String commandInterpreter(); + protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; } \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManagerTest.java new file mode 100644 index 0000000000..d871257e82 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManagerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.cache; + +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * TaskExecutionContextCacheManagerTest + */ +public class TaskExecutionContextCacheManagerTest { + + private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + private TaskExecutionContext taskExecutionContext; + + @Before + public void before() { + taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl(); + } + + @Test + public void testGetByTaskInstanceId() { + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskInstanceId(2); + taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + Assert.assertEquals(2, taskExecutionContextCacheManager.getByTaskInstanceId(2).getTaskInstanceId()); + } + + @Test + public void updateTaskExecutionContext() { + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + Assert.assertTrue(taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext)); + taskExecutionContextCacheManager.removeByTaskInstanceId(1); + Assert.assertFalse(taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext)); + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 2e00a160c3..5a5561d1bd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -14,9 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.processor; -import java.util.Date; +package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -37,6 +36,7 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseSer import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; @@ -44,10 +44,12 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import java.util.Date; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -56,26 +58,28 @@ import io.netty.channel.Channel; /** * test task call back service + * todo refactor it in the form of mock */ @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={ - TaskCallbackServiceTestConfig.class, - SpringZKServer.class, - SpringApplicationContext.class, - MasterRegistry.class, - WorkerRegistry.class, - ZookeeperRegistryCenter.class, - MasterConfig.class, - WorkerConfig.class, - ZookeeperCachedOperator.class, - ZookeeperConfig.class, - ZookeeperNodeManager.class, - TaskCallbackService.class, - TaskResponseService.class, - TaskAckProcessor.class, - TaskResponseProcessor.class, - TaskExecuteProcessor.class, - CuratorZookeeperClient.class}) +@ContextConfiguration(classes = { + TaskCallbackServiceTestConfig.class, + SpringZKServer.class, + SpringApplicationContext.class, + MasterRegistry.class, + WorkerRegistry.class, + ZookeeperRegistryCenter.class, + MasterConfig.class, + WorkerConfig.class, + ZookeeperCachedOperator.class, + ZookeeperConfig.class, + ZookeeperNodeManager.class, + TaskCallbackService.class, + TaskResponseService.class, + TaskAckProcessor.class, + TaskResponseProcessor.class, + TaskExecuteProcessor.class, + CuratorZookeeperClient.class, + TaskExecutionContextCacheManagerImpl.class}) public class TaskCallbackServiceTest { @Autowired @@ -95,10 +99,11 @@ public class TaskCallbackServiceTest { /** * send ack test + * * @throws Exception */ @Test - public void testSendAck() throws Exception{ + public void testSendAck() throws Exception { final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); @@ -122,10 +127,11 @@ public class TaskCallbackServiceTest { /** * send result test + * * @throws Exception */ @Test - public void testSendResult() throws Exception{ + public void testSendResult() throws Exception { final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); @@ -136,7 +142,7 @@ public class TaskCallbackServiceTest { NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); - TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); responseCommand.setTaskInstanceId(1); responseCommand.setEndTime(new Date()); @@ -152,20 +158,13 @@ public class TaskCallbackServiceTest { nettyRemotingClient.close(); } -// @Test(expected = IllegalArgumentException.class) -// public void testSendAckWithIllegalArgumentException(){ -// TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class); -// taskCallbackService.sendAck(1, ackCommand.convert2Command()); -// Stopper.stop(); -// } - @Test - public void testPause(){ - Assert.assertEquals(5000, taskCallbackService.pause(3));; + public void testPause() { + Assert.assertEquals(5000, taskCallbackService.pause(3)); } @Test - public void testSendAck1(){ + public void testSendAck1() { masterRegistry.registry(); final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); @@ -177,7 +176,7 @@ public class TaskCallbackServiceTest { NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); -// channel.close(); + // channel.close(); TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); ackCommand.setTaskInstanceId(1); @@ -185,7 +184,7 @@ public class TaskCallbackServiceTest { taskCallbackService.sendAck(1, ackCommand.convert2Command()); - Assert.assertEquals(true, channel.isOpen()); + Assert.assertTrue(channel.isOpen()); Stopper.stop(); @@ -195,7 +194,7 @@ public class TaskCallbackServiceTest { } @Test - public void testTaskExecuteProcessor() throws Exception{ + public void testTaskExecuteProcessor() throws Exception { final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(30000); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); @@ -207,11 +206,11 @@ public class TaskCallbackServiceTest { TaskExecuteRequestCommand taskExecuteRequestCommand = new TaskExecuteRequestCommand(); - nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command()); + nettyRemotingClient.send(new Host("localhost", 30000), taskExecuteRequestCommand.convert2Command()); taskExecuteRequestCommand.setTaskExecutionContext(JSONUtils.toJsonString(new TaskExecutionContext())); - nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command()); + nettyRemotingClient.send(new Host("localhost", 30000), taskExecuteRequestCommand.convert2Command()); Thread.sleep(5000); @@ -223,40 +222,4 @@ public class TaskCallbackServiceTest { nettyRemotingClient.close(); } -// @Test(expected = IllegalStateException.class) -// public void testSendAckWithIllegalStateException2(){ -// masterRegistry.registry(); -// final NettyServerConfig serverConfig = new NettyServerConfig(); -// serverConfig.setListenPort(30000); -// NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); -// nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); -// nettyRemotingServer.start(); -// -// final NettyClientConfig clientConfig = new NettyClientConfig(); -// NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); -// Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); -// taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); -// channel.close(); -// TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); -// ackCommand.setTaskInstanceId(1); -// ackCommand.setStartTime(new Date()); -// -// nettyRemotingServer.close(); -// -// taskCallbackService.sendAck(1, ackCommand.convert2Command()); -// try { -// Thread.sleep(5000); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// -// Stopper.stop(); -// -// try { -// Thread.sleep(5000); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// } - } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java new file mode 100644 index 0000000000..36a758ab1f --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.log.LogClientService; + +import java.util.Collections; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import io.netty.channel.Channel; + +/** + * TaskKillProcessorTest + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class, TaskKillProcessor.class, OSUtils.class, ProcessUtils.class, LoggerUtils.class}) +public class TaskKillProcessorTest { + + private TaskKillProcessor taskKillProcessor; + + private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; + + private Channel channel; + + private Command command; + + private TaskExecutionContext taskExecutionContext; + + @Before + public void before() throws Exception { + + TaskCallbackService taskCallbackService = PowerMockito.mock(TaskCallbackService.class); + WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class); + taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class); + + channel = PowerMockito.mock(Channel.class); + command = new Command(); + command.setType(CommandType.TASK_KILL_REQUEST); + TaskKillRequestCommand taskKillRequestCommand = new TaskKillRequestCommand(); + taskKillRequestCommand.setTaskInstanceId(1); + command.setBody(JSONUtils.toJsonString(taskKillRequestCommand).getBytes()); + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskInstanceId(1); + LogClientService logClient = PowerMockito.mock(LogClientService.class); + + NettyRemoteChannel nettyRemoteChannel = PowerMockito.mock(NettyRemoteChannel.class); + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.mockStatic(OSUtils.class); + PowerMockito.mockStatic(ProcessUtils.class); + PowerMockito.mockStatic(LoggerUtils.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)).thenReturn(taskCallbackService); + PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager); + PowerMockito.doNothing().when(taskCallbackService).addRemoteChannel(anyInt(), any()); + PowerMockito.whenNew(NettyRemoteChannel.class).withAnyArguments().thenReturn(null); + PowerMockito.when(OSUtils.exeCmd(any())).thenReturn(null); + PowerMockito.when(ProcessUtils.getPidsStr(102)).thenReturn("123"); + PowerMockito.whenNew(LogClientService.class).withAnyArguments().thenReturn(logClient); + PowerMockito.when(logClient.viewLog(any(), anyInt(), any())).thenReturn("test"); + PowerMockito.when(LoggerUtils.getAppIds(any(), any())).thenReturn(Collections.singletonList("id")); + + Command viewLogResponseCommand = new Command(); + viewLogResponseCommand.setBody("success".getBytes()); + + taskKillProcessor = new TaskKillProcessor(); + } + + @Test + public void testProcess() { + + PowerMockito.when(taskExecutionContextCacheManager.getByTaskInstanceId(1)).thenReturn(taskExecutionContext); + taskKillProcessor.process(channel, command); + + taskExecutionContext.setProcessId(101); + taskExecutionContext.setHost("127.0.0.1:22"); + taskExecutionContext.setLogPath("/log"); + taskExecutionContext.setExecutePath("/path"); + taskExecutionContext.setTenantCode("ten"); + taskKillProcessor.process(channel, command); + } + +}