From 39411ce03b864bc770da220ad6f81df47bd2487b Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Sat, 10 Oct 2020 15:05:56 +0800 Subject: [PATCH] [FIX-3617][Service]fix 2 tasks instance are generated when fault tolerance (#3873) * fix 2 tasks instance are generated when fault tolerance * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update code style * update ut * update code style Co-authored-by: baoliang --- .../server/zk/ZKMasterClient.java | 604 +++++++++--------- .../service/process/ProcessService.java | 167 +++-- .../service/process/ProcessServiceTest.java | 116 ++++ pom.xml | 1 + 4 files changed, 520 insertions(+), 368 deletions(-) create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 6abb381583..49bfb5f9a8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.locks.InterProcessMutex; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ZKNodeType; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -45,309 +47,309 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; /** - * zookeeper master client - * - * single instance + * zookeeper master client + *

+ * single instance */ @Component public class ZKMasterClient extends AbstractZKClient { - /** - * logger - */ - private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class); - - /** - * process service - */ - @Autowired - private ProcessService processService; - - public void start() { - - InterProcessMutex mutex = null; - try { - // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master - String znodeLock = getMasterStartUpLockPath(); - mutex = new InterProcessMutex(getZkClient(), znodeLock); - mutex.acquire(); - - // init system znode - this.initSystemZNode(); - - while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){ - ThreadUtils.sleep(SLEEP_TIME_MILLIS); - } - - - // self tolerant - if (getActiveMasterNum() == 1) { - failoverWorker(null, true); - failoverMaster(null); - } - - }catch (Exception e){ - logger.error("master start up exception",e); - }finally { - releaseMutex(mutex); - } - } - - @Override - public void close(){ - super.close(); - } - - /** - * handle path events that this class cares about - * @param client zkClient - * @param event path event - * @param path zk path - */ - @Override - protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - //monitor master - if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){ - handleMasterEvent(event,path); - }else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){ - //monitor worker - handleWorkerEvent(event,path); - } - } - - /** - * remove zookeeper node path - * - * @param path zookeeper node path - * @param zkNodeType zookeeper node type - * @param failover is failover - */ - private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { - logger.info("{} node deleted : {}", zkNodeType.toString(), path); - InterProcessMutex mutex = null; - try { - String failoverPath = getFailoverLockPath(zkNodeType); - // create a distributed lock - mutex = new InterProcessMutex(getZkClient(), failoverPath); - mutex.acquire(); - - String serverHost = getHostByEventDataPath(path); - // handle dead server - handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); - //failover server - if(failover){ - failoverServerWhenDown(serverHost, zkNodeType); - } - }catch (Exception e){ - logger.error("{} server failover failed.", zkNodeType.toString()); - logger.error("failover exception ",e); - } - finally { - releaseMutex(mutex); - } - } - - /** - * failover server when server down - * - * @param serverHost server host - * @param zkNodeType zookeeper node type - * @throws Exception exception - */ - private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { - if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(NetUtils.getHost())){ - return ; - } - switch (zkNodeType){ - case MASTER: - failoverMaster(serverHost); - break; - case WORKER: - failoverWorker(serverHost, true); - default: - break; - } - } - - /** - * get failover lock path - * - * @param zkNodeType zookeeper node type - * @return fail over lock path - */ - private String getFailoverLockPath(ZKNodeType zkNodeType){ - - switch (zkNodeType){ - case MASTER: - return getMasterFailoverLockPath(); - case WORKER: - return getWorkerFailoverLockPath(); - default: - return ""; - } - } - - /** - * monitor master - * @param event event - * @param path path - */ - public void handleMasterEvent(TreeCacheEvent event, String path){ - switch (event.getType()) { - case NODE_ADDED: - logger.info("master node added : {}", path); - break; - case NODE_REMOVED: - removeZKNodePath(path, ZKNodeType.MASTER, true); - break; - default: - break; - } - } - - /** - * monitor worker - * @param event event - * @param path path - */ - public void handleWorkerEvent(TreeCacheEvent event, String path){ - switch (event.getType()) { - case NODE_ADDED: - logger.info("worker node added : {}", path); - break; - case NODE_REMOVED: - logger.info("worker node deleted : {}", path); - removeZKNodePath(path, ZKNodeType.WORKER, true); - break; - default: - break; - } - } - - /** - * task needs failover if task start before worker starts - * - * @param taskInstance task instance - * @return true if task instance need fail over - */ - private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception { - - boolean taskNeedFailover = true; - - //now no host will execute this task instance,so no need to failover the task - if(taskInstance.getHost() == null){ - return false; - } - - // if the worker node exists in zookeeper, we must check the task starts after the worker - if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){ - //if task start after worker starts, there is no need to failover the task. - if(checkTaskAfterWorkerStart(taskInstance)){ - taskNeedFailover = false; - } - } - return taskNeedFailover; - } - - /** - * check task start after the worker server starts. - * - * @param taskInstance task instance - * @return true if task instance start time after worker server start date - */ - private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { - if(StringUtils.isEmpty(taskInstance.getHost())){ - return false; - } - Date workerServerStartDate = null; - List workerServers = getServersList(ZKNodeType.WORKER); - for(Server workerServer : workerServers){ - if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){ - workerServerStartDate = workerServer.getCreateTime(); - break; - } - } - - if(workerServerStartDate != null){ - return taskInstance.getStartTime().after(workerServerStartDate); - }else{ - return false; - } - } - - /** - * failover worker tasks - * - * 1. kill yarn job if there are yarn jobs in tasks. - * 2. change task state from running to need failover. - * 3. failover all tasks when workerHost is null - * @param workerHost worker host - */ - - /** - * failover worker tasks - * - * 1. kill yarn job if there are yarn jobs in tasks. - * 2. change task state from running to need failover. - * 3. failover all tasks when workerHost is null - * @param workerHost worker host - * @param needCheckWorkerAlive need check worker alive - * @throws Exception exception - */ - private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { - logger.info("start worker[{}] failover ...", workerHost); - - List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); - for(TaskInstance taskInstance : needFailoverTaskInstanceList){ - if(needCheckWorkerAlive){ - if(!checkTaskInstanceNeedFailover(taskInstance)){ - continue; - } - } - - ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - if(processInstance != null){ - taskInstance.setProcessInstance(processInstance); - } - - TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskExecutionContext); - - taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); - processService.saveTaskInstance(taskInstance); - } - logger.info("end worker[{}] failover ...", workerHost); - } - - /** - * failover master tasks - * - * @param masterHost master host - */ - private void failoverMaster(String masterHost) { - logger.info("start master failover ..."); - - List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); - - //updateProcessInstance host is null and insert into command - for(ProcessInstance processInstance : needFailoverProcessInstanceList){ - if(Constants.NULL.equals(processInstance.getHost()) ){ - continue; - } - processService.processNeedFailoverProcessInstances(processInstance); - } - - logger.info("master failover end"); - } - - public InterProcessMutex blockAcquireMutex() throws Exception { - InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath()); - mutex.acquire(); - return mutex; - } - + /** + * logger + */ + private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class); + + /** + * process service + */ + @Autowired + private ProcessService processService; + + public void start() { + + InterProcessMutex mutex = null; + try { + // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master + String znodeLock = getMasterStartUpLockPath(); + mutex = new InterProcessMutex(getZkClient(), znodeLock); + mutex.acquire(); + + // init system znode + this.initSystemZNode(); + + while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)) { + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + } + + // self tolerant + if (getActiveMasterNum() == 1) { + failoverWorker(null, true); + failoverMaster(null); + } + + } catch (Exception e) { + logger.error("master start up exception", e); + } finally { + releaseMutex(mutex); + } + } + + @Override + public void close() { + super.close(); + } + + /** + * handle path events that this class cares about + * + * @param client zkClient + * @param event path event + * @param path zk path + */ + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + //monitor master + if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH)) { + handleMasterEvent(event, path); + } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH)) { + //monitor worker + handleWorkerEvent(event, path); + } + } + + /** + * remove zookeeper node path + * + * @param path zookeeper node path + * @param zkNodeType zookeeper node type + * @param failover is failover + */ + private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { + logger.info("{} node deleted : {}", zkNodeType.toString(), path); + InterProcessMutex mutex = null; + try { + String failoverPath = getFailoverLockPath(zkNodeType); + // create a distributed lock + mutex = new InterProcessMutex(getZkClient(), failoverPath); + mutex.acquire(); + + String serverHost = getHostByEventDataPath(path); + // handle dead server + handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); + //failover server + if (failover) { + failoverServerWhenDown(serverHost, zkNodeType); + } + } catch (Exception e) { + logger.error("{} server failover failed.", zkNodeType.toString()); + logger.error("failover exception ", e); + } finally { + releaseMutex(mutex); + } + } + + /** + * failover server when server down + * + * @param serverHost server host + * @param zkNodeType zookeeper node type + * @throws Exception exception + */ + private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { + if (StringUtils.isEmpty(serverHost)) { + return; + } + switch (zkNodeType) { + case MASTER: + failoverMaster(serverHost); + break; + case WORKER: + failoverWorker(serverHost, true); + break; + default: + break; + } + } + + /** + * get failover lock path + * + * @param zkNodeType zookeeper node type + * @return fail over lock path + */ + private String getFailoverLockPath(ZKNodeType zkNodeType) { + + switch (zkNodeType) { + case MASTER: + return getMasterFailoverLockPath(); + case WORKER: + return getWorkerFailoverLockPath(); + default: + return ""; + } + } + + /** + * monitor master + * + * @param event event + * @param path path + */ + public void handleMasterEvent(TreeCacheEvent event, String path) { + switch (event.getType()) { + case NODE_ADDED: + logger.info("master node added : {}", path); + break; + case NODE_REMOVED: + removeZKNodePath(path, ZKNodeType.MASTER, true); + break; + default: + break; + } + } + + /** + * monitor worker + * + * @param event event + * @param path path + */ + public void handleWorkerEvent(TreeCacheEvent event, String path) { + switch (event.getType()) { + case NODE_ADDED: + logger.info("worker node added : {}", path); + break; + case NODE_REMOVED: + logger.info("worker node deleted : {}", path); + removeZKNodePath(path, ZKNodeType.WORKER, true); + break; + default: + break; + } + } + + /** + * task needs failover if task start before worker starts + * + * @param taskInstance task instance + * @return true if task instance need fail over + */ + private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception { + + boolean taskNeedFailover = true; + + //now no host will execute this task instance,so no need to failover the task + if (taskInstance.getHost() == null) { + return false; + } + + // if the worker node exists in zookeeper, we must check the task starts after the worker + if (checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)) { + //if task start after worker starts, there is no need to failover the task. + if (checkTaskAfterWorkerStart(taskInstance)) { + taskNeedFailover = false; + } + } + return taskNeedFailover; + } + + /** + * check task start after the worker server starts. + * + * @param taskInstance task instance + * @return true if task instance start time after worker server start date + */ + private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { + if (StringUtils.isEmpty(taskInstance.getHost())) { + return false; + } + Date workerServerStartDate = null; + List workerServers = getServersList(ZKNodeType.WORKER); + for (Server workerServer : workerServers) { + if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) { + workerServerStartDate = workerServer.getCreateTime(); + break; + } + } + if (workerServerStartDate != null) { + return taskInstance.getStartTime().after(workerServerStartDate); + } + return false; + } + + /** + * failover worker tasks + * + * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. failover all tasks when workerHost is null + * @param workerHost worker host + */ + + /** + * failover worker tasks + *

+ * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. failover all tasks when workerHost is null + * + * @param workerHost worker host + * @param needCheckWorkerAlive need check worker alive + * @throws Exception exception + */ + private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { + logger.info("start worker[{}] failover ...", workerHost); + + List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); + for (TaskInstance taskInstance : needFailoverTaskInstanceList) { + if (needCheckWorkerAlive) { + if (!checkTaskInstanceNeedFailover(taskInstance)) { + continue; + } + } + + ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + if (processInstance != null) { + taskInstance.setProcessInstance(processInstance); + } + + TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); + // only kill yarn job if exists , the local thread has exited + ProcessUtils.killYarnJob(taskExecutionContext); + + taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); + processService.saveTaskInstance(taskInstance); + } + logger.info("end worker[{}] failover ...", workerHost); + } + + /** + * failover master tasks + * + * @param masterHost master host + */ + private void failoverMaster(String masterHost) { + logger.info("start master failover ..."); + + List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); + + //updateProcessInstance host is null and insert into command + for (ProcessInstance processInstance : needFailoverProcessInstanceList) { + if (Constants.NULL.equals(processInstance.getHost())) { + continue; + } + processService.processNeedFailoverProcessInstances(processInstance); + } + + logger.info("master failover end"); + } + + public InterProcessMutex blockAcquireMutex() throws Exception { + InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath()); + mutex.acquire(); + return mutex; + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 7344cf13e5..7fca37470d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -896,7 +896,7 @@ public class ProcessService { return task; } if(!task.getState().typeIsFinished()){ - createSubWorkProcessCommand(processInstance, task); + createSubWorkProcess(processInstance, task); } logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", @@ -906,20 +906,22 @@ public class ProcessService { /** * set work process instance map + * consider o + * repeat running does not generate new sub process instance + * set map {parent instance id, task instance id, 0(child instance id)} * @param parentInstance parentInstance * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){ ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId()); - if(processMap != null){ + if (processMap != null) { return processMap; - }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING - || parentInstance.isComplementData()){ + } + if (parentInstance.getCommandType() == CommandType.REPEAT_RUNNING) { // update current task id to map - // repeat running does not generate new sub process instance processMap = findPreviousTaskProcessMap(parentInstance, parentTask); - if(processMap!= null){ + if (processMap != null) { processMap.setParentTaskInstanceId(parentTask.getId()); updateWorkProcessInstanceMap(processMap); return processMap; @@ -944,11 +946,11 @@ public class ProcessService { Integer preTaskId = 0; List preTaskList = this.findPreviousTaskListByWorkProcessId(parentProcessInstance.getId()); - for(TaskInstance task : preTaskList){ - if(task.getName().equals(parentTask.getName())){ + for (TaskInstance task : preTaskList) { + if (task.getName().equals(parentTask.getName())) { preTaskId = task.getId(); ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId); - if(map!=null){ + if (map != null) { return map; } } @@ -960,66 +962,111 @@ public class ProcessService { /** * create sub work process command + * * @param parentProcessInstance parentProcessInstance - * @param task task + * @param task task */ - private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance, - TaskInstance task){ - if(!task.isSubProcess()){ + public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) { + if (!task.isSubProcess()) { return; } - ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task); - TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); - Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); - Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); - - ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId()); - - CommandType fatherType = parentProcessInstance.getCommandType(); - CommandType commandType = fatherType; - if(childInstance == null){ - String fatherHistoryCommand = parentProcessInstance.getHistoryCmd(); - // sub process must begin with schedule/complement data - // if father begin with scheduler/complement data - if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) || - fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){ - commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); - } + //check create sub work flow firstly + ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId()); + if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) { + // recover failover tolerance would not create a new command when the sub command already have been created + return; } - - if(childInstance != null){ - childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - updateProcessInstance(childInstance); + instanceMap = setProcessInstanceMap(parentProcessInstance, task); + ProcessInstance childInstance = null; + if (instanceMap.getProcessInstanceId() != 0) { + childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId()); } + Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); + updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId()); + initSubInstanceState(childInstance); + createCommand(subProcessCommand); + logger.info("sub process command created: {} ", subProcessCommand); + } + + /** + * complement data needs transform parent parameter to child. + * @param instanceMap + * @param parentProcessInstance + * @return + */ + private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) { // set sub work process command String processMapStr = JSONUtils.toJsonString(instanceMap); Map cmdParam = JSONUtils.toMap(processMapStr); - - if(commandType == CommandType.COMPLEMENT_DATA || - (childInstance != null && childInstance.isComplementData())){ + if (parentProcessInstance.isComplementData()) { Map parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam()); - String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); - String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); + String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); + String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime); processMapStr = JSONUtils.toJsonString(cmdParam); } + return processMapStr; + } - updateSubProcessDefinitionByParent(parentProcessInstance, childDefineId); + /** + * create sub work process command + * @param parentProcessInstance + * @param childInstance + * @param instanceMap + * @param task + */ + public Command createSubProcessCommand(ProcessInstance parentProcessInstance, + ProcessInstance childInstance, + ProcessInstanceMap instanceMap, + TaskInstance task) { + CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); + TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); + Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); + Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); + String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance); + + return new Command( + commandType, + TaskDependType.TASK_POST, + parentProcessInstance.getFailureStrategy(), + parentProcessInstance.getExecutorId(), + childDefineId, + processParam, + parentProcessInstance.getWarningType(), + parentProcessInstance.getWarningGroupId(), + parentProcessInstance.getScheduleTime(), + parentProcessInstance.getProcessInstancePriority() + ); + } + + /** + * initialize sub work flow state + * child instance state would be initialized when 'recovery from pause/stop/failure' + * @param childInstance + */ + private void initSubInstanceState(ProcessInstance childInstance) { + if (childInstance != null) { + childInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + updateProcessInstance(childInstance); + } + } - Command command = new Command(); - command.setWarningType(parentProcessInstance.getWarningType()); - command.setWarningGroupId(parentProcessInstance.getWarningGroupId()); - command.setFailureStrategy(parentProcessInstance.getFailureStrategy()); - command.setProcessDefinitionId(childDefineId); - command.setScheduleTime(parentProcessInstance.getScheduleTime()); - command.setExecutorId(parentProcessInstance.getExecutorId()); - command.setCommandParam(processMapStr); - command.setCommandType(commandType); - command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority()); - command.setWorkerGroup(parentProcessInstance.getWorkerGroup()); - createCommand(command); - logger.info("sub process command created: {} ", command.toString()); + /** + * get sub work flow command type + * child instance exist: child command = fatherCommand + * child instance not exists: child command = fatherCommand[0] + * + * @param parentProcessInstance + * @return + */ + private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) { + CommandType commandType = parentProcessInstance.getCommandType(); + if (childInstance == null) { + String fatherHistoryCommand = parentProcessInstance.getHistoryCmd(); + commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); + } + return commandType; } /** @@ -1497,20 +1544,6 @@ public class ProcessService { return result; } - /** - * update pid and app links field by task instance id - * @param taskInstId taskInstId - * @param pid pid - * @param appLinks appLinks - */ - public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) { - - TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId); - taskInstance.setPid(pid); - taskInstance.setAppLink(appLinks); - saveTaskInstance(taskInstance); - } - /** * query schedule by id * @param id id diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java new file mode 100644 index 0000000000..74b52bb316 --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.process; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.fasterxml.jackson.databind.JsonNode; + +/** + * process service test + */ +public class ProcessServiceTest { + + @Test + public void testCreateSubCommand() { + ProcessService processService = new ProcessService(); + ProcessInstance parentInstance = new ProcessInstance(); + parentInstance.setProcessDefinitionId(1); + parentInstance.setWarningType(WarningType.SUCCESS); + parentInstance.setWarningGroupId(0); + + TaskInstance task = new TaskInstance(); + task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}"); + task.setId(10); + + ProcessInstance childInstance = null; + ProcessInstanceMap instanceMap = new ProcessInstanceMap(); + instanceMap.setParentProcessInstanceId(1); + instanceMap.setParentTaskInstanceId(10); + Command command = null; + + //father history: start; child null == command type: start + parentInstance.setHistoryCmd("START_PROCESS"); + parentInstance.setCommandType(CommandType.START_PROCESS); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); + + //father history: start,start failure; child null == command type: start + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); + + //father history: scheduler,start failure; child null == command type: scheduler + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.SCHEDULER, command.getCommandType()); + + //father history: complement,start failure; child null == command type: complement + + String startString = "2020-01-01 00:00:00"; + String endString = "2020-01-10 00:00:00"; + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS"); + Map complementMap = new HashMap<>(); + complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString); + complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString); + parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap)); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType()); + + JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam()); + Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText()); + Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText()); + Assert.assertEquals(startString, DateUtils.dateToString(start)); + Assert.assertEquals(endString, DateUtils.dateToString(end)); + + //father history: start,failure,start failure; child not null == command type: start failure + childInstance = new ProcessInstance(); + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType()); + } +} diff --git a/pom.xml b/pom.xml index e895b01d89..9e4934e833 100644 --- a/pom.xml +++ b/pom.xml @@ -853,6 +853,7 @@ **/server/worker/EnvFileTest.java **/server/worker/runner/TaskExecuteThreadTest.java **/service/quartz/cron/CronUtilsTest.java + **/service/process/ProcessServiceTest.java **/service/zk/DefaultEnsembleProviderTest.java **/service/zk/ZKServerTest.java **/service/zk/CuratorZookeeperClientTest.java