Browse Source

Fix kill dynamic task doesn't kill the wait to run workflow instances (#15896)

3.2.2-release-bak
Wenjun Ruan 6 months ago committed by GitHub
parent
commit
b3b8c0784d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
  2. 7
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
  3. 11
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
  4. 51
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java

@ -34,6 +34,7 @@ public interface CommandMapper extends BaseMapper<Command> {
/** /**
* count command state * count command state
*
* @param startTime startTime * @param startTime startTime
* @param endTime endTime * @param endTime endTime
* @param projectCodes projectCodes * @param projectCodes projectCodes
@ -46,15 +47,19 @@ public interface CommandMapper extends BaseMapper<Command> {
/** /**
* query command page * query command page
*
* @return * @return
*/ */
List<Command> queryCommandPage(@Param("limit") int limit, @Param("offset") int offset); List<Command> queryCommandPage(@Param("limit") int limit, @Param("offset") int offset);
/** /**
* query command page by slot * query command page by slot
*
* @return command list * @return command list
*/ */
List<Command> queryCommandPageBySlot(@Param("limit") int limit, List<Command> queryCommandPageBySlot(@Param("limit") int limit,
@Param("masterCount") int masterCount, @Param("masterCount") int masterCount,
@Param("thisMasterSlot") int thisMasterSlot); @Param("thisMasterSlot") int thisMasterSlot);
void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") List<Integer> workflowInstanceIds);
} }

7
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml

@ -47,4 +47,11 @@
order by process_instance_priority, id asc order by process_instance_priority, id asc
limit #{limit} limit #{limit}
</select> </select>
<delete id="deleteByWorkflowInstanceIds" >
delete from t_ds_command
where process_instance_id in
<foreach collection="workflowInstanceIds" index="index" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</delete>
</mapper> </mapper>

11
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import static com.google.common.truth.Truth.assertThat;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@ -173,6 +175,14 @@ public class CommandMapperTest extends BaseDaoTest {
toTestQueryCommandPageBySlot(masterCount, thisMasterSlot); toTestQueryCommandPageBySlot(masterCount, thisMasterSlot);
} }
@Test
void deleteByWorkflowInstanceIds() {
Command command = createCommand();
assertThat(commandMapper.selectList(null)).isNotEmpty();
commandMapper.deleteByWorkflowInstanceIds(Lists.newArrayList(command.getProcessInstanceId()));
assertThat(commandMapper.selectList(null)).isEmpty();
}
private boolean toTestQueryCommandPageBySlot(int masterCount, int thisMasterSlot) { private boolean toTestQueryCommandPageBySlot(int masterCount, int thisMasterSlot) {
Command command = createCommand(); Command command = createCommand();
Integer id = command.getId(); Integer id = command.getId();
@ -280,5 +290,4 @@ public class CommandMapperTest extends BaseDaoTest {
return command; return command;
} }
} }

51
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java

@ -252,12 +252,61 @@ public class DynamicLogicTask extends BaseAsyncLogicTask<DynamicParameters> {
@Override @Override
public void kill() { public void kill() {
try { try {
changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP); doKillSubWorkflowInstances();
} catch (MasterTaskExecuteException e) { } catch (MasterTaskExecuteException e) {
log.error("kill {} error", taskInstance.getName(), e); log.error("kill {} error", taskInstance.getName(), e);
} }
} }
private void doKillSubWorkflowInstances() throws MasterTaskExecuteException {
List<ProcessInstance> existsSubProcessInstanceList =
subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode());
if (CollectionUtils.isEmpty(existsSubProcessInstanceList)) {
return;
}
commandMapper.deleteByWorkflowInstanceIds(
existsSubProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
List<ProcessInstance> runningSubProcessInstanceList =
subWorkflowService.filterRunningProcessInstances(existsSubProcessInstanceList);
doKillRunningSubWorkflowInstances(runningSubProcessInstanceList);
List<ProcessInstance> waitToRunProcessInstances =
subWorkflowService.filterWaitToRunProcessInstances(existsSubProcessInstanceList);
doKillWaitToRunSubWorkflowInstances(waitToRunProcessInstances);
this.haveBeenCanceled = true;
}
private void doKillRunningSubWorkflowInstances(List<ProcessInstance> runningSubProcessInstanceList) throws MasterTaskExecuteException {
for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) {
subProcessInstance.setState(WorkflowExecutionStatus.READY_STOP);
processInstanceDao.updateById(subProcessInstance);
if (subProcessInstance.getState().isFinished()) {
log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId());
continue;
}
try {
sendToSubProcess(taskExecutionContext, subProcessInstance);
log.info("Success send [{}] request to SubWorkflow's master: {}", WorkflowExecutionStatus.READY_STOP,
subProcessInstance.getHost());
} catch (Exception e) {
throw new MasterTaskExecuteException(
String.format("Send stop request to SubWorkflow's master: %s failed",
subProcessInstance.getHost()),
e);
}
}
}
private void doKillWaitToRunSubWorkflowInstances(List<ProcessInstance> waitToRunWorkflowInstances) {
for (ProcessInstance subProcessInstance : waitToRunWorkflowInstances) {
subProcessInstance.setState(WorkflowExecutionStatus.STOP);
processInstanceDao.updateById(subProcessInstance);
}
}
private void changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus) throws MasterTaskExecuteException { private void changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus) throws MasterTaskExecuteException {
this.haveBeenCanceled = true; this.haveBeenCanceled = true;
List<ProcessInstance> existsSubProcessInstanceList = List<ProcessInstance> existsSubProcessInstanceList =

Loading…
Cancel
Save