Browse Source

[FIX-3617][Service]after subtask fault tolerance, 2 task instances are generated (#3830)

* fix bug(#3617): after subtask fault tolerance, 2 task instances are generated.

* delete unused code

* update code smell

* refactor sub work command process

* add process service ut

* add license header

* fix some code smell

* change ut java8 to java11

* update sonar to java11

* copy ut config from dev

* remove checkstyle

* revert to 1.3.3

* change process service test to executor service

* add process service test

* add process service test

* revert

* revert

* add comments

* change dev to 1.3.3-release

* revert

Co-authored-by: baoliang <baoliang@analysys.com.cn>
1.3.3-release
bao liang 4 years ago committed by GitHub
parent
commit
f29f4e8384
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .github/workflows/ci_ut.yml
  2. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  3. 172
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  4. 107
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  5. 1
      pom.xml

2
.github/workflows/ci_ut.yml

@ -90,4 +90,4 @@ jobs:
run: | run: |
mkdir -p ${LOG_DIR} mkdir -p ${LOG_DIR}
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml logs dolphinscheduler-postgresql > ${LOG_DIR}/db.txt docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml logs dolphinscheduler-postgresql > ${LOG_DIR}/db.txt
continue-on-error: true continue-on-error: true

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -155,10 +155,10 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception * @throws Exception exception
*/ */
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(OSUtils.getHost())){ if (StringUtils.isEmpty(serverHost)) {
return ; return;
} }
switch (zkNodeType){ switch (zkNodeType) {
case MASTER: case MASTER:
failoverMaster(serverHost); failoverMaster(serverHost);
break; break;
@ -262,7 +262,7 @@ public class ZKMasterClient extends AbstractZKClient {
Date workerServerStartDate = null; Date workerServerStartDate = null;
List<Server> workerServers = getServersList(ZKNodeType.WORKER); List<Server> workerServers = getServersList(ZKNodeType.WORKER);
for(Server workerServer : workerServers){ for(Server workerServer : workerServers){
if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){ if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){
workerServerStartDate = workerServer.getCreateTime(); workerServerStartDate = workerServer.getCreateTime();
break; break;
} }
@ -336,7 +336,7 @@ public class ZKMasterClient extends AbstractZKClient {
//updateProcessInstance host is null and insert into command //updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){ for(ProcessInstance processInstance : needFailoverProcessInstanceList){
if(Constants.NULL.equals(processInstance.getHost()) ){ if(Constants.NULL.equals(processInstance.getHost()) ){
continue; continue;
} }
processService.processNeedFailoverProcessInstances(processInstance); processService.processNeedFailoverProcessInstances(processInstance);
} }
@ -350,4 +350,4 @@ public class ZKMasterClient extends AbstractZKClient {
return mutex; return mutex;
} }
} }

