
[FIX-3617][Service] fix two task instances being generated during fault tolerance (#3873)

* fix two task instances being generated during fault tolerance

* update code style

* update ut

Co-authored-by: baoliang <baoliang@analysys.com.cn>
bao liang committed 4 years ago via GitHub
commit 39411ce03b
4 changed files:

  1. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java (604 lines changed)
  2. dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java (167 lines changed)
  3. dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java (116 lines changed)
  4. pom.xml (1 line changed)
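
How the fix works: when a failed-over workflow is recovered with the RECOVER_TOLERANCE_FAULT_PROCESS command, the old code path always created a fresh command for a SUB_PROCESS task, even though the sub workflow had already been launched before the crash, so the sub process ended up with two task instances. The new createSubWorkProcess (see the ProcessService diff below) returns early when the parent-to-child mapping already exists. A minimal sketch of that guard, with simplified stand-ins for the real DolphinScheduler entities (the stub types and the hard-coded lookup are illustrative only):

// SubProcessGuardSketch.java -- simplified stand-ins, not the real DolphinScheduler API
enum CommandType { START_PROCESS, RECOVER_TOLERANCE_FAULT_PROCESS }

class ProcessInstanceMap { } // records parent instance id / parent task id / child instance id

public class SubProcessGuardSketch {

    // stands in for the DB-backed findWorkProcessMapByParent; here we pretend the
    // mapping was already written before the master crashed
    static ProcessInstanceMap findWorkProcessMapByParent(int parentInstanceId, int parentTaskId) {
        return new ProcessInstanceMap();
    }

    // the shape of the fix: during fault-tolerance recovery, an existing mapping means
    // the sub-process command was already issued once -- issuing it again is what
    // produced the second task instance
    static boolean shouldCreateSubProcessCommand(CommandType parentCommandType,
                                                 int parentInstanceId, int parentTaskId) {
        ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentInstanceId, parentTaskId);
        if (instanceMap != null && parentCommandType == CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {
            return false; // already created; recovery must not duplicate it
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(shouldCreateSubProcessCommand(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS, 1, 10)); // false
        System.out.println(shouldCreateSubProcessCommand(CommandType.START_PROCESS, 1, 10));                   // true
    }
}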

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java (604 lines changed)

@@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -45,309 +47,309 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
 * zookeeper master client
 * <p>
 * single instance
 */
@Component
public class ZKMasterClient extends AbstractZKClient {

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);

    /**
     * process service
     */
    @Autowired
    private ProcessService processService;

    public void start() {
        InterProcessMutex mutex = null;
        try {
            // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
            String znodeLock = getMasterStartUpLockPath();
            mutex = new InterProcessMutex(getZkClient(), znodeLock);
            mutex.acquire();

            // init system znode
            this.initSystemZNode();

            while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)) {
                ThreadUtils.sleep(SLEEP_TIME_MILLIS);
            }

            // self tolerant
            if (getActiveMasterNum() == 1) {
                failoverWorker(null, true);
                failoverMaster(null);
            }
        } catch (Exception e) {
            logger.error("master start up exception", e);
        } finally {
            releaseMutex(mutex);
        }
    }

    @Override
    public void close() {
        super.close();
    }

    /**
     * handle path events that this class cares about
     *
     * @param client zkClient
     * @param event path event
     * @param path zk path
     */
    @Override
    protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
        //monitor master
        if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH)) {
            handleMasterEvent(event, path);
        } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH)) {
            //monitor worker
            handleWorkerEvent(event, path);
        }
    }

    /**
     * remove zookeeper node path
     *
     * @param path zookeeper node path
     * @param zkNodeType zookeeper node type
     * @param failover is failover
     */
    private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
        logger.info("{} node deleted : {}", zkNodeType.toString(), path);
        InterProcessMutex mutex = null;
        try {
            String failoverPath = getFailoverLockPath(zkNodeType);
            // create a distributed lock
            mutex = new InterProcessMutex(getZkClient(), failoverPath);
            mutex.acquire();

            String serverHost = getHostByEventDataPath(path);
            // handle dead server
            handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
            //failover server
            if (failover) {
                failoverServerWhenDown(serverHost, zkNodeType);
            }
        } catch (Exception e) {
            logger.error("{} server failover failed.", zkNodeType.toString());
            logger.error("failover exception ", e);
        } finally {
            releaseMutex(mutex);
        }
    }

    /**
     * failover server when server down
     *
     * @param serverHost server host
     * @param zkNodeType zookeeper node type
     * @throws Exception exception
     */
    private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
-       if (StringUtils.isEmpty(serverHost) || serverHost.startsWith(NetUtils.getHost())) {
+       if (StringUtils.isEmpty(serverHost)) {
            return;
        }
        switch (zkNodeType) {
            case MASTER:
                failoverMaster(serverHost);
                break;
            case WORKER:
                failoverWorker(serverHost, true);
                break;
            default:
                break;
        }
    }

    /**
     * get failover lock path
     *
     * @param zkNodeType zookeeper node type
     * @return fail over lock path
     */
    private String getFailoverLockPath(ZKNodeType zkNodeType) {
        switch (zkNodeType) {
            case MASTER:
                return getMasterFailoverLockPath();
            case WORKER:
                return getWorkerFailoverLockPath();
            default:
                return "";
        }
    }

    /**
     * monitor master
     *
     * @param event event
     * @param path path
     */
    public void handleMasterEvent(TreeCacheEvent event, String path) {
        switch (event.getType()) {
            case NODE_ADDED:
                logger.info("master node added : {}", path);
                break;
            case NODE_REMOVED:
                removeZKNodePath(path, ZKNodeType.MASTER, true);
                break;
            default:
                break;
        }
    }

    /**
     * monitor worker
     *
     * @param event event
     * @param path path
     */
    public void handleWorkerEvent(TreeCacheEvent event, String path) {
        switch (event.getType()) {
            case NODE_ADDED:
                logger.info("worker node added : {}", path);
                break;
            case NODE_REMOVED:
                logger.info("worker node deleted : {}", path);
                removeZKNodePath(path, ZKNodeType.WORKER, true);
                break;
            default:
                break;
        }
    }

    /**
     * task needs failover if task start before worker starts
     *
     * @param taskInstance task instance
     * @return true if task instance need fail over
     */
    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {

        boolean taskNeedFailover = true;

        //now no host will execute this task instance,so no need to failover the task
        if (taskInstance.getHost() == null) {
            return false;
        }

        // if the worker node exists in zookeeper, we must check the task starts after the worker
        if (checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)) {
            //if task start after worker starts, there is no need to failover the task.
            if (checkTaskAfterWorkerStart(taskInstance)) {
                taskNeedFailover = false;
            }
        }
        return taskNeedFailover;
    }

    /**
     * check task start after the worker server starts.
     *
     * @param taskInstance task instance
     * @return true if task instance start time after worker server start date
     */
    private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
        if (StringUtils.isEmpty(taskInstance.getHost())) {
            return false;
        }
        Date workerServerStartDate = null;
        List<Server> workerServers = getServersList(ZKNodeType.WORKER);
        for (Server workerServer : workerServers) {
            if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) {
                workerServerStartDate = workerServer.getCreateTime();
                break;
            }
        }
        if (workerServerStartDate != null) {
            return taskInstance.getStartTime().after(workerServerStartDate);
        }
        return false;
    }

    /**
     * failover worker tasks
     * <p>
     * 1. kill yarn job if there are yarn jobs in tasks.
     * 2. change task state from running to need failover.
     * 3. failover all tasks when workerHost is null
     *
     * @param workerHost worker host
     * @param needCheckWorkerAlive need check worker alive
     * @throws Exception exception
     */
    private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
        logger.info("start worker[{}] failover ...", workerHost);

        List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
        for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
            if (needCheckWorkerAlive) {
                if (!checkTaskInstanceNeedFailover(taskInstance)) {
                    continue;
                }
            }

            ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
            if (processInstance != null) {
                taskInstance.setProcessInstance(processInstance);
            }

            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                    .buildTaskInstanceRelatedInfo(taskInstance)
                    .buildProcessInstanceRelatedInfo(processInstance)
                    .create();
            // only kill yarn job if exists , the local thread has exited
            ProcessUtils.killYarnJob(taskExecutionContext);

            taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
            processService.saveTaskInstance(taskInstance);
        }
        logger.info("end worker[{}] failover ...", workerHost);
    }

    /**
     * failover master tasks
     *
     * @param masterHost master host
     */
    private void failoverMaster(String masterHost) {
        logger.info("start master failover ...");

        List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);

        //updateProcessInstance host is null and insert into command
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            if (Constants.NULL.equals(processInstance.getHost())) {
                continue;
            }
            processService.processNeedFailoverProcessInstances(processInstance);
        }

        logger.info("master failover end");
    }

    public InterProcessMutex blockAcquireMutex() throws Exception {
        InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
        mutex.acquire();
        return mutex;
    }

}
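
The one behavioral change in this file is in failoverServerWhenDown: the old guard also skipped failover when serverHost started with the local host string, so a restarted master never failed over work that had run on its own host. Dropping the startsWith check also sidesteps a string-prefix pitfall (a plausible side effect of the old code, not something the commit message states): startsWith can match a different server whose address merely begins with the same characters. A small self-contained illustration:

// PrefixCheckPitfall.java -- why host comparison by startsWith is fragile
public class PrefixCheckPitfall {
    public static void main(String[] args) {
        String localHost = "192.168.1.1";

        // the local server itself: the match the old guard intended
        System.out.println("192.168.1.1:5678".startsWith(localHost));  // true

        // a different server whose address merely shares the prefix:
        // the old guard would have skipped its failover as well
        System.out.println("192.168.1.10:5678".startsWith(localHost)); // true
    }
}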

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java (167 lines changed)

@@ -896,7 +896,7 @@ public class ProcessService {
            return task;
        }
        if(!task.getState().typeIsFinished()){
-           createSubWorkProcessCommand(processInstance, task);
+           createSubWorkProcess(processInstance, task);
        }
        logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ",
@@ -906,20 +906,22 @@ public class ProcessService {
    /**
     * set work process instance map
+    * consider o
+    * repeat running does not generate new sub process instance
+    * set map {parent instance id, task instance id, 0(child instance id)}
     * @param parentInstance parentInstance
     * @param parentTask parentTask
     * @return process instance map
     */
    private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){
        ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
-       if(processMap != null){
+       if (processMap != null) {
            return processMap;
-       }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING
-               || parentInstance.isComplementData()){
+       }
+       if (parentInstance.getCommandType() == CommandType.REPEAT_RUNNING) {
            // update current task id to map
-           // repeat running does not generate new sub process instance
            processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
-           if(processMap!= null){
+           if (processMap != null) {
                processMap.setParentTaskInstanceId(parentTask.getId());
                updateWorkProcessInstanceMap(processMap);
                return processMap;
@@ -944,11 +946,11 @@ public class ProcessService {
        Integer preTaskId = 0;
        List<TaskInstance> preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
-       for(TaskInstance task : preTaskList){
-           if(task.getName().equals(parentTask.getName())){
+       for (TaskInstance task : preTaskList) {
+           if (task.getName().equals(parentTask.getName())) {
                preTaskId = task.getId();
                ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
-               if(map!=null){
+               if (map != null) {
                    return map;
                }
            }
@@ -960,66 +962,111 @@ public class ProcessService {
    /**
     * create sub work process command
+    *
     * @param parentProcessInstance parentProcessInstance
     * @param task task
     */
-   private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
-                                            TaskInstance task){
-       if(!task.isSubProcess()){
-           return;
-       }
-       ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task);
-       TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
-       Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
-       Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
-
-       ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());
-
-       CommandType fatherType = parentProcessInstance.getCommandType();
-       CommandType commandType = fatherType;
-       if(childInstance == null){
-           String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
-           // sub process must begin with schedule/complement data
-           // if father begin with scheduler/complement data
-           if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) ||
-                   fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){
-               commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
-           }
-       }
-
-       if(childInstance != null){
-           childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
-           updateProcessInstance(childInstance);
-       }
-       // set sub work process command
-       String processMapStr = JSONUtils.toJsonString(instanceMap);
-       Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
-
-       if(commandType == CommandType.COMPLEMENT_DATA ||
-               (childInstance != null && childInstance.isComplementData())){
-           Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
-           String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
-           String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
-           cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
-           cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
-           processMapStr = JSONUtils.toJsonString(cmdParam);
-       }
-
-       updateSubProcessDefinitionByParent(parentProcessInstance, childDefineId);
-
-       Command command = new Command();
-       command.setWarningType(parentProcessInstance.getWarningType());
-       command.setWarningGroupId(parentProcessInstance.getWarningGroupId());
-       command.setFailureStrategy(parentProcessInstance.getFailureStrategy());
-       command.setProcessDefinitionId(childDefineId);
-       command.setScheduleTime(parentProcessInstance.getScheduleTime());
-       command.setExecutorId(parentProcessInstance.getExecutorId());
-       command.setCommandParam(processMapStr);
-       command.setCommandType(commandType);
-       command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
-       command.setWorkerGroup(parentProcessInstance.getWorkerGroup());
-       createCommand(command);
-       logger.info("sub process command created: {} ", command.toString());
-   }
+   public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
+       if (!task.isSubProcess()) {
+           return;
+       }
+       //check create sub work flow firstly
+       ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
+       if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
+           // recover failover tolerance would not create a new command when the sub command already have been created
+           return;
+       }
+       instanceMap = setProcessInstanceMap(parentProcessInstance, task);
+       ProcessInstance childInstance = null;
+       if (instanceMap.getProcessInstanceId() != 0) {
+           childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
+       }
+       Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
+       updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId());
+       initSubInstanceState(childInstance);
+       createCommand(subProcessCommand);
+       logger.info("sub process command created: {} ", subProcessCommand);
+   }
+
+   /**
+    * complement data needs transform parent parameter to child.
+    * @param instanceMap
+    * @param parentProcessInstance
+    * @return
+    */
+   private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) {
+       // set sub work process command
+       String processMapStr = JSONUtils.toJsonString(instanceMap);
+       Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
+       if (parentProcessInstance.isComplementData()) {
+           Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
+           String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+           String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+           cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
+           cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
+           processMapStr = JSONUtils.toJsonString(cmdParam);
+       }
+       return processMapStr;
+   }
+
+   /**
+    * create sub work process command
+    * @param parentProcessInstance
+    * @param childInstance
+    * @param instanceMap
+    * @param task
+    */
+   public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
+                                          ProcessInstance childInstance,
+                                          ProcessInstanceMap instanceMap,
+                                          TaskInstance task) {
+       CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
+       TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
+       Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
+       Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
+
+       String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance);
+
+       return new Command(
+               commandType,
+               TaskDependType.TASK_POST,
+               parentProcessInstance.getFailureStrategy(),
+               parentProcessInstance.getExecutorId(),
+               childDefineId,
+               processParam,
+               parentProcessInstance.getWarningType(),
+               parentProcessInstance.getWarningGroupId(),
+               parentProcessInstance.getScheduleTime(),
+               parentProcessInstance.getProcessInstancePriority()
+       );
+   }
+
+   /**
+    * initialize sub work flow state
+    * child instance state would be initialized when 'recovery from pause/stop/failure'
+    * @param childInstance
+    */
+   private void initSubInstanceState(ProcessInstance childInstance) {
+       if (childInstance != null) {
+           childInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+           updateProcessInstance(childInstance);
+       }
+   }
+
+   /**
+    * get sub work flow command type
+    * child instance exist: child command = fatherCommand
+    * child instance not exists: child command = fatherCommand[0]
+    *
+    * @param parentProcessInstance
+    * @return
+    */
+   private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
+       CommandType commandType = parentProcessInstance.getCommandType();
+       if (childInstance == null) {
+           String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
+           commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
+       }
+       return commandType;
+   }
@@ -1497,20 +1544,6 @@ public class ProcessService {
        return result;
    }

-   /**
-    * update pid and app links field by task instance id
-    * @param taskInstId taskInstId
-    * @param pid pid
-    * @param appLinks appLinks
-    */
-   public void updatePidByTaskInstId(int taskInstId, int pid, String appLinks) {
-       TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
-       taskInstance.setPid(pid);
-       taskInstance.setAppLink(appLinks);
-       saveTaskInstance(taskInstance);
-   }
-
    /**
     * query schedule by id
     * @param id id
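
The new getSubCommandType helper pins down the rule its javadoc states and the test below exercises: when a child instance already exists, the sub process reuses the parent's current command type; when it does not, the type falls back to the first entry of the parent's comma-separated historyCmd, i.e. how the parent was originally launched. A minimal sketch with plain strings standing in for the CommandType enum:

// SubCommandTypeSketch.java -- the selection rule, with strings instead of CommandType
public class SubCommandTypeSketch {

    static String subCommandType(String parentCommandType, String historyCmd, boolean childExists) {
        if (childExists) {
            return parentCommandType;    // child already created: keep the parent's current command
        }
        return historyCmd.split(",")[0]; // fresh child: use the parent's original launch command
    }

    public static void main(String[] args) {
        // a SCHEDULER-launched parent re-run after failures still starts its sub process as SCHEDULER
        System.out.println(subCommandType("START_FAILURE_TASK_PROCESS",
                "SCHEDULER,START_FAILURE_TASK_PROCESS", false)); // SCHEDULER

        // an existing child follows the parent's current command type
        System.out.println(subCommandType("START_FAILURE_TASK_PROCESS",
                "SCHEDULER,START_FAILURE_TASK_PROCESS", true));  // START_FAILURE_TASK_PROCESS
    }
}

This matches the assertions in ProcessServiceTest.testCreateSubCommand.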

dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java (116 lines added, new file)

@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.process;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
/**
* process service test
*/
public class ProcessServiceTest {
@Test
public void testCreateSubCommand() {
ProcessService processService = new ProcessService();
ProcessInstance parentInstance = new ProcessInstance();
parentInstance.setProcessDefinitionId(1);
parentInstance.setWarningType(WarningType.SUCCESS);
parentInstance.setWarningGroupId(0);
TaskInstance task = new TaskInstance();
task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}");
task.setId(10);
ProcessInstance childInstance = null;
ProcessInstanceMap instanceMap = new ProcessInstanceMap();
instanceMap.setParentProcessInstanceId(1);
instanceMap.setParentTaskInstanceId(10);
Command command = null;
//father history: start; child null == command type: start
parentInstance.setHistoryCmd("START_PROCESS");
parentInstance.setCommandType(CommandType.START_PROCESS);
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
//father history: start,start failure; child null == command type: start
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
//father history: scheduler,start failure; child null == command type: scheduler
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
Assert.assertEquals(CommandType.SCHEDULER, command.getCommandType());
//father history: complement,start failure; child null == command type: complement
String startString = "2020-01-01 00:00:00";
String endString = "2020-01-10 00:00:00";
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
Map<String,String> complementMap = new HashMap<>();
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
Assert.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText());
Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText());
Assert.assertEquals(startString, DateUtils.dateToString(start));
Assert.assertEquals(endString, DateUtils.dateToString(end));
//father history: start,failure,start failure; child not null == command type: start failure
childInstance = new ProcessInstance();
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = processService.createSubProcessCommand(
parentInstance, childInstance, instanceMap, task
);
Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
}
}

pom.xml (1 line changed)

@@ -853,6 +853,7 @@
                        <include>**/server/worker/EnvFileTest.java</include>
                        <include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
                        <include>**/service/quartz/cron/CronUtilsTest.java</include>
+                       <include>**/service/process/ProcessServiceTest.java</include>
                        <include>**/service/zk/DefaultEnsembleProviderTest.java</include>
                        <include>**/service/zk/ZKServerTest.java</include>
                        <include>**/service/zk/CuratorZookeeperClientTest.java</include>
