Browse Source

[Bug] Disable delete a specific workflow version when exist workflow instance under this version which is not finish (#15730)

dev_wenjun_refactorMaster
Wenjun Ruan 8 months ago committed by GitHub
parent
commit
56a834b48b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  3. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  4. 54
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  5. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  6. 18
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  7. 15
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java
  8. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  9. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
  10. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
  11. 15
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  12. 104
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImplTest.java

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -333,13 +333,13 @@ public class ProcessDefinitionController extends BaseController {
@DeleteMapping(value = "/{code}/versions/{version}")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_PROCESS_DEFINITION_VERSION_ERROR)
public Result deleteProcessDefinitionVersion(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result<Void> deleteProcessDefinitionVersion(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "code") long code,
@PathVariable(value = "version") int version) {
Map<String, Object> result =
processDefinitionService.deleteProcessDefinitionVersion(loginUser, projectCode, code, version);
return returnDataList(result);
@PathVariable(value = "code") long workflowDefinitionCode,
@PathVariable(value = "version") int workflowDefinitionVersion) {
processDefinitionService.deleteProcessDefinitionVersion(loginUser, projectCode, workflowDefinitionCode,
workflowDefinitionVersion);
return Result.success();
}
@Operation(summary = "release", description = "RELEASE_PROCESS_DEFINITION_NOTES")

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -150,6 +150,7 @@ public interface ProcessDefinitionService {
long code);
Optional<ProcessDefinition> queryWorkflowDefinition(long workflowDefinitionCode, int workflowDefinitionVersion);
ProcessDefinition queryWorkflowDefinitionThrowExceptionIfNotFound(long workflowDefinitionCode,
int workflowDefinitionVersion);
@ -387,14 +388,13 @@ public interface ProcessDefinitionService {
*
* @param loginUser login user info to check auth
* @param projectCode project code
* @param code process definition code
* @param version version number
* @return delele result code
* @param workflowDefinitionCode process definition code
* @param workflowDefinitionVersion version number
*/
Map<String, Object> deleteProcessDefinitionVersion(User loginUser,
void deleteProcessDefinitionVersion(User loginUser,
long projectCode,
long code,
int version);
long workflowDefinitionCode,
int workflowDefinitionVersion);
/**
* update process definition basic info, not including task definition, task relation and location.
@ -420,6 +420,7 @@ public interface ProcessDefinitionService {
/**
* view process variables
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -216,6 +216,18 @@ public interface ProcessInstanceService {
List<ProcessInstance> queryByProcessDefineCodeAndStatus(Long processDefinitionCode,
int[] states);
/**
* query process instance by processDefinitionCode and stateArray
*
* @param workflowDefinitionCode workflowDefinitionCode
* @param workflowDefinitionVersion workflowDefinitionVersion
* @param states states array
* @return process instance list
*/
List<ProcessInstance> queryByWorkflowCodeVersionStatus(Long workflowDefinitionCode,
int workflowDefinitionVersion,
int[] states);
/**
* query process instance by processDefinitionCode
*

54
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -70,6 +70,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -817,7 +818,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* Task want to delete whether used in other task, should throw exception when have be used.
*
* <p>
* This function avoid delete task already dependencies by other tasks by accident.
*
* @param processDefinition ProcessDefinition you change task definition and task relation
@ -999,7 +1000,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* Process definition want to delete whether used in other task, should throw exception when have be used.
*
* <p>
* This function avoid delete process definition already dependencies by other tasks by accident.
*
* @param processDefinition ProcessDefinition you change task definition and task relation
@ -1987,6 +1988,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch move process definition
* Will be deleted
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
@ -2160,6 +2162,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get new Task name or Process name when copy or import operate
*
* @param originalName Task or Process original name
* @param suffix "_copy_" or "_import_"
* @return new name
@ -2308,48 +2311,38 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param projectCode project code
* @param code process definition code
* @param version version number
* @return delete result code
*/
@Override
@Transactional
public Map<String, Object> deleteProcessDefinitionVersion(User loginUser, long projectCode, long code,
public void deleteProcessDefinitionVersion(User loginUser,
long projectCode,
long code,
int version) {
Project project = projectMapper.queryByCode(projectCode);
// check if user have write perm for project
Map<String, Object> result = new HashMap<>();
boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result);
if (!hasProjectAndWritePerm) {
return result;
}
projectService.checkHasProjectWritePermissionThrowException(loginUser, projectCode);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
log.error("Process definition does not exist, code:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
} else {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, code);
}
if (processDefinition.getVersion() == version) {
log.warn(
"Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
return result;
log.warn("This version: {} of workflow: {} is the main version cannot delete by version", code, version);
throw new ServiceException(Status.MAIN_TABLE_USING_VERSION);
}
// check whether there exist running workflow instance under the process definition
List<ProcessInstance> workflowInstances = processInstanceService.queryByWorkflowCodeVersionStatus(
code,
version,
WorkflowExecutionStatus.getNotTerminalStatus());
if (CollectionUtils.isNotEmpty(workflowInstances)) {
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, workflowInstances.size());
}
int deleteLog = processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version);
int deleteRelationLog = processTaskRelationLogMapper.deleteByCode(code, version);
if (deleteLog == 0 || deleteRelationLog == 0) {
log.error(
"Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
log.info(
"Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SUCCESS);
}
return result;
log.info("Delete version: {} of workflow: {}, projectCode:{}.", version, code, projectCode);
}
private void updateWorkflowValid(User user, ProcessDefinition oldProcessDefinition,
@ -2546,6 +2539,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* view process variables
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -1064,6 +1064,13 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return processInstanceMapper.queryByProcessDefineCodeAndStatus(processDefinitionCode, states);
}
@Override
public List<ProcessInstance> queryByWorkflowCodeVersionStatus(Long workflowDefinitionCode,
int workflowDefinitionVersion, int[] states) {
return processInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, workflowDefinitionVersion,
states);
}
/**
* query process instance by processDefinitionCode
*

18
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.controller;
import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -183,7 +185,7 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.doNothing().when(processDefinitionService)
doNothing().when(processDefinitionService)
.offlineWorkflowDefinition(user, projectCode, id);
Result<Boolean> response =
processDefinitionController.releaseProcessDefinition(user, projectCode, id, ReleaseState.OFFLINE);
@ -383,7 +385,7 @@ public class ProcessDefinitionControllerTest {
String processDefinitionIds = "1,2";
long projectCode = 1L;
HttpServletResponse response = new MockHttpServletResponse();
Mockito.doNothing().when(this.processDefinitionService).batchExportProcessDefinitionByCodes(user, projectCode,
doNothing().when(this.processDefinitionService).batchExportProcessDefinitionByCodes(user, projectCode,
processDefinitionIds, response);
processDefinitionController.batchExportProcessDefinitionByCodes(user, projectCode, processDefinitionIds,
response);
@ -420,13 +422,11 @@ public class ProcessDefinitionControllerTest {
@Test
public void testDeleteProcessDefinitionVersion() {
long projectCode = 1L;
Map<String, Object> resultMap = new HashMap<>();
putMsg(resultMap, Status.SUCCESS);
Mockito.when(processDefinitionService.deleteProcessDefinitionVersion(
user, projectCode, 1, 10)).thenReturn(resultMap);
Result result = processDefinitionController.deleteProcessDefinitionVersion(
user, projectCode, 1, 10);
Assertions.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());
long workflowCode = 1L;
int workflowVersion = 10;
doNothing().when(processDefinitionService).deleteProcessDefinitionVersion(user, projectCode, workflowCode,
workflowVersion);
processDefinitionController.deleteProcessDefinitionVersion(user, projectCode, workflowCode, workflowVersion);
}
@Test

15
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java

@ -52,6 +52,17 @@ public enum WorkflowExecutionStatus {
READY_STOP.getCode()
};
private static final int[] NOT_TERMINAL_STATUS = new int[]{
SUBMITTED_SUCCESS.getCode(),
RUNNING_EXECUTION.getCode(),
DELAY_EXECUTION.getCode(),
READY_PAUSE.getCode(),
READY_STOP.getCode(),
SERIAL_WAIT.getCode(),
READY_BLOCK.getCode(),
WAIT_TO_RUN.getCode()
};
static {
for (WorkflowExecutionStatus executionStatus : WorkflowExecutionStatus.values()) {
CODE_MAP.put(executionStatus.getCode(), executionStatus);
@ -116,6 +127,10 @@ public enum WorkflowExecutionStatus {
return NEED_FAILOVER_STATES;
}
public static int[] getNotTerminalStatus() {
return NOT_TERMINAL_STATUS;
}
@EnumValue
private final int code;

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -259,6 +259,10 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
List<ProcessInstance> queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("states") int[] states);
List<ProcessInstance> queryByWorkflowCodeVersionStatus(@Param("workflowDefinitionCode") long workflowDefinitionCode,
@Param("workflowDefinitionVersion") int workflowDefinitionVersion,
@Param("states") int[] states);
List<ProcessInstance> queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("processDefinitionVersion") int processDefinitionVersion,
@Param("states") int[] states,

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface ProcessInstanceDao extends IDao<ProcessInstance> {
@ -64,7 +64,7 @@ public interface ProcessInstanceDao extends IDao<ProcessInstance> {
* @param definitionCode definitionCode
* @return process instance
*/
ProcessInstance queryFirstScheduleProcessInstance(@Param("processDefinitionCode") Long definitionCode);
ProcessInstance queryFirstScheduleProcessInstance(Long definitionCode);
/**
* query first manual process instance
@ -72,7 +72,11 @@ public interface ProcessInstanceDao extends IDao<ProcessInstance> {
* @param definitionCode definitionCode
* @return process instance
*/
ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode);
ProcessInstance queryFirstStartProcessInstance(Long definitionCode);
ProcessInstance querySubProcessInstanceByParentId(Integer processInstanceId, Integer taskInstanceId);
List<ProcessInstance> queryByWorkflowCodeVersionStatus(Long workflowDefinitionCode,
int workflowDefinitionVersion,
int[] states);
}

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import java.util.List;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@ -129,4 +131,12 @@ public class ProcessInstanceDaoImpl extends BaseDao<ProcessInstance, ProcessInst
processInstance = queryById(processInstanceMap.getProcessInstanceId());
return processInstance;
}
@Override
public List<ProcessInstance> queryByWorkflowCodeVersionStatus(Long workflowDefinitionCode,
int workflowDefinitionVersion,
int[] states) {
return mybatisMapper.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, workflowDefinitionVersion,
states);
}
}