172
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -39,7 +39,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -840,7 +839,7 @@ public class ProcessService {
return task; return task;
} }
if(!task.getState().typeIsFinished()){ if(!task.getState().typeIsFinished()){
createSubWorkProcessCommand(processInstance, task); createSubWorkProcess(processInstance, task);
} }
logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ",
@ -850,20 +849,22 @@ public class ProcessService {
/** /**
* set work process instance map * set work process instance map
* consider o
* repeat running does not generate new sub process instance
* set map {parent instance id, task instance id, 0(child instance id)}
* @param parentInstance parentInstance * @param parentInstance parentInstance
* @param parentTask parentTask * @param parentTask parentTask
* @return process instance map * @return process instance map
*/ */
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId()); ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
if(processMap != null){ if (processMap != null) {
return processMap; return processMap;
}else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING }
|| parentInstance.isComplementData()){ if (parentInstance.getCommandType() == CommandType.REPEAT_RUNNING) {
// update current task id to map // update current task id to map
// repeat running does not generate new sub process instance
processMap = findPreviousTaskProcessMap(parentInstance, parentTask); processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
if(processMap!= null){ if (processMap != null) {
processMap.setParentTaskInstanceId(parentTask.getId()); processMap.setParentTaskInstanceId(parentTask.getId());
updateWorkProcessInstanceMap(processMap); updateWorkProcessInstanceMap(processMap);
return processMap; return processMap;
@ -892,7 +893,7 @@ public class ProcessService {
if(task.getName().equals(parentTask.getName())){ if(task.getName().equals(parentTask.getName())){
preTaskId = task.getId(); preTaskId = task.getId();
ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId); ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
if(map!=null){ if(map != null){
return map; return map;
} }
} }
@ -907,63 +908,110 @@ public class ProcessService {
* @param parentProcessInstance parentProcessInstance * @param parentProcessInstance parentProcessInstance
* @param task task * @param task task
*/ */
private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance, public void createSubWorkProcess(ProcessInstance parentProcessInstance,
TaskInstance task){ TaskInstance task) {
if(!task.isSubProcess()){ if (!task.isSubProcess()) {
return; return;
} }
ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task); //check create sub work flow firstly
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams()); if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); // recover failover tolerance would not create a new command when the sub command already have been created
return;
}
instanceMap = setProcessInstanceMap(parentProcessInstance, task);
ProcessInstance childInstance = null;
if (instanceMap.getProcessInstanceId() != 0) {
childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
}
Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId());
initSubInstanceState(childInstance);
createCommand(subProcessCommand);
logger.info("sub process command created: {} ", subProcessCommand);
}
ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());
CommandType fatherType = parentProcessInstance.getCommandType();
CommandType commandType = fatherType;
if(childInstance == null){
String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
// sub process must begin with schedule/complement data
// if father begin with scheduler/complement data
if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) ||
fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){
commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
}
}
if(childInstance != null){ /**
childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); * complement data needs transform parent parameter to child.
updateProcessInstance(childInstance); * @param instanceMap
} * @param parentProcessInstance
* @return
*/
private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) {
// set sub work process command // set sub work process command
String processMapStr = JSONUtils.toJson(instanceMap); String processMapStr = JSONUtils.toJson(instanceMap);
Map<String, String> cmdParam = JSONUtils.toMap(processMapStr); Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
if (parentProcessInstance.isComplementData()) {
if(commandType == CommandType.COMPLEMENT_DATA ||
(childInstance != null && childInstance.isComplementData())){
Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam()); Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
processMapStr = JSONUtils.toJson(cmdParam); processMapStr = JSONUtils.toJson(cmdParam);
} }
return processMapStr;
}
updateSubProcessDefinitionByParent(parentProcessInstance, childDefineId); /**
* create sub work process command
* @param parentProcessInstance
* @param childInstance
* @param instanceMap
* @param task
*/
public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
ProcessInstance childInstance,
ProcessInstanceMap instanceMap,
TaskInstance task) {
CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance);
return new Command(
commandType,
TaskDependType.TASK_POST,
parentProcessInstance.getFailureStrategy(),
parentProcessInstance.getExecutorId(),
childDefineId,
processParam,
parentProcessInstance.getWarningType(),
parentProcessInstance.getWarningGroupId(),
parentProcessInstance.getScheduleTime(),
parentProcessInstance.getProcessInstancePriority()
);
}
/**
* initialize sub work flow state
* child instance state would be initialized when 'recovery from pause/stop/failure'
* @param childInstance
*/
private void initSubInstanceState(ProcessInstance childInstance) {
if (childInstance != null) {
childInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
updateProcessInstance(childInstance);
}
}
Command command = new Command(); /**
command.setWarningType(parentProcessInstance.getWarningType()); * get sub work flow command type
command.setWarningGroupId(parentProcessInstance.getWarningGroupId()); * child instance exist: child command = fatherCommand
command.setFailureStrategy(parentProcessInstance.getFailureStrategy()); * child instance not exists: child command = fatherCommand[0]
command.setProcessDefinitionId(childDefineId); *
command.setScheduleTime(parentProcessInstance.getScheduleTime()); * @param parentProcessInstance
command.setExecutorId(parentProcessInstance.getExecutorId()); * @return
command.setCommandParam(processMapStr); */
command.setCommandType(commandType); private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority()); CommandType commandType = parentProcessInstance.getCommandType();
command.setWorkerGroup(parentProcessInstance.getWorkerGroup()); if (childInstance == null) {
createCommand(command); String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
logger.info("sub process command created: {} ", command.toString()); commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
}
return commandType;
} }
/** /**
@ -1065,9 +1113,9 @@ public class ProcessService {
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){ public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){
ExecutionStatus state = taskInstance.getState(); ExecutionStatus state = taskInstance.getState();
if( if(
// running or killed // running or killed
// the task already exists in task queue // the task already exists in task queue
// return state // return state
state == ExecutionStatus.RUNNING_EXEUTION state == ExecutionStatus.RUNNING_EXEUTION
|| state == ExecutionStatus.KILL || state == ExecutionStatus.KILL
|| checkTaskExistsInTaskQueue(taskInstance) || checkTaskExistsInTaskQueue(taskInstance)
@ -1252,7 +1300,7 @@ public class ProcessService {
* @return task instance list * @return task instance list
*/ */
public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId){ public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId){
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES); return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
} }
/** /**
@ -1431,20 +1479,6 @@ public class ProcessService {
return result; return result;
} }
/**
* update pid and app links field by task instance id
* @param taskInstId taskInstId
* @param pid pid
* @param appLinks appLinks
*/
public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
taskInstance.setPid(pid);
taskInstance.setAppLink(appLinks);
saveTaskInstance(taskInstance);
}
/** /**
* query schedule by id * query schedule by id
* @param id id * @param id id

107
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.process;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.junit.Assert;
import org.junit.Test;
import com.alibaba.fastjson.JSONObject;
/**
* process service test
*/
public class ProcessServiceTest {

