Browse Source

add kill command

pull/2/head
Technoboy- 5 years ago
parent
commit
67e7a6c849
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  2. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
  3. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  4. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
  5. 66
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  6. 71
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  7. 103
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  8. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
  9. 71
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack */ EXECUTE_TASK_ACK, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack EXECUTE_TASK_ACK, /** * execute task ack */ */ EXECUTE_TASK_ACK, /** /** * execute task response */ EXECUTE /** * kill task response */ KILL_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java

@ -0,0 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class KillTaskRequestCommand implements Serializable { private int taskInstanceId; private int processId; private String host; private String tenantCode; private String logPath; private String executePath; public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public String getTenantCode() { return tenantCode; } public void setTenantCode(String tenantCode) { this.tenantCode = tenantCode; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -35,7 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -168,7 +169,8 @@ public class WorkerServer implements IStoppable {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService));
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new TaskExecuteProcessor(processService));
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.start();
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java

@ -18,24 +18,35 @@
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
* callback channel
*/
public class CallbackChannel {
public class NettyRemoteChannel {
/**
* channel
*/
private Channel channel;
private final Channel channel;
/**
* equest unique identification
*/
private long opaque;
private final long opaque;
public CallbackChannel(Channel channel, long opaque) {
/**
* master host
*/
private final Host host;
public NettyRemoteChannel(Channel channel, long opaque) {
this.channel = channel;
this.host = ChannelUtils.toAddress(channel);
this.opaque = opaque;
}
@ -43,15 +54,23 @@ public class CallbackChannel {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public long getOpaque() {
return opaque;
}
public void setOpaque(long opaque) {
this.opaque = opaque;
public Host getHost() {
return host;
}
public boolean isActive(){
return this.channel.isActive();
}
public ChannelFuture writeAndFlush(Command command){
return this.channel.writeAndFlush(command);
}
public void close(){
this.channel.close();
}
}

66
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -21,8 +21,12 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
@ -31,18 +35,31 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
/**
* remote channels
*/
private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
/**
* callback channels
* netty remoting client
*/
private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>();
private final NettyRemotingClient nettyRemotingClient;
public TaskCallbackService(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
/**
* add callback channel
* @param taskInstanceId taskInstanceId
* @param channel channel
*/
public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){
CALL_BACK_CHANNELS.put(taskInstanceId, channel);
public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel){
REMOTE_CHANNELS.put(taskInstanceId, channel);
}
/**
@ -50,15 +67,18 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId
* @return callback channel
*/
public CallbackChannel getCallbackChannel(int taskInstanceId){
CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId);
if(callbackChannel.getChannel().isActive()){
return callbackChannel;
public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
if(nettyRemoteChannel.isActive()){
return nettyRemoteChannel;
}
Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if(newChannel != null){
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
addRemoteChannel(taskInstanceId, remoteChannel);
return remoteChannel;
}
Channel newChannel = createChannel();
callbackChannel.setChannel(newChannel);
CALL_BACK_CHANNELS.put(taskInstanceId, callbackChannel);
return callbackChannel;
return null;
}
/**
@ -66,7 +86,7 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId
*/
public void remove(int taskInstanceId){
CALL_BACK_CHANNELS.remove(taskInstanceId);
REMOTE_CHANNELS.remove(taskInstanceId);
}
/**
@ -75,8 +95,12 @@ public class TaskCallbackService {
* @param ackCommand ackCommand
*/
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command());
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
//TODO
} else{
nettyRemoteChannel.writeAndFlush(ackCommand.convert2Command());
}
}
/**
@ -86,8 +110,11 @@ public class TaskCallbackService {
* @param responseCommand responseCommand
*/
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
//TODO
} else{
nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -98,10 +125,5 @@ public class TaskCallbackService {
}
});
}
// TODO
private Channel createChannel(){
return null;
}
}

