diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml index 7efc74b4c0..c32caf1821 100644 --- a/.github/workflows/ci_ut.yml +++ b/.github/workflows/ci_ut.yml @@ -90,4 +90,4 @@ jobs: run: | mkdir -p ${LOG_DIR} docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml logs dolphinscheduler-postgresql > ${LOG_DIR}/db.txt - continue-on-error: true + continue-on-error: true \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 686d73d8ac..a69533928f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -155,10 +155,10 @@ public class ZKMasterClient extends AbstractZKClient { * @throws Exception exception */ private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { - if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(OSUtils.getHost())){ - return ; + if (StringUtils.isEmpty(serverHost)) { + return; } - switch (zkNodeType){ + switch (zkNodeType) { case MASTER: failoverMaster(serverHost); break; @@ -262,7 +262,7 @@ public class ZKMasterClient extends AbstractZKClient { Date workerServerStartDate = null; List workerServers = getServersList(ZKNodeType.WORKER); for(Server workerServer : workerServers){ - if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){ + if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){ workerServerStartDate = workerServer.getCreateTime(); break; } @@ -336,7 +336,7 @@ public class ZKMasterClient extends AbstractZKClient { //updateProcessInstance host is null and insert into command for(ProcessInstance processInstance : needFailoverProcessInstanceList){ if(Constants.NULL.equals(processInstance.getHost()) ){ - continue; + continue; } processService.processNeedFailoverProcessInstances(processInstance); } @@ -350,4 +350,4 @@ public class ZKMasterClient extends AbstractZKClient { return mutex; } -} +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 40101fc3bc..903f5cd2a1 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -39,7 +39,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import java.io.File; import java.util.*; import java.util.stream.Collectors; @@ -840,7 +839,7 @@ public class ProcessService { return task; } if(!task.getState().typeIsFinished()){ - createSubWorkProcessCommand(processInstance, task); + createSubWorkProcess(processInstance, task); } logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", @@ -850,20 +849,22 @@ public class ProcessService { /** * set work process instance map + * consider o + * repeat running does not generate new sub process instance + * set map {parent instance id, task instance id, 0(child instance id)} * @param parentInstance parentInstance * @param parentTask parentTask * @return process instance map */ - private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){ + private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId()); - if(processMap != null){ + if (processMap != null) { return processMap; - }else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING - || parentInstance.isComplementData()){ + } + if (parentInstance.getCommandType() == CommandType.REPEAT_RUNNING) { // update current task id to map - // repeat running does not generate new sub process instance processMap = findPreviousTaskProcessMap(parentInstance, parentTask); - if(processMap!= null){ + if (processMap != null) { processMap.setParentTaskInstanceId(parentTask.getId()); updateWorkProcessInstanceMap(processMap); return processMap; @@ -892,7 +893,7 @@ public class ProcessService { if(task.getName().equals(parentTask.getName())){ preTaskId = task.getId(); ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId); - if(map!=null){ + if(map != null){ return map; } } @@ -907,63 +908,110 @@ public class ProcessService { * @param parentProcessInstance parentProcessInstance * @param task task */ - private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance, - TaskInstance task){ - if(!task.isSubProcess()){ + public void createSubWorkProcess(ProcessInstance parentProcessInstance, + TaskInstance task) { + if (!task.isSubProcess()) { return; } - ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task); - TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); - Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); - Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); + //check create sub work flow firstly + ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId()); + if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) { + // recover failover tolerance would not create a new command when the sub command already have been created + return; + } + instanceMap = setProcessInstanceMap(parentProcessInstance, task); + ProcessInstance childInstance = null; + if (instanceMap.getProcessInstanceId() != 0) { + childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId()); + } + Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); + updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId()); + initSubInstanceState(childInstance); + createCommand(subProcessCommand); + logger.info("sub process command created: {} ", subProcessCommand); + } - ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId()); - CommandType fatherType = parentProcessInstance.getCommandType(); - CommandType commandType = fatherType; - if(childInstance == null){ - String fatherHistoryCommand = parentProcessInstance.getHistoryCmd(); - // sub process must begin with schedule/complement data - // if father begin with scheduler/complement data - if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) || - fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){ - commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); - } - } - if(childInstance != null){ - childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - updateProcessInstance(childInstance); - } + /** + * complement data needs transform parent parameter to child. + * @param instanceMap + * @param parentProcessInstance + * @return + */ + private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) { // set sub work process command String processMapStr = JSONUtils.toJson(instanceMap); Map cmdParam = JSONUtils.toMap(processMapStr); - - if(commandType == CommandType.COMPLEMENT_DATA || - (childInstance != null && childInstance.isComplementData())){ + if (parentProcessInstance.isComplementData()) { Map parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam()); - String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); - String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); + String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); + String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime); processMapStr = JSONUtils.toJson(cmdParam); } + return processMapStr; + } - updateSubProcessDefinitionByParent(parentProcessInstance, childDefineId); + /** + * create sub work process command + * @param parentProcessInstance + * @param childInstance + * @param instanceMap + * @param task + */ + public Command createSubProcessCommand(ProcessInstance parentProcessInstance, + ProcessInstance childInstance, + ProcessInstanceMap instanceMap, + TaskInstance task) { + CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); + TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); + Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); + Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); + String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance); + + return new Command( + commandType, + TaskDependType.TASK_POST, + parentProcessInstance.getFailureStrategy(), + parentProcessInstance.getExecutorId(), + childDefineId, + processParam, + parentProcessInstance.getWarningType(), + parentProcessInstance.getWarningGroupId(), + parentProcessInstance.getScheduleTime(), + parentProcessInstance.getProcessInstancePriority() + ); + } + + /** + * initialize sub work flow state + * child instance state would be initialized when 'recovery from pause/stop/failure' + * @param childInstance + */ + private void initSubInstanceState(ProcessInstance childInstance) { + if (childInstance != null) { + childInstance.setState(ExecutionStatus.RUNNING_EXEUTION); + updateProcessInstance(childInstance); + } + } - Command command = new Command(); - command.setWarningType(parentProcessInstance.getWarningType()); - command.setWarningGroupId(parentProcessInstance.getWarningGroupId()); - command.setFailureStrategy(parentProcessInstance.getFailureStrategy()); - command.setProcessDefinitionId(childDefineId); - command.setScheduleTime(parentProcessInstance.getScheduleTime()); - command.setExecutorId(parentProcessInstance.getExecutorId()); - command.setCommandParam(processMapStr); - command.setCommandType(commandType); - command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority()); - command.setWorkerGroup(parentProcessInstance.getWorkerGroup()); - createCommand(command); - logger.info("sub process command created: {} ", command.toString()); + /** + * get sub work flow command type + * child instance exist: child command = fatherCommand + * child instance not exists: child command = fatherCommand[0] + * + * @param parentProcessInstance + * @return + */ + private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) { + CommandType commandType = parentProcessInstance.getCommandType(); + if (childInstance == null) { + String fatherHistoryCommand = parentProcessInstance.getHistoryCmd(); + commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); + } + return commandType; } /** @@ -1065,9 +1113,9 @@ public class ProcessService { public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){ ExecutionStatus state = taskInstance.getState(); if( - // running or killed - // the task already exists in task queue - // return state + // running or killed + // the task already exists in task queue + // return state state == ExecutionStatus.RUNNING_EXEUTION || state == ExecutionStatus.KILL || checkTaskExistsInTaskQueue(taskInstance) @@ -1252,7 +1300,7 @@ public class ProcessService { * @return task instance list */ public List findValidTaskListByProcessId(Integer processInstanceId){ - return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES); + return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES); } /** @@ -1431,20 +1479,6 @@ public class ProcessService { return result; } - /** - * update pid and app links field by task instance id - * @param taskInstId taskInstId - * @param pid pid - * @param appLinks appLinks - */ - public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) { - - TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId); - taskInstance.setPid(pid); - taskInstance.setAppLink(appLinks); - saveTaskInstance(taskInstance); - } - /** * query schedule by id * @param id id diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java new file mode 100644 index 0000000000..6ca96d91bb --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.service.process; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import org.junit.Assert; +import org.junit.Test; + +import com.alibaba.fastjson.JSONObject; + +/** + * process service test + */ +public class ProcessServiceTest { + + + @Test + public void testCreateSubCommand() { + + ProcessService processService = new ProcessService(); + ProcessInstance parentInstance = new ProcessInstance(); + parentInstance.setProcessDefinitionId(1); + parentInstance.setWarningType(WarningType.SUCCESS); + parentInstance.setWarningGroupId(0); + + TaskInstance task = new TaskInstance(); + task.setTaskJson("{params:{processDefinitionId:100}}"); + task.setId(10); + + ProcessInstance childInstance = null; + ProcessInstanceMap instanceMap = new ProcessInstanceMap(); + instanceMap.setParentProcessInstanceId(1); + instanceMap.setParentTaskInstanceId(10); + Command command = null; + + //father history: start; child null == command type: start + parentInstance.setHistoryCmd("START_PROCESS"); + parentInstance.setCommandType(CommandType.START_PROCESS); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); + + //father history: start,start failure; child null == command type: start + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); + + + //father history: scheduler,start failure; child null == command type: scheduler + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.SCHEDULER, command.getCommandType()); + + //father history: complement,start failure; child null == command type: complement + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS"); + parentInstance.setCommandParam("{complementStartDate:'2020-01-01',complementEndDate:'2020-01-10'}"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType()); + + JSONObject complementDate = JSONUtils.parseObject(command.getCommandParam()); + Assert.assertEquals("2020-01-01", String.valueOf(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE))); + Assert.assertEquals("2020-01-10", String.valueOf(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE))); + + //father history: start,failure,start failure; child not null == command type: start failure + childInstance = new ProcessInstance(); + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); + command = processService.createSubProcessCommand( + parentInstance, childInstance, instanceMap, task + ); + Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType()); + + + } +} diff --git a/pom.xml b/pom.xml index b08cb52096..672a0b6aa0 100644 --- a/pom.xml +++ b/pom.xml @@ -819,6 +819,7 @@ **/server/worker/task/EnvFileTest.java **/server/worker/task/spark/SparkTaskTest.java **/server/worker/EnvFileTest.java + **/service/process/ProcessServiceTest.java **/service/quartz/cron/CronUtilsTest.java **/service/zk/DefaultEnsembleProviderTest.java **/service/zk/ZKServerTest.java