diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 6abb381583..49bfb5f9a8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -45,309 +47,309 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
- * zookeeper master client
- *
- * single instance
+ * zookeeper master client
+ *
+ * single instance
*/
@Component
public class ZKMasterClient extends AbstractZKClient {
- /**
- * logger
- */
- private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
-
- /**
- * process service
- */
- @Autowired
- private ProcessService processService;
-
- public void start() {
-
- InterProcessMutex mutex = null;
- try {
- // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
- String znodeLock = getMasterStartUpLockPath();
- mutex = new InterProcessMutex(getZkClient(), znodeLock);
- mutex.acquire();
-
- // init system znode
- this.initSystemZNode();
-
- while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){
- ThreadUtils.sleep(SLEEP_TIME_MILLIS);
- }
-
-
- // self tolerant
- if (getActiveMasterNum() == 1) {
- failoverWorker(null, true);
- failoverMaster(null);
- }
-
- }catch (Exception e){
- logger.error("master start up exception",e);
- }finally {
- releaseMutex(mutex);
- }
- }
-
- @Override
- public void close(){
- super.close();
- }
-
- /**
- * handle path events that this class cares about
- * @param client zkClient
- * @param event path event
- * @param path zk path
- */
- @Override
- protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
- //monitor master
- if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){
- handleMasterEvent(event,path);
- }else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
- //monitor worker
- handleWorkerEvent(event,path);
- }
- }
-
- /**
- * remove zookeeper node path
- *
- * @param path zookeeper node path
- * @param zkNodeType zookeeper node type
- * @param failover is failover
- */
- private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
- logger.info("{} node deleted : {}", zkNodeType.toString(), path);
- InterProcessMutex mutex = null;
- try {
- String failoverPath = getFailoverLockPath(zkNodeType);
- // create a distributed lock
- mutex = new InterProcessMutex(getZkClient(), failoverPath);
- mutex.acquire();
-
- String serverHost = getHostByEventDataPath(path);
- // handle dead server
- handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
- //failover server
- if(failover){
- failoverServerWhenDown(serverHost, zkNodeType);
- }
- }catch (Exception e){
- logger.error("{} server failover failed.", zkNodeType.toString());
- logger.error("failover exception ",e);
- }
- finally {
- releaseMutex(mutex);
- }
- }
-
- /**
- * failover server when server down
- *
- * @param serverHost server host
- * @param zkNodeType zookeeper node type
- * @throws Exception exception
- */
- private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
- if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(NetUtils.getHost())){
- return ;
- }
- switch (zkNodeType){
- case MASTER:
- failoverMaster(serverHost);
- break;
- case WORKER:
- failoverWorker(serverHost, true);
- default:
- break;
- }
- }
-
- /**
- * get failover lock path
- *
- * @param zkNodeType zookeeper node type
- * @return fail over lock path
- */
- private String getFailoverLockPath(ZKNodeType zkNodeType){
-
- switch (zkNodeType){
- case MASTER:
- return getMasterFailoverLockPath();
- case WORKER:
- return getWorkerFailoverLockPath();
- default:
- return "";
- }
- }
-
- /**
- * monitor master
- * @param event event
- * @param path path
- */
- public void handleMasterEvent(TreeCacheEvent event, String path){
- switch (event.getType()) {
- case NODE_ADDED:
- logger.info("master node added : {}", path);
- break;
- case NODE_REMOVED:
- removeZKNodePath(path, ZKNodeType.MASTER, true);
- break;
- default:
- break;
- }
- }
-
- /**
- * monitor worker
- * @param event event
- * @param path path
- */
- public void handleWorkerEvent(TreeCacheEvent event, String path){
- switch (event.getType()) {
- case NODE_ADDED:
- logger.info("worker node added : {}", path);
- break;
- case NODE_REMOVED:
- logger.info("worker node deleted : {}", path);
- removeZKNodePath(path, ZKNodeType.WORKER, true);
- break;
- default:
- break;
- }
- }
-
- /**
- * task needs failover if task start before worker starts
- *
- * @param taskInstance task instance
- * @return true if task instance need fail over
- */
- private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
-
- boolean taskNeedFailover = true;
-
- //now no host will execute this task instance,so no need to failover the task
- if(taskInstance.getHost() == null){
- return false;
- }
-
- // if the worker node exists in zookeeper, we must check the task starts after the worker
- if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
- //if task start after worker starts, there is no need to failover the task.
- if(checkTaskAfterWorkerStart(taskInstance)){
- taskNeedFailover = false;
- }
- }
- return taskNeedFailover;
- }
-
- /**
- * check task start after the worker server starts.
- *
- * @param taskInstance task instance
- * @return true if task instance start time after worker server start date
- */
- private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
- if(StringUtils.isEmpty(taskInstance.getHost())){
- return false;
- }
- Date workerServerStartDate = null;
- List workerServers = getServersList(ZKNodeType.WORKER);
- for(Server workerServer : workerServers){
- if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){
- workerServerStartDate = workerServer.getCreateTime();
- break;
- }
- }
-
- if(workerServerStartDate != null){
- return taskInstance.getStartTime().after(workerServerStartDate);
- }else{
- return false;
- }
- }
-
- /**
- * failover worker tasks
- *
- * 1. kill yarn job if there are yarn jobs in tasks.
- * 2. change task state from running to need failover.
- * 3. failover all tasks when workerHost is null
- * @param workerHost worker host
- */
-
- /**
- * failover worker tasks
- *
- * 1. kill yarn job if there are yarn jobs in tasks.
- * 2. change task state from running to need failover.
- * 3. failover all tasks when workerHost is null
- * @param workerHost worker host
- * @param needCheckWorkerAlive need check worker alive
- * @throws Exception exception
- */
- private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
- logger.info("start worker[{}] failover ...", workerHost);
-
- List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
- for(TaskInstance taskInstance : needFailoverTaskInstanceList){
- if(needCheckWorkerAlive){
- if(!checkTaskInstanceNeedFailover(taskInstance)){
- continue;
- }
- }
-
- ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
- if(processInstance != null){
- taskInstance.setProcessInstance(processInstance);
- }
-
- TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .create();
- // only kill yarn job if exists , the local thread has exited
- ProcessUtils.killYarnJob(taskExecutionContext);
-
- taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
- processService.saveTaskInstance(taskInstance);
- }
- logger.info("end worker[{}] failover ...", workerHost);
- }
-
- /**
- * failover master tasks
- *
- * @param masterHost master host
- */
- private void failoverMaster(String masterHost) {
- logger.info("start master failover ...");
-
- List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
-
- //updateProcessInstance host is null and insert into command
- for(ProcessInstance processInstance : needFailoverProcessInstanceList){
- if(Constants.NULL.equals(processInstance.getHost()) ){
- continue;
- }
- processService.processNeedFailoverProcessInstances(processInstance);
- }
-
- logger.info("master failover end");
- }
-
- public InterProcessMutex blockAcquireMutex() throws Exception {
- InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
- mutex.acquire();
- return mutex;
- }
-
+ /**
+ * logger
+ */
+ private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
+
+ /**
+ * process service
+ */
+ @Autowired
+ private ProcessService processService;
+
+ public void start() {
+
+ InterProcessMutex mutex = null;
+ try {
+ // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
+ String znodeLock = getMasterStartUpLockPath();
+ mutex = new InterProcessMutex(getZkClient(), znodeLock);
+ mutex.acquire();
+
+ // init system znode
+ this.initSystemZNode();
+
+ while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)) {
+ ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+ }
+
+ // self tolerant
+ if (getActiveMasterNum() == 1) {
+ failoverWorker(null, true);
+ failoverMaster(null);
+ }
+
+ } catch (Exception e) {
+ logger.error("master start up exception", e);
+ } finally {
+ releaseMutex(mutex);
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ }
+
+ /**
+ * handle path events that this class cares about
+ *
+ * @param client zkClient
+ * @param event path event
+ * @param path zk path
+ */
+ @Override
+ protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
+ //monitor master
+ if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH)) {
+ handleMasterEvent(event, path);
+ } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH)) {
+ //monitor worker
+ handleWorkerEvent(event, path);
+ }
+ }
+
+ /**
+ * remove zookeeper node path
+ *
+ * @param path zookeeper node path
+ * @param zkNodeType zookeeper node type
+ * @param failover is failover
+ */
+ private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
+ logger.info("{} node deleted : {}", zkNodeType.toString(), path);
+ InterProcessMutex mutex = null;
+ try {
+ String failoverPath = getFailoverLockPath(zkNodeType);
+ // create a distributed lock
+ mutex = new InterProcessMutex(getZkClient(), failoverPath);
+ mutex.acquire();
+
+ String serverHost = getHostByEventDataPath(path);
+ // handle dead server
+ handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
+ //failover server
+ if (failover) {
+ failoverServerWhenDown(serverHost, zkNodeType);
+ }
+ } catch (Exception e) {
+ logger.error("{} server failover failed.", zkNodeType.toString());
+ logger.error("failover exception ", e);
+ } finally {
+ releaseMutex(mutex);
+ }
+ }
+
+ /**
+ * failover server when server down
+ *
+ * @param serverHost server host
+ * @param zkNodeType zookeeper node type
+ * @throws Exception exception
+ */
+ private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
+ if (StringUtils.isEmpty(serverHost)) {
+ return;
+ }
+ switch (zkNodeType) {
+ case MASTER:
+ failoverMaster(serverHost);
+ break;
+ case WORKER:
+ failoverWorker(serverHost, true);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * get failover lock path
+ *
+ * @param zkNodeType zookeeper node type
+ * @return fail over lock path
+ */
+ private String getFailoverLockPath(ZKNodeType zkNodeType) {
+
+ switch (zkNodeType) {
+ case MASTER:
+ return getMasterFailoverLockPath();
+ case WORKER:
+ return getWorkerFailoverLockPath();
+ default:
+ return "";
+ }
+ }
+
+ /**
+ * monitor master
+ *
+ * @param event event
+ * @param path path
+ */
+ public void handleMasterEvent(TreeCacheEvent event, String path) {
+ switch (event.getType()) {
+ case NODE_ADDED:
+ logger.info("master node added : {}", path);
+ break;
+ case NODE_REMOVED:
+ removeZKNodePath(path, ZKNodeType.MASTER, true);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * monitor worker
+ *
+ * @param event event
+ * @param path path
+ */
+ public void handleWorkerEvent(TreeCacheEvent event, String path) {
+ switch (event.getType()) {
+ case NODE_ADDED:
+ logger.info("worker node added : {}", path);
+ break;
+ case NODE_REMOVED:
+ logger.info("worker node deleted : {}", path);
+ removeZKNodePath(path, ZKNodeType.WORKER, true);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * task needs failover if task start before worker starts
+ *
+ * @param taskInstance task instance
+ * @return true if task instance need fail over
+ */
+ private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
+
+ boolean taskNeedFailover = true;
+
+ //now no host will execute this task instance,so no need to failover the task
+ if (taskInstance.getHost() == null) {
+ return false;
+ }
+
+ // if the worker node exists in zookeeper, we must check the task starts after the worker
+ if (checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)) {
+ //if task start after worker starts, there is no need to failover the task.
+ if (checkTaskAfterWorkerStart(taskInstance)) {
+ taskNeedFailover = false;
+ }
+ }
+ return taskNeedFailover;
+ }
+
+ /**
+ * check task start after the worker server starts.
+ *
+ * @param taskInstance task instance
+ * @return true if task instance start time after worker server start date
+ */
+ private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
+ if (StringUtils.isEmpty(taskInstance.getHost())) {
+ return false;
+ }
+ Date workerServerStartDate = null;
+ List workerServers = getServersList(ZKNodeType.WORKER);
+ for (Server workerServer : workerServers) {
+ if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) {
+ workerServerStartDate = workerServer.getCreateTime();
+ break;
+ }
+ }
+ if (workerServerStartDate != null) {
+ return taskInstance.getStartTime().after(workerServerStartDate);
+ }
+ return false;
+ }
+
+ /**
+ * failover worker tasks
+ *
+ * 1. kill yarn job if there are yarn jobs in tasks.
+ * 2. change task state from running to need failover.
+ * 3. failover all tasks when workerHost is null
+ * @param workerHost worker host
+ */
+
+ /**
+ * failover worker tasks
+ *
+ * 1. kill yarn job if there are yarn jobs in tasks.
+ * 2. change task state from running to need failover.
+ * 3. failover all tasks when workerHost is null
+ *
+ * @param workerHost worker host
+ * @param needCheckWorkerAlive need check worker alive
+ * @throws Exception exception
+ */
+ private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
+ logger.info("start worker[{}] failover ...", workerHost);
+
+ List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
+ for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
+ if (needCheckWorkerAlive) {
+ if (!checkTaskInstanceNeedFailover(taskInstance)) {
+ continue;
+ }
+ }
+
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+ if (processInstance != null) {
+ taskInstance.setProcessInstance(processInstance);
+ }
+
+ TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .create();
+ // only kill yarn job if exists , the local thread has exited
+ ProcessUtils.killYarnJob(taskExecutionContext);
+
+ taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+ processService.saveTaskInstance(taskInstance);
+ }
+ logger.info("end worker[{}] failover ...", workerHost);
+ }
+
+ /**
+ * failover master tasks
+ *
+ * @param masterHost master host
+ */
+ private void failoverMaster(String masterHost) {
+ logger.info("start master failover ...");
+
+ List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
+
+ //updateProcessInstance host is null and insert into command
+ for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
+ if (Constants.NULL.equals(processInstance.getHost())) {
+ continue;
+ }
+ processService.processNeedFailoverProcessInstances(processInstance);
+ }
+
+ logger.info("master failover end");
+ }
+
+ public InterProcessMutex blockAcquireMutex() throws Exception {
+ InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
+ mutex.acquire();
+ return mutex;
+ }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 7344cf13e5..7fca37470d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -896,7 +896,7 @@ public class ProcessService {
return task;
}
if(!task.getState().typeIsFinished()){
- createSubWorkProcessCommand(processInstance, task);
+ createSubWorkProcess(processInstance, task);
}
logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ",
@@ -906,20 +906,22 @@ public class ProcessService {
/**
* set work process instance map
+ * consider o
+ * repeat running does not generate new sub process instance
+ * set map {parent instance id, task instance id, 0(child instance id)}
* @param parentInstance parentInstance
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){
ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
- if(processMap != null){
+ if (processMap != null) {
return processMap;
- }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING
- || parentInstance.isComplementData()){
+ }
+ if (parentInstance.getCommandType() == CommandType.REPEAT_RUNNING) {
// update current task id to map
- // repeat running does not generate new sub process instance
processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
- if(processMap!= null){
+ if (processMap != null) {
processMap.setParentTaskInstanceId(parentTask.getId());
updateWorkProcessInstanceMap(processMap);
return processMap;
@@ -944,11 +946,11 @@ public class ProcessService {
Integer preTaskId = 0;
List preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId());
- for(TaskInstance task : preTaskList){
- if(task.getName().equals(parentTask.getName())){
+ for (TaskInstance task : preTaskList) {
+ if (task.getName().equals(parentTask.getName())) {
preTaskId = task.getId();
ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
- if(map!=null){
+ if (map != null) {
return map;
}
}
@@ -960,66 +962,111 @@ public class ProcessService {
/**
* create sub work process command
+ *
* @param parentProcessInstance parentProcessInstance
- * @param task task
+ * @param task task
*/
- private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
- TaskInstance task){
- if(!task.isSubProcess()){
+ public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
+ if (!task.isSubProcess()) {
return;
}
- ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task);
- TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
- Map subProcessParam = JSONUtils.toMap(taskNode.getParams());
- Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
-
- ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());
-
- CommandType fatherType = parentProcessInstance.getCommandType();
- CommandType commandType = fatherType;
- if(childInstance == null){
- String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
- // sub process must begin with schedule/complement data
- // if father begin with scheduler/complement data
- if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) ||
- fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){
- commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
- }
+ //check create sub work flow firstly
+ ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
+ if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
+ // recover failover tolerance would not create a new command when the sub command already have been created
+ return;
}
-
- if(childInstance != null){
- childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
- updateProcessInstance(childInstance);
+ instanceMap = setProcessInstanceMap(parentProcessInstance, task);
+ ProcessInstance childInstance = null;
+ if (instanceMap.getProcessInstanceId() != 0) {
+ childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
}
+ Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
+ updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId());
+ initSubInstanceState(childInstance);
+ createCommand(subProcessCommand);
+ logger.info("sub process command created: {} ", subProcessCommand);
+ }
+
+ /**
+ * complement data needs transform parent parameter to child.
+ * @param instanceMap
+ * @param parentProcessInstance
+ * @return
+ */
+ private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) {
// set sub work process command
String processMapStr = JSONUtils.toJsonString(instanceMap);
Map cmdParam = JSONUtils.toMap(processMapStr);
-
- if(commandType == CommandType.COMPLEMENT_DATA ||
- (childInstance != null && childInstance.isComplementData())){
+ if (parentProcessInstance.isComplementData()) {
Map parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
- String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
- String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
+ String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
+ String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
processMapStr = JSONUtils.toJsonString(cmdParam);
}
+ return processMapStr;
+ }
- updateSubProcessDefinitionByParent(parentProcessInstance, childDefineId);
+ /**
+ * create sub work process command
+ * @param parentProcessInstance
+ * @param childInstance
+ * @param instanceMap
+ * @param task
+ */
+ public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
+ ProcessInstance childInstance,
+ ProcessInstanceMap instanceMap,
+ TaskInstance task) {
+ CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
+ TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
+ Map subProcessParam = JSONUtils.toMap(taskNode.getParams());
+ Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
+ String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance);
+
+ return new Command(
+ commandType,
+ TaskDependType.TASK_POST,
+ parentProcessInstance.getFailureStrategy(),
+ parentProcessInstance.getExecutorId(),
+ childDefineId,
+ processParam,
+ parentProcessInstance.getWarningType(),
+ parentProcessInstance.getWarningGroupId(),
+ parentProcessInstance.getScheduleTime(),
+ parentProcessInstance.getProcessInstancePriority()
+ );
+ }
+
+ /**
+ * initialize sub work flow state
+ * child instance state would be initialized when 'recovery from pause/stop/failure'
+ * @param childInstance
+ */
+ private void initSubInstanceState(ProcessInstance childInstance) {
+ if (childInstance != null) {
+ childInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+ updateProcessInstance(childInstance);
+ }
+ }
- Command command = new Command();
- command.setWarningType(parentProcessInstance.getWarningType());
- command.setWarningGroupId(parentProcessInstance.getWarningGroupId());
- command.setFailureStrategy(parentProcessInstance.getFailureStrategy());
- command.setProcessDefinitionId(childDefineId);
- command.setScheduleTime(parentProcessInstance.getScheduleTime());
- command.setExecutorId(parentProcessInstance.getExecutorId());
- command.setCommandParam(processMapStr);
- command.setCommandType(commandType);
- command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
- command.setWorkerGroup(parentProcessInstance.getWorkerGroup());
- createCommand(command);
- logger.info("sub process command created: {} ", command.toString());
+ /**
+ * get sub work flow command type
+ * child instance exist: child command = fatherCommand
+ * child instance not exists: child command = fatherCommand[0]
+ *
+ * @param parentProcessInstance
+ * @return
+ */
+ private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
+ CommandType commandType = parentProcessInstance.getCommandType();
+ if (childInstance == null) {
+ String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
+ commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
+ }
+ return commandType;
}
/**
@@ -1497,20 +1544,6 @@ public class ProcessService {
return result;
}
- /**
- * update pid and app links field by task instance id
- * @param taskInstId taskInstId
- * @param pid pid
- * @param appLinks appLinks
- */
- public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) {
-
- TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
- taskInstance.setPid(pid);
- taskInstance.setAppLink(appLinks);
- saveTaskInstance(taskInstance);
- }
-
/**
* query schedule by id
* @param id id
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
new file mode 100644
index 0000000000..74b52bb316
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.process;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * process service test
+ */
+public class ProcessServiceTest {
+
+ @Test
+ public void testCreateSubCommand() {
+ ProcessService processService = new ProcessService();
+ ProcessInstance parentInstance = new ProcessInstance();
+ parentInstance.setProcessDefinitionId(1);
+ parentInstance.setWarningType(WarningType.SUCCESS);
+ parentInstance.setWarningGroupId(0);
+
+ TaskInstance task = new TaskInstance();
+ task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}");
+ task.setId(10);
+
+ ProcessInstance childInstance = null;
+ ProcessInstanceMap instanceMap = new ProcessInstanceMap();
+ instanceMap.setParentProcessInstanceId(1);
+ instanceMap.setParentTaskInstanceId(10);
+ Command command = null;
+
+ //father history: start; child null == command type: start
+ parentInstance.setHistoryCmd("START_PROCESS");
+ parentInstance.setCommandType(CommandType.START_PROCESS);
+ command = processService.createSubProcessCommand(
+ parentInstance, childInstance, instanceMap, task
+ );
+ Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
+
+ //father history: start,start failure; child null == command type: start
+ parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
+ command = processService.createSubProcessCommand(
+ parentInstance, childInstance, instanceMap, task
+ );
+ Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType());
+
+ //father history: scheduler,start failure; child null == command type: scheduler
+ parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
+ command = processService.createSubProcessCommand(
+ parentInstance, childInstance, instanceMap, task
+ );
+ Assert.assertEquals(CommandType.SCHEDULER, command.getCommandType());
+
+ //father history: complement,start failure; child null == command type: complement
+
+ String startString = "2020-01-01 00:00:00";
+ String endString = "2020-01-10 00:00:00";
+ parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
+ Map complementMap = new HashMap<>();
+ complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
+ complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
+ parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
+ command = processService.createSubProcessCommand(
+ parentInstance, childInstance, instanceMap, task
+ );
+ Assert.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
+
+ JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
+ Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText());
+ Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText());
+ Assert.assertEquals(startString, DateUtils.dateToString(start));
+ Assert.assertEquals(endString, DateUtils.dateToString(end));
+
+ //father history: start,failure,start failure; child not null == command type: start failure
+ childInstance = new ProcessInstance();
+ parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
+ parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
+ command = processService.createSubProcessCommand(
+ parentInstance, childInstance, instanceMap, task
+ );
+ Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
+ }
+}
diff --git a/pom.xml b/pom.xml
index e895b01d89..9e4934e833 100644
--- a/pom.xml
+++ b/pom.xml
@@ -853,6 +853,7 @@
**/server/worker/EnvFileTest.java
**/server/worker/runner/TaskExecuteThreadTest.java
**/service/quartz/cron/CronUtilsTest.java
+ **/service/process/ProcessServiceTest.java
**/service/zk/DefaultEnsembleProviderTest.java
**/service/zk/ZKServerTest.java
**/service/zk/CuratorZookeeperClientTest.java