71
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -17,32 +17,41 @@
package org.apache.dolphinscheduler.server.worker.processor;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutorService;
/**
* worker request processor
*/
public class WorkerRequestProcessor implements NettyRequestProcessor {
public class TaskExecuteProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(WorkerRequestProcessor.class);
private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
/**
* process service
@ -64,7 +73,7 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
*/
private final TaskCallbackService taskCallbackService;
public WorkerRequestProcessor(ProcessService processService){
public TaskExecuteProcessor(ProcessService processService){
this.processService = processService;
this.taskCallbackService = new TaskCallbackService();
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
@ -92,14 +101,62 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(),
new CallbackChannel(channel, command.getOpaque()));
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
this.doAck(taskExecutionContext);
// submit task
workerExecService.submit(new TaskScheduleThread(taskExecutionContext,
workerExecService.submit(new TaskExecuteThread(taskExecutionContext,
processService, taskCallbackService));
}
private void doAck(TaskExecutionContext taskExecutionContext){
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
}
/**
* get task log path
* @return log path
*/
private String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
/**
* build ack command
* @param taskExecutionContext taskExecutionContext
* @return ExecuteTaskAckCommand
*/
private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
ackCommand.setHost(OSUtils.getHost());
ackCommand.setStartTime(new Date());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
return ackCommand;
}
/**
* get execute local path

103
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* task kill processor
*/
public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
logger.info("received command : {}", command);
KillTaskRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
doKill(killCommand);
}
private void doKill(KillTaskRequestCommand killCommand){
try {
if(killCommand.getProcessId() == 0 ){
logger.error("process kill failed, process id :{}, task id:{}", killCommand.getProcessId(), killCommand.getTaskInstanceId());
return;
}
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(killCommand.getProcessId()));
logger.info("process id:{}, cmd:{}", killCommand.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
// find log and kill yarn job
killYarnJob(killCommand.getHost(), killCommand.getLogPath(), killCommand.getExecutePath(), killCommand.getTenantCode());
} catch (Exception e) {
logger.error("kill task failed", e);
}
}
public void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClientService logClient = null;
String log = null;
try {
logClient = new LogClientService();
log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
} finally {
if(logClient != null){
logClient.close();
}
}
if (StringUtils.isNotEmpty(log)) {
List<String> appIds = LoggerUtils.getAppIds(log, logger);
if (StringUtils.isEmpty(executePath)) {
logger.error("task instance work dir is empty");
throw new RuntimeException("task instance work dir is empty");
}
if (appIds.size() > 0) {
ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
}
}
} catch (Exception e) {
logger.error("kill yarn job failure",e);
}
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java

@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{
logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
// submit task
// workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
// workerExecService.submit(new TaskExecuteThread(taskInstance, processService));
// remove node from zk
removeNodeFromTaskQueue(taskQueueStr);

71
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -17,21 +17,15 @@
package org.apache.dolphinscheduler.server.worker.runner;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
@ -51,12 +45,12 @@ import java.util.stream.Collectors;
/**
* task scheduler thread
*/
public class TaskScheduleThread implements Runnable {
public class TaskExecuteThread implements Runnable {
/**
* logger
*/
private final Logger logger = LoggerFactory.getLogger(TaskScheduleThread.class);
private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
/**
* task instance
@ -74,32 +68,27 @@ public class TaskScheduleThread implements Runnable {
private AbstractTask task;
/**
* task instance callback service
* task callback service
*/
private TaskCallbackService taskInstanceCallbackService;
private TaskCallbackService taskCallbackService;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param processService processService
* @param taskInstanceCallbackService taskInstanceCallbackService
* @param taskCallbackService taskCallbackService
*/
public TaskScheduleThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
public TaskExecuteThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskCallbackService){
this.processService = processService;
this.taskExecutionContext = taskExecutionContext;
this.taskInstanceCallbackService = taskInstanceCallbackService;
this.taskCallbackService = taskCallbackService;
}
@Override
public void run() {
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
try {
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// task node
TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
@ -163,7 +152,7 @@ public class TaskScheduleThread implements Runnable {
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
} finally {
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
}
}
@ -183,48 +172,6 @@ public class TaskScheduleThread implements Runnable {
return globalParamsMap;
}
/**
* build ack command
* @param taskExecutionContext taskExecutionContext
* @return ExecuteTaskAckCommand
*/
private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath());
ackCommand.setHost(OSUtils.getHost());
ackCommand.setStartTime(new Date());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
return ackCommand;
}
/**
* get task log path
* @return log path
*/
private String getTaskLogPath() {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
/**
* set task timeout
* @param taskProps
@ -259,8 +206,6 @@ public class TaskScheduleThread implements Runnable {
}
/**
* kill task
*/
Loading…
Cancel
Save