From 20af8ff93e7d41485def1adaec90ba79065bbc46 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Mon, 9 Mar 2020 16:09:25 +0800 Subject: [PATCH] cancelTaskInstance set TaskExecutionContext host,logPath,executePath (#2126) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatch task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replace 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor 
modify * TaskAckProcessor modify * task prioriry refator * remove ITaskQueue * task prioriry refator * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor * MasterSchedulerService modify * WorkerConfig listen port modify * modify master and worker listen port * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * cancelTaskInstance set TaskExecutionContext host,logPath,executePath Co-authored-by: qiaozhanwei --- .../server/master/config/MasterConfig.java | 2 +- .../master/runner/MasterSchedulerService.java | 12 +++-- .../master/runner/MasterTaskExecThread.java | 3 ++ .../server/utils/ProcessUtils.java | 8 ++-- .../server/worker/config/WorkerConfig.java | 2 +- .../worker/processor/TaskKillProcessor.java | 46 +++++++++++-------- 6 files changed, 44 insertions(+), 29 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 7e6ae5618a..86dd1c9083 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -46,7 +46,7 @@ public class MasterConfig { @Value("${master.host.selector:lowerWeight}") private String hostSelector; - @Value("${master.listen.port:45678}") + @Value("${master.listen.port:5678}") private int listenPort; public int getListenPort() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 6949ada022..c1925e0adc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import java.util.concurrent.ThreadPoolExecutor; /** @@ -59,28 +60,33 @@ public class MasterSchedulerService extends Thread { @Autowired private ZKMasterClient zkMasterClient; + /** + * master config + */ @Autowired private MasterConfig masterConfig; /** * netty remoting client */ - private final NettyRemotingClient nettyRemotingClient; + private NettyRemotingClient nettyRemotingClient; /** * master exec service */ - private final ThreadPoolExecutor masterExecService; + private ThreadPoolExecutor masterExecService; /** * constructor of MasterSchedulerThread */ - public MasterSchedulerService(){ + @PostConstruct + public void init(){ this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); } + @Override public void start(){ super.setName("MasterSchedulerThread"); super.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 1197dc279d..172794e815 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -184,6 +184,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); 
taskExecutionContext.setTaskInstanceId(taskInstance.getId()); + taskExecutionContext.setHost(taskInstance.getHost()); + taskExecutionContext.setLogPath(taskInstance.getLogPath()); + taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setProcessId(taskInstance.getPid()); ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index ee1f09173b..f29e7df4f8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -273,15 +273,15 @@ public class ProcessUtils { * @param appIds app id list * @param logger logger * @param tenantCode tenant code - * @param workDir work dir + * @param executePath execute path * @throws IOException io exception */ - public static void cancelApplication(List appIds, Logger logger, String tenantCode,String workDir) + public static void cancelApplication(List appIds, Logger logger, String tenantCode,String executePath) throws IOException { if (appIds.size() > 0) { String appid = appIds.get(appIds.size() - 1); String commandFile = String - .format("%s/%s.kill", workDir, appid); + .format("%s/%s.kill", executePath, appid); String cmd = "yarn application -kill " + appid; try { StringBuilder sb = new StringBuilder(); @@ -309,7 +309,7 @@ public class ProcessUtils { Runtime.getRuntime().exec(runCmd); } catch (Exception e) { - logger.error("kill application failed", e); + logger.error("kill application error", e); } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index f3e701b6c9..792f9229d0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -40,7 +40,7 @@ public class WorkerConfig { @Value("${worker.group: default}") private String workerGroup; - @Value("${worker.listen.port: 12345}") + @Value("${worker.listen.port: 1234}") private int listenPort; public int getListenPort() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index c23f1995e4..5a8c6686c6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; @@ -104,15 +105,24 @@ public class TaskKillProcessor implements NettyRequestProcessor { // find log and kill yarn job - killYarnJob(context.getHost(), context.getLogPath(), context.getExecutePath(), context.getTenantCode()); + killYarnJob(Host.of(context.getHost()).getIp(), + 
context.getLogPath(), + context.getExecutePath(), + context.getTenantCode()); return true; } catch (Exception e) { - logger.error("kill task failed", e); + logger.error("kill task error", e); return false; } } + /** + * task kill process + * + * @param channel channel channel + * @param command command command + */ @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); @@ -160,26 +170,18 @@ public class TaskKillProcessor implements NettyRequestProcessor { * @param executePath executePath * @param tenantCode tenantCode */ - public void killYarnJob(String host, String logPath, String executePath, String tenantCode) { - List appIds = null; + private void killYarnJob(String host, String logPath, String executePath, String tenantCode) { + LogClientService logClient = null; try { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - LogClientService logClient = null; - String log = null; - try { - logClient = new LogClientService(); - logger.info("view log host : {},logPath : {}", host,logPath); - log = logClient.viewLog(host, Constants.RPC_PORT, logPath); - } finally { - if(logClient != null){ - logClient.close(); - } - } + logClient = new LogClientService(); + logger.info("view log host : {},logPath : {}", host,logPath); + String log = logClient.viewLog(host, Constants.RPC_PORT, logPath); + if (StringUtils.isNotEmpty(log)) { - appIds = LoggerUtils.getAppIds(log, logger); + List appIds = LoggerUtils.getAppIds(log, logger); if (StringUtils.isEmpty(executePath)) { - logger.error("task instance work dir is empty"); - throw new RuntimeException("task instance work dir is empty"); + logger.error("task instance execute path is empty"); + throw new RuntimeException("task instance execute path is empty"); } if (appIds.size() > 0) { ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath); @@ -187,7 +189,11 @@ public 
class TaskKillProcessor implements NettyRequestProcessor { } } catch (Exception e) { - logger.error("kill yarn job failure",e); + logger.error("kill yarn job error",e); + } finally { + if(logClient != null){ + logClient.close(); + } } }