Browse Source

Fix Recover WorkflowInstance will casue workflow Instance state is success but task insatnce is killed/paused (#15574)

dev_wenjun_refactorMaster
Wenjun Ruan 4 months ago committed by GitHub
parent
commit
43a06525a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  2. 33
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
  3. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
  4. 99
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  5. 24
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
  6. 110
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  7. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  8. 61
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -769,7 +769,6 @@ public class ProcessInstanceServiceTest {
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog());
when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance);
when(taskInstanceMapper.queryByInstanceIdAndName(Mockito.anyInt(), Mockito.any())).thenReturn(taskInstance);
DAG<Long, TaskNode, TaskNodeRelation> graph = new DAG<>();
for (long i = 1; i <= 7; ++i) {
graph.addNode(i, new TaskNode());

33
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java

@ -39,23 +39,10 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
*/
public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
List<Integer> queryTaskByProcessIdAndState(@Param("processInstanceId") Integer processInstanceId,
@Param("state") Integer state);
List<TaskInstance> findValidTaskListByProcessId(@Param("processInstanceId") Integer processInstanceId,
@Param("flag") Flag flag,
@Param("testFlag") int testFlag);
List<TaskInstance> queryByHostAndStatus(@Param("host") String host,
@Param("states") int[] stateArray);
int setFailoverByHostAndStateArray(@Param("host") String host,
@Param("states") int[] stateArray,
@Param("destStatus") TaskExecutionStatus destStatus);
TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processInstanceId,
@Param("name") String name);
TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
@Param("taskCode") Long taskCode);
@ -66,9 +53,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds,
@Param("taskCodes") List<Long> taskCodes);
Integer countTask(@Param("projectCodes") Long[] projectCodes,
@Param("taskIds") int[] taskIds);
/**
* Statistics task instance group by given project codes list by start time
* <p>
@ -97,20 +81,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("endTime") Date endTime,
@Param("projectIds") Set<Integer> projectIds);
/**
* Statistics task instance group by given project codes list by submit time
* <p>
* We only need project codes to determine whether the task instance belongs to the user or not.
*
* @param startTime Statistics start time
* @param endTime Statistics end time
* @param projectCodes Project codes list to filter
* @return List of ExecuteStatusCount
*/
List<ExecuteStatusCount> countTaskInstanceStateByProjectCodesAndStatesBySubmitTime(@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("projectCodes") Long[] projectCodes,
@Param("states") List<TaskExecutionStatus> states);
/**
* Statistics task instance group by given project codes list by submit time
* <p>
@ -159,9 +129,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);
List<TaskInstance> loadAllInfosNoRelease(@Param("processInstanceId") int processInstanceId,
@Param("status") int status);
void deleteByWorkflowInstanceId(@Param("workflowInstanceId") int workflowInstanceId);
List<TaskInstance> findByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java

@ -100,9 +100,10 @@ public interface TaskInstanceDao extends IDao<TaskInstance> {
/**
* find last task instance corresponding to taskCode in the date interval
*
* @param processInstanceId Task's parent process instance id
* @param depTaskCode taskCode
* @param testFlag test flag
* @param depTaskCode taskCode
* @param testFlag test flag
* @return task instance
*/
TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId,

