Browse Source

cancelTaskInstance set TaskExecutionContext host,logPath,executePath (#2126)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
pull/2/head
qiaozhanwei 5 years ago committed by GitHub
parent
commit
20af8ff93e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  2. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  3. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  4. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  5. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  6. 40
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -46,7 +46,7 @@ public class MasterConfig {
@Value("${master.host.selector:lowerWeight}")
private String hostSelector;
@Value("${master.listen.port:45678}")
@Value("${master.listen.port:5678}")
private int listenPort;
public int getListenPort() {

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadPoolExecutor;
/**
@ -59,28 +60,33 @@ public class MasterSchedulerService extends Thread {
@Autowired
private ZKMasterClient zkMasterClient;
/**
* master config
*/
@Autowired
private MasterConfig masterConfig;
/**
* netty remoting client
*/
private final NettyRemotingClient nettyRemotingClient;
private NettyRemotingClient nettyRemotingClient;
/**
* master exec service
*/
private final ThreadPoolExecutor masterExecService;
private ThreadPoolExecutor masterExecService;
/**
* constructor of MasterSchedulerThread
*/
public MasterSchedulerService(){
@PostConstruct
public void init(){
this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
@Override
public void start(){
super.setName("MasterSchedulerThread");
super.start();

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -184,6 +184,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setProcessId(taskInstance.getPid());
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -273,15 +273,15 @@ public class ProcessUtils {
* @param appIds app id list
* @param logger logger
* @param tenantCode tenant code
* @param workDir work dir
* @param executePath execute path
* @throws IOException io exception
*/
public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode,String workDir)
public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode,String executePath)
throws IOException {
if (appIds.size() > 0) {
String appid = appIds.get(appIds.size() - 1);
String commandFile = String
.format("%s/%s.kill", workDir, appid);
.format("%s/%s.kill", executePath, appid);
String cmd = "yarn application -kill " + appid;
try {
StringBuilder sb = new StringBuilder();
@ -309,7 +309,7 @@ public class ProcessUtils {
Runtime.getRuntime().exec(runCmd);
} catch (Exception e) {
logger.error("kill application failed", e);
logger.error("kill application error", e);
}
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -40,7 +40,7 @@ public class WorkerConfig {
@Value("${worker.group: default}")
private String workerGroup;
@Value("${worker.listen.port: 12345}")
@Value("${worker.listen.port: 1234}")
private int listenPort;
public int getListenPort() {

40
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@ -104,15 +105,24 @@ public class TaskKillProcessor implements NettyRequestProcessor {
// find log and kill yarn job
killYarnJob(context.getHost(), context.getLogPath(), context.getExecutePath(), context.getTenantCode());
killYarnJob(Host.of(context.getHost()).getIp(),
context.getLogPath(),
context.getExecutePath(),
context.getTenantCode());
return true;
} catch (Exception e) {
logger.error("kill task failed", e);
logger.error("kill task error", e);
return false;
}
}
/**
* task kill process
*
* @param channel channel channel
* @param command command command
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
@ -160,26 +170,18 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param executePath executePath
* @param tenantCode tenantCode
*/
public void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
List<String> appIds = null;
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
private void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
LogClientService logClient = null;
String log = null;
try {
logClient = new LogClientService();
logger.info("view log host : {},logPath : {}", host,logPath);
log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
} finally {
if(logClient != null){
logClient.close();
}
}
String log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
if (StringUtils.isNotEmpty(log)) {
appIds = LoggerUtils.getAppIds(log, logger);
List<String> appIds = LoggerUtils.getAppIds(log, logger);
if (StringUtils.isEmpty(executePath)) {
logger.error("task instance work dir is empty");
throw new RuntimeException("task instance work dir is empty");
logger.error("task instance execute path is empty");
throw new RuntimeException("task instance execute path is empty");
}
if (appIds.size() > 0) {
ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
@ -187,7 +189,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
}
} catch (Exception e) {
logger.error("kill yarn job failure",e);
logger.error("kill yarn job error",e);
} finally {
if(logClient != null){
logClient.close();
}
}
}

Loading…
Cancel
Save