    /**
     * Exercises {@code ProcessService.createSubProcessCommand} with several parent
     * command histories and checks the command type (and, for complement data,
     * the date range) of the generated sub process command.
     */
    @Test
    public void testCreateSubCommand() {
        ProcessService service = new ProcessService();

        // parent work flow instance that owns the sub process task
        ProcessInstance parent = new ProcessInstance();
        parent.setProcessDefinitionId(1);
        parent.setWarningType(WarningType.SUCCESS);
        parent.setWarningGroupId(0);

        // sub process task pointing at child process definition 100
        TaskInstance subTask = new TaskInstance();
        subTask.setTaskJson("{params:{processDefinitionId:100}}");
        subTask.setId(10);

        // relation between the parent instance and the sub process task
        ProcessInstanceMap relation = new ProcessInstanceMap();
        relation.setParentProcessInstanceId(1);
        relation.setParentTaskInstanceId(10);

        // the first four scenarios run without an existing child instance
        ProcessInstance child = null;

        // parent history: start -> expected child command type: start
        prepareParent(parent, CommandType.START_PROCESS, "START_PROCESS");
        Command fromStart = service.createSubProcessCommand(parent, child, relation, subTask);
        Assert.assertEquals(CommandType.START_PROCESS, fromStart.getCommandType());

        // parent history: start, start failure -> expected child command type: start
        prepareParent(parent, CommandType.START_FAILURE_TASK_PROCESS,
                "START_PROCESS,START_FAILURE_TASK_PROCESS");
        Command fromStartThenFailure = service.createSubProcessCommand(parent, child, relation, subTask);
        Assert.assertEquals(CommandType.START_PROCESS, fromStartThenFailure.getCommandType());

        // parent history: scheduler, start failure -> expected child command type: scheduler
        prepareParent(parent, CommandType.START_FAILURE_TASK_PROCESS,
                "SCHEDULER,START_FAILURE_TASK_PROCESS");
        Command fromScheduler = service.createSubProcessCommand(parent, child, relation, subTask);
        Assert.assertEquals(CommandType.SCHEDULER, fromScheduler.getCommandType());

        // parent history: complement, start failure -> expected child command type: complement,
        // and the complement date range of the parent must show up in the child command param
        prepareParent(parent, CommandType.START_FAILURE_TASK_PROCESS,
                "COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
        parent.setCommandParam("{complementStartDate:'2020-01-01',complementEndDate:'2020-01-10'}");
        Command fromComplement = service.createSubProcessCommand(parent, child, relation, subTask);
        Assert.assertEquals(CommandType.COMPLEMENT_DATA, fromComplement.getCommandType());
        JSONObject childParam = JSONUtils.parseObject(fromComplement.getCommandParam());
        Object startDate = childParam.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE);
        Object endDate = childParam.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE);
        Assert.assertEquals("2020-01-01", String.valueOf(startDate));
        Assert.assertEquals("2020-01-10", String.valueOf(endDate));

        // parent history: start, start failure; child instance exists
        // -> expected child command type: start failure (the parent's current command type)
        child = new ProcessInstance();
        prepareParent(parent, CommandType.START_FAILURE_TASK_PROCESS,
                "START_PROCESS,START_FAILURE_TASK_PROCESS");
        Command withExistingChild = service.createSubProcessCommand(parent, child, relation, subTask);
        Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, withExistingChild.getCommandType());
    }

    /**
     * Sets the current command type and the command history on the parent instance.
     *
     * @param parent      parent process instance to update
     * @param commandType current command type of the parent
     * @param historyCmd  comma separated command history of the parent
     */
    private static void prepareParent(ProcessInstance parent, CommandType commandType, String historyCmd) {
        parent.setCommandType(commandType);
        parent.setHistoryCmd(historyCmd);
    }
}

1
pom.xml

@ -819,6 +819,7 @@
<include>**/server/worker/task/EnvFileTest.java</include> <include>**/server/worker/task/EnvFileTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include> <include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/EnvFileTest.java</include> <include>**/server/worker/EnvFileTest.java</include>
<include>**/service/process/ProcessServiceTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include> <include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include> <include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/service/zk/ZKServerTest.java</include> <include>**/service/zk/ZKServerTest.java</include>

Loading…
Cancel
Save