From c8ee40542614fee71d6a846eadafb5d7c99da18e Mon Sep 17 00:00:00 2001 From: Tboy Date: Wed, 23 Oct 2019 20:12:15 +0800 Subject: [PATCH] check processInstance state before delete worker group (#1074) * move updateTaskState into try/catch block in case of exception * fix NPE * using conf.getInt instead of getString * for AbstractZKClient, remove the log, for it will print the same log message in createZNodePath. for AlertDao, correct the spelling. * duplicate * refactor getTaskWorkerGroupId * add friendly log * update hearbeat thread num = 1 * fix the bug when worker execute task using queue. and remove checking Tenant user anymore in TaskScheduleThread * 1. move verifyTaskInstanceIsNull after taskInstance 2. keep verifyTenantIsNull/verifyTaskInstanceIsNull clean and readable * fix the message * delete before check to avoid KeeperException$NoNodeException * fix the message * check processInstance state before delete tenant * check processInstance state before delete worker group * refactor --- .../dolphinscheduler/api/enums/Status.java | 2 ++ .../api/service/TenantService.java | 12 +---------- .../api/service/WorkerGroupService.java | 13 ++++++++++++ .../dolphinscheduler/common/Constants.java | 10 ++++++++++ .../dao/mapper/ProcessInstanceMapper.java | 5 +++++ .../dao/mapper/ProcessInstanceMapper.xml | 20 +++++++++++++++++++ 6 files changed, 51 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 7bd7a00a74..479f38d725 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -168,6 +168,8 @@ public enum Status { DELETE_TENANT_BY_ID_FAIL_DEFINES(100143,"delete tenant by id fail, for there are {0} process definitions using it"), DELETE_TENANT_BY_ID_FAIL_USERS(100144,"delete tenant by id fail, for there are {0} users using it"), + DELETE_WORKER_GROUP_BY_ID_FAIL(100143,"delete worker group by id fail, for there are {0} process instances in executing using it"), + UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists"), // RESOURCE_EMPTY(20003, "resource file is empty"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java index 11ace50648..bdc4991953 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java @@ -290,17 +290,7 @@ public class TenantService extends BaseService{ } private List getProcessInstancesByTenant(Tenant tenant) { - int[] states = new int[]{ - ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.RUNNING_EXEUTION.ordinal(), - ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.READY_STOP.ordinal(), - ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), - ExecutionStatus.WAITTING_THREAD.ordinal(), - ExecutionStatus.WAITTING_DEPEND.ordinal() - }; - - return processInstanceMapper.queryByTenantIdAndStatus(tenant.getId(), states); + return processInstanceMapper.queryByTenantIdAndStatus(tenant.getId(), org.apache.dolphinscheduler.common.Constants.NOT_TERMINATED_STATES); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index 3b67e02c19..80725495c7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -19,8 +19,12 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.Constants; import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -43,6 +47,9 @@ public class WorkerGroupService extends BaseService { @Autowired WorkerGroupMapper workerGroupMapper; + @Autowired + ProcessInstanceMapper processInstanceMapper; + /** * create or update a worker group * @param id @@ -142,7 +149,13 @@ public class WorkerGroupService extends BaseService { Map result = new HashMap<>(5); + List processInstances = processInstanceMapper.queryByWorkerGroupIdAndStatus(id, org.apache.dolphinscheduler.common.Constants.NOT_TERMINATED_STATES); + if(CollectionUtils.isNotEmpty(processInstances)){ + putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size()); + return result; + } workerGroupMapper.deleteById(id); + processInstanceMapper.updateProcessInstanceByWorkerGroupId(id, org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_ID); putMsg(result, Status.SUCCESS); return result; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 5a1b901328..c0977c4a47 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.OSUtils; import java.util.regex.Pattern; @@ -925,4 +926,13 @@ public final class Constants { public static final String FLINK_MAIN_CLASS = "-c"; + public static final int[] NOT_TERMINATED_STATES = new int[]{ + ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.RUNNING_EXEUTION.ordinal(), + ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.READY_STOP.ordinal(), + ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), + ExecutionStatus.WAITTING_THREAD.ordinal(), + ExecutionStatus.WAITTING_DEPEND.ordinal() + }; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index ae78b929b9..d288658b40 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -37,6 +37,9 @@ public interface ProcessInstanceMapper extends BaseMapper { List queryByTenantIdAndStatus(@Param("tenantId") int tenantId, @Param("states") int[] states); + List queryByWorkerGroupIdAndStatus(@Param("workerGroupId") int workerGroupId, + @Param("states") int[] states); + IPage queryProcessInstanceListPaging(Page page, @Param("projectId") int projectId, @Param("processDefinitionId") Integer processDefinitionId, @@ -55,6 +58,8 @@ public interface ProcessInstanceMapper extends BaseMapper { int updateProcessInstanceByTenantId(@Param("originTenantId") int originTenantId, @Param("destTenantId") int destTenantId); + int updateProcessInstanceByWorkerGroupId(@Param("originWorkerGroupId") int originWorkerGroupId, @Param("destWorkerGroupId") int destWorkerGroupId); + List countInstanceStateByUser( @Param("startTime") Date startTime, @Param("endTime") Date endTime, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index cd05dc56e3..75dc448883 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -34,6 +34,20 @@ order by id asc + + select t.state, count(0) as count from t_ds_process_instance t