99
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -30,24 +30,6 @@
${alias}.flag, ${alias}.is_cache, ${alias}.cache_key, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type
</sql>
<update id="setFailoverByHostAndStateArray">
update t_ds_task_instance
set state = #{destStatus}
where host = #{host}
<if test="states != null and states.length != 0">
and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</update>
<select id="queryTaskByProcessIdAndState" resultType="java.lang.Integer">
select id
from t_ds_task_instance
WHERE process_instance_id = #{processInstanceId}
and state = #{state}
and flag = 1
</select>
<select id="findValidTaskListByProcessId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
@ -63,21 +45,6 @@
from t_ds_task_instance
WHERE process_instance_id = #{workflowInstanceId}
</select>
<select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where 1 = 1
<if test="host != null and host != ''">
and host = #{host}
</if>
<if test="states != null and states.length != 0">
and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>
<select id="countTaskInstanceStateByProjectCodes" resultType="org.apache.dolphinscheduler.dao.model.TaskInstanceStatusCountDto">
select state, count(0) as count
@ -118,32 +85,7 @@
</if>
group by t.state
</select>
<select id="countTaskInstanceStateByProjectCodesAndStatesBySubmitTime" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select
state, count(0) as count
from t_ds_task_instance t
left join t_ds_task_definition_log d on d.code=t.task_code and d.version=t.task_definition_version
where 1=1
<if test="states != null and states.size != 0">
and t.state in
<foreach collection="states" index="index" item="state" open="(" separator="," close=")">
#{state.code}
</foreach>
</if>
<if test="projectCodes != null and projectCodes.length != 0">
and d.project_code in
<foreach collection="projectCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="startTime != null">
and t.submit_time <![CDATA[ > ]]> #{startTime}
</if>
<if test="endTime != null">
and t.submit_time <![CDATA[ <= ]]> #{endTime}
</if>
group by t.state
</select>
<select id="countTaskInstanceStateByProjectCodesAndStatesBySubmitTimeV2" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select
state, count(0) as count
@ -181,15 +123,7 @@
</if>
group by t.state
</select>
<select id="queryByInstanceIdAndName" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where process_instance_id = #{processInstanceId}
and name = #{name}
and flag = 1
limit 1
</select>
<select id="queryByInstanceIdAndCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
@ -229,24 +163,7 @@
</foreach>
</if>
</select>
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_task_definition_log define
where task.task_code=define.code
and task.task_definition_version=define.version
<if test="projectCodes != null and projectCodes.length != 0">
and define.project_code in
<foreach collection="projectCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="taskIds != null and taskIds.length != 0">
and task.id in
<foreach collection="taskIds" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>
<select id="queryTaskInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
@ -330,16 +247,6 @@
</if>
order by start_time desc
</select>
<select id="loadAllInfosNoRelease" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSqlV2">
<property name="alias" value="instance"/>
</include>
from t_ds_task_instance instance
left join t_ds_task_group_queue que on instance.id = que.task_id
where instance.process_instance_id = #{processInstanceId}
and que.status = #{status}
</select>
<select id="findLastTaskInstances" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSqlV2">

24
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

@ -197,30 +197,6 @@ public class ProcessInstanceMapperTest extends BaseDaoTest {
processInstanceMapper.deleteById(processInstance.getId());
}
/**
* test set failover by host and state
*/
@Test
public void testSetFailoverByHostAndStateArray() {
int[] stateArray = new int[]{
WorkflowExecutionStatus.RUNNING_EXECUTION.ordinal(),
WorkflowExecutionStatus.SUCCESS.ordinal()};
ProcessInstance processInstance = insertOne();
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
processInstance.setHost("192.168.2.220");
processInstanceMapper.updateById(processInstance);
String host = processInstance.getHost();
int update = processInstanceMapper.setFailoverByHostAndStateArray(host, stateArray);
Assertions.assertNotEquals(0, update);
processInstance = processInstanceMapper.selectById(processInstance.getId());
Assertions.assertNull(processInstance.getHost());
processInstanceMapper.deleteById(processInstance.getId());
}
/**
* test update process instance by state
*/

110
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -143,25 +143,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
Assertions.assertNotEquals(0, taskInstances.size());
}
/**
* test query task instance by process instance id and state
*/
@Test
public void testQueryTaskByProcessIdAndState() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();
// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setProcessInstanceId(processInstance.getId());
taskInstanceMapper.updateById(task);
List<Integer> taskInstances = taskInstanceMapper.queryTaskByProcessIdAndState(
task.getProcessInstanceId(),
TaskExecutionStatus.RUNNING_EXECUTION.getCode());
taskInstanceMapper.deleteById(task.getId());
Assertions.assertNotEquals(0, taskInstances.size());
}
/**
* test find valid task list by process instance id
*/
@ -194,66 +175,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
Assertions.assertNotEquals(0, taskInstances1.size());
}
/**
* test query by host and status
*/
@Test
public void testQueryByHostAndStatus() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();
// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setHost("111.111.11.11");
taskInstanceMapper.updateById(task);
List<TaskInstance> taskInstances = taskInstanceMapper.queryByHostAndStatus(
task.getHost(), new int[]{TaskExecutionStatus.RUNNING_EXECUTION.getCode()});
taskInstanceMapper.deleteById(task.getId());
Assertions.assertNotEquals(0, taskInstances.size());
}
/**
* test set failover by host and state array
*/
@Test
public void testSetFailoverByHostAndStateArray() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();
// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setHost("111.111.11.11");
taskInstanceMapper.updateById(task);
int setResult = taskInstanceMapper.setFailoverByHostAndStateArray(
task.getHost(),
new int[]{TaskExecutionStatus.RUNNING_EXECUTION.getCode()},
TaskExecutionStatus.NEED_FAULT_TOLERANCE);
taskInstanceMapper.deleteById(task.getId());
Assertions.assertNotEquals(0, setResult);
}
/**
* test query by task instance id and name
*/
@Test
public void testQueryByInstanceIdAndName() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();
// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setHost("111.111.11.11");
taskInstanceMapper.updateById(task);
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(
task.getProcessInstanceId(),
task.getName());
taskInstanceMapper.deleteById(task.getId());
Assertions.assertNotEquals(null, taskInstance);
}
/**
* test query by task instance id and code
*/
@ -294,37 +215,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
Assertions.assertEquals(1, taskInstances.size());
}
/**
* test count task instance
*/
@Test
public void testCountTask() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();
// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
ProcessDefinition definition = new ProcessDefinition();
definition.setCode(1L);
definition.setProjectCode(1111L);
definition.setCreateTime(new Date());
definition.setUpdateTime(new Date());
processDefinitionMapper.insert(definition);
taskInstanceMapper.updateById(task);
int countTask = taskInstanceMapper.countTask(
new Long[0],
new int[0]);
int countTask2 = taskInstanceMapper.countTask(
new Long[]{definition.getProjectCode()},
new int[]{task.getId()});
taskInstanceMapper.deleteById(task.getId());
processDefinitionMapper.deleteById(definition.getId());
Assertions.assertEquals(0, countTask);
Assertions.assertEquals(0, countTask2);
}
/**
* test count task instance state by user
*/

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -45,7 +45,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.model.TaskNode;
@ -98,8 +97,6 @@ public interface ProcessService {
void updateTaskDefinitionResources(TaskDefinition taskDefinition);
List<Integer> findTaskIdByInstanceState(int instanceId, TaskExecutionStatus state);
int deleteWorkProcessMapByParentId(int parentWorkProcessId);
ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId);

