Browse Source

Refactor worker (#2279)

* let quartz use the same datasource

* move master/worker config from dao.properties to each config
add master/worker registry test

* move mybatis config from application.properties to SpringConnectionFactory

* move mybatis-plus config from application.properties to SpringConnectionFactory

* refactor TaskCallbackService

* add ZookeeperNodeManagerTest

* add NettyExecutorManagerTest

* refactor TaskKillProcessor
pull/2/head
Tboy 4 years ago committed by GitHub
parent
commit
92fd6479a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
  2. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  3. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  4. 116
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class TaskKillRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } execution context } execution context execution context this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class TaskKillRequestCommand implements Serializable { /** * task execution context private String taskExecutionContext; execution context execution context public String getTaskExecutionContext() { execution context return taskExecutionContext; execution context } execution context public void setTaskExecutionContext(String taskExecutionContext) { execution context this.taskExecutionContext = taskExecutionContext; execution context return taskExecutionContext; */ */ execution context ; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskInstanceId=" + taskInstanceId + '}'; } }

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
@ -421,12 +420,6 @@ public class TaskExecutionContext implements Serializable{
return requestCommand.convert2Command();
}
public Command toKillCommand(){
TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
return requestCommand.convert2Command();
}
@Override
public String toString() {
return "TaskExecutionContext{" +

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
@ -25,9 +26,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -172,8 +172,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
* TODO Kill TASK
*
* task instance add queue , waiting worker to kill
*/
private void cancelTaskInstance() throws Exception{
@ -182,14 +180,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
}
alreadyKilled = true;
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setProcessId(taskInstance.getPid());
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);

116
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
/**
@ -66,11 +67,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
/**
* appIds
*/
private List<String> appIds;
public TaskKillProcessor(){
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
@ -79,86 +75,80 @@ public class TaskKillProcessor implements NettyRequestProcessor {
}
/**
* kill task logic
* task kill process
*
* @param context context
* @return execute result
* @param channel channel channel
* @param command command command
*/
private Boolean doKill(TaskExecutionContext context){
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
logger.info("received kill command : {}", killCommand);
Pair<Boolean, List<String>> result = doKill(killCommand);
taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
}
/**
* do kill
* @param killCommand
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand){
List<String> appIds = Collections.EMPTY_LIST;
try {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(context.getTaskInstanceId());
context.setProcessId(taskExecutionContext.getProcessId());
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
Integer processId = taskExecutionContext.getProcessId();
if (processId == null || processId.equals(0)){
logger.error("process kill failed, process id :{}, task id:{}", processId, context.getTaskInstanceId());
return false;
logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
return Pair.of(false, appIds);
}
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(context.getProcessId()));
logger.info("process id:{}, cmd:{}", context.getProcessId(), cmd);
logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
// find log and kill yarn job
killYarnJob(Host.of(context.getHost()).getIp(),
context.getLogPath(),
context.getExecutePath(),
context.getTenantCode());
appIds = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(),
taskExecutionContext.getLogPath(),
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTenantCode());
return true;
return Pair.of(true, appIds);
} catch (Exception e) {
logger.error("kill task error", e);
return false;
}
}
/**
* task kill process
*
* @param channel channel channel
* @param command command command
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand taskKillRequestCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
logger.info("received command : {}", taskKillRequestCommand);
String contextJson = taskKillRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
Boolean killStatus = doKill(taskExecutionContext);
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
return Pair.of(false, appIds);
}
/**
* build TaskKillResponseCommand
*
* @param taskExecutionContext taskExecutionContext
* @param killStatus killStatus
* @param killCommand kill command
* @param result exe result
* @return build TaskKillResponseCommand
*/
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
Boolean killStatus) {
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
Pair<Boolean, List<String>> result) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
taskKillResponseCommand.setAppIds(appIds);
taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
taskKillResponseCommand.setAppIds(result.getRight());
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
if(taskExecutionContext != null){
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
}
return taskKillResponseCommand;
}
@ -169,8 +159,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param logPath logPath
* @param executePath executePath
* @param tenantCode tenantCode
* @return List<String> appIds
*/
private void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
private List<String> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
LogClientService logClient = null;
try {
logClient = new LogClientService();
@ -185,9 +176,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
}
if (appIds.size() > 0) {
ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
return appIds;
}
}
} catch (Exception e) {
logger.error("kill yarn job error",e);
} finally {
@ -195,6 +186,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
logClient.close();
}
}
return Collections.EMPTY_LIST;
}
}

Loading…
Cancel
Save