diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 7e6ae5618a..86dd1c9083 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -46,7 +46,7 @@ public class MasterConfig { @Value("${master.host.selector:lowerWeight}") private String hostSelector; - @Value("${master.listen.port:45678}") + @Value("${master.listen.port:5678}") private int listenPort; public int getListenPort() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 6949ada022..c1925e0adc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import java.util.concurrent.ThreadPoolExecutor; /** @@ -59,28 +60,33 @@ public class MasterSchedulerService extends Thread { @Autowired private ZKMasterClient zkMasterClient; + /** + * master config + */ @Autowired private MasterConfig masterConfig; /** * netty remoting client */ - private final NettyRemotingClient nettyRemotingClient; + private NettyRemotingClient nettyRemotingClient; /** * master exec service */ - private final ThreadPoolExecutor masterExecService; + private ThreadPoolExecutor masterExecService; /** * constructor of MasterSchedulerThread */ - public MasterSchedulerService(){ + @PostConstruct + public void init(){ this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); } + @Override public void start(){ super.setName("MasterSchedulerThread"); super.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 1197dc279d..172794e815 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -184,6 +184,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setTaskInstanceId(taskInstance.getId()); + taskExecutionContext.setHost(taskInstance.getHost()); + taskExecutionContext.setLogPath(taskInstance.getLogPath()); + taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setProcessId(taskInstance.getPid()); ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index ee1f09173b..f29e7df4f8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -273,15 +273,15 @@ public class ProcessUtils { * @param appIds app id list * @param logger logger * @param tenantCode tenant code - * @param workDir work dir + * @param executePath execute path * @throws IOException io exception */ - public static void cancelApplication(List appIds, Logger logger, String tenantCode,String workDir) + public static void cancelApplication(List appIds, Logger logger, String tenantCode,String executePath) throws IOException { if (appIds.size() > 0) { String appid = appIds.get(appIds.size() - 1); String commandFile = String - .format("%s/%s.kill", workDir, appid); + .format("%s/%s.kill", executePath, appid); String cmd = "yarn application -kill " + appid; try { StringBuilder sb = new StringBuilder(); @@ -309,7 +309,7 @@ public class ProcessUtils { Runtime.getRuntime().exec(runCmd); } catch (Exception e) { - logger.error("kill application failed", e); + logger.error("kill application error", e); } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index f3e701b6c9..792f9229d0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -40,7 +40,7 @@ public class WorkerConfig { @Value("${worker.group: default}") private String workerGroup; - @Value("${worker.listen.port: 12345}") + @Value("${worker.listen.port: 1234}") private int listenPort; public int getListenPort() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index c23f1995e4..5a8c6686c6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; @@ -104,15 +105,24 @@ public class TaskKillProcessor implements NettyRequestProcessor { // find log and kill yarn job - killYarnJob(context.getHost(), context.getLogPath(), context.getExecutePath(), context.getTenantCode()); + killYarnJob(Host.of(context.getHost()).getIp(), + context.getLogPath(), + context.getExecutePath(), + context.getTenantCode()); return true; } catch (Exception e) { - logger.error("kill task failed", e); + logger.error("kill task error", e); return false; } } + /** + * task kill process + * + * @param channel channel channel + * @param command command command + */ @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); @@ -160,26 +170,18 @@ public class TaskKillProcessor implements NettyRequestProcessor { * @param executePath executePath * @param tenantCode tenantCode */ - public void killYarnJob(String host, String logPath, String executePath, String tenantCode) { - List appIds = null; + private void killYarnJob(String host, String logPath, String executePath, String tenantCode) { + LogClientService logClient = null; try { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - LogClientService logClient = null; - String log = null; - try { - logClient = new LogClientService(); - logger.info("view log host : {},logPath : {}", host,logPath); - log = logClient.viewLog(host, Constants.RPC_PORT, logPath); - } finally { - if(logClient != null){ - logClient.close(); - } - } + logClient = new LogClientService(); + logger.info("view log host : {},logPath : {}", host,logPath); + String log = logClient.viewLog(host, Constants.RPC_PORT, logPath); + if (StringUtils.isNotEmpty(log)) { - appIds = LoggerUtils.getAppIds(log, logger); + List appIds = LoggerUtils.getAppIds(log, logger); if (StringUtils.isEmpty(executePath)) { - logger.error("task instance work dir is empty"); - throw new RuntimeException("task instance work dir is empty"); + logger.error("task instance execute path is empty"); + throw new RuntimeException("task instance execute path is empty"); } if (appIds.size() > 0) { ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath); @@ -187,7 +189,11 @@ public class TaskKillProcessor implements NettyRequestProcessor { } } catch (Exception e) { - logger.error("kill yarn job failure",e); + logger.error("kill yarn job error",e); + } finally { + if(logClient != null){ + logClient.close(); + } } }