61
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -763,22 +763,27 @@ public class ProcessServiceImpl implements ProcessService {
case DYNAMIC_GENERATION:
break;
case START_FAILURE_TASK_PROCESS:
// find failed tasks and init these tasks
List<Integer> failedList =
this.findTaskIdByInstanceState(processInstance.getId(), TaskExecutionStatus.FAILURE);
List<Integer> toleranceList = this.findTaskIdByInstanceState(processInstance.getId(),
TaskExecutionStatus.NEED_FAULT_TOLERANCE);
List<Integer> killedList =
this.findTaskIdByInstanceState(processInstance.getId(), TaskExecutionStatus.KILL);
cmdParam.remove(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING);
failedList.addAll(killedList);
failedList.addAll(toleranceList);
for (Integer taskId : failedList) {
initTaskInstance(taskInstanceDao.queryById(taskId));
case RECOVER_SUSPENDED_PROCESS:
List<TaskInstance> needToStartTaskInstances = taskInstanceDao
.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), processInstance.getTestFlag())
.stream()
.filter(taskInstance -> {
TaskExecutionStatus state = taskInstance.getState();
return state == TaskExecutionStatus.FAILURE
|| state == TaskExecutionStatus.PAUSE
|| state == TaskExecutionStatus.NEED_FAULT_TOLERANCE
|| state == TaskExecutionStatus.KILL;
})
.collect(Collectors.toList());
for (TaskInstance taskInstance : needToStartTaskInstances) {
initTaskInstance(taskInstance);
}
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(failedList)));
String startTaskInstanceIds = needToStartTaskInstances.stream()
.map(TaskInstance::getId)
.map(String::valueOf)
.collect(Collectors.joining(Constants.COMMA));
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING, startTaskInstanceIds);
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
@ -786,20 +791,6 @@ public class ProcessServiceImpl implements ProcessService {
break;
case RECOVER_WAITING_THREAD:
break;
case RECOVER_SUSPENDED_PROCESS:
// find pause tasks and init task's state
cmdParam.remove(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING);
List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
TaskExecutionStatus.KILL);
for (Integer taskId : stopNodeList) {
// initialize the pause state
initTaskInstance(taskInstanceDao.queryById(taskId));
}
cmdParam.put(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(stopNodeList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
case RECOVER_TOLERANCE_FAULT_PROCESS:
// recover tolerance fault process
processInstance.setRecovery(Flag.YES);
@ -1312,18 +1303,6 @@ public class ProcessServiceImpl implements ProcessService {
return resourceInfo;
}
/**
* get id list by task state
*
* @param instanceId instanceId
* @param state state
* @return task instance states
*/
@Override
public List<Integer> findTaskIdByInstanceState(int instanceId, TaskExecutionStatus state) {
return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.getCode());
}
/**
* delete work process map by parent process id
*

Loading…
Cancel
Save