15
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -259,6 +259,21 @@
</if>
order by id asc
</select>
<select id="queryByWorkflowCodeVersionStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{workflowDefinitionCode}
and process_definition_version = #{workflowDefinitionVersion}
<if test="states != null and states.length != 0">
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
<select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select

104
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImplTest.java

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
class ProcessInstanceDaoImplTest extends BaseDaoTest {
@Autowired
private ProcessInstanceDao processInstanceDao;
@Test
void queryByWorkflowCodeVersionStatus_EMPTY_INSTANCE() {
long workflowDefinitionCode = 1L;
int workflowDefinitionVersion = 1;
int[] status = WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState();
assertTrue(isEmpty(processInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode,
workflowDefinitionVersion, status)));
}
@Test
void queryByWorkflowCodeVersionStatus_EXIST_NOT_FINISH_INSTANCE() {
long workflowDefinitionCode = 1L;
int workflowDefinitionVersion = 1;
int[] status = WorkflowExecutionStatus.getNotTerminalStatus();
assertTrue(isEmpty(processInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode,
workflowDefinitionVersion, status)));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.SUBMITTED_SUCCESS));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.RUNNING_EXECUTION));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.DELAY_EXECUTION));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.READY_PAUSE));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.READY_STOP));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.SERIAL_WAIT));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.READY_BLOCK));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.WAIT_TO_RUN));
assertEquals(8, processInstanceDao
.queryByWorkflowCodeVersionStatus(workflowDefinitionCode, workflowDefinitionVersion, status).size());
}
@Test
void queryByWorkflowCodeVersionStatus_EXIST_FINISH_INSTANCE() {
long workflowDefinitionCode = 1L;
int workflowDefinitionVersion = 1;
int[] status = WorkflowExecutionStatus.getNotTerminalStatus();
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.PAUSE));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.STOP));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.FAILURE));
processInstanceDao.insert(createWorkflowInstance(workflowDefinitionCode, workflowDefinitionVersion,
WorkflowExecutionStatus.SUCCESS));
assertTrue(isEmpty(processInstanceDao.queryByWorkflowCodeVersionStatus(workflowDefinitionCode,
workflowDefinitionVersion, status)));
}
private ProcessInstance createWorkflowInstance(Long workflowDefinitionCode, int workflowDefinitionVersion,
WorkflowExecutionStatus status) {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setName("WorkflowInstance" + System.currentTimeMillis());
processInstance.setProcessDefinitionCode(workflowDefinitionCode);
processInstance.setProcessDefinitionVersion(workflowDefinitionVersion);
processInstance.setState(status);
return processInstance;
}
}
Loading…
Cancel
Save