Browse Source

check processInstance state before delete worker group (#1074)

* move updateTaskState into try/catch block in case of exception

* fix NPE

* using conf.getInt instead of getString

* for AbstractZKClient, remove the log, for it will print the same log message in createZNodePath.
for AlertDao, correct the spelling.

* duplicate

* refactor getTaskWorkerGroupId

* add friendly log

* update hearbeat thread num = 1

* fix the bug when worker execute task using queue. and remove checking Tenant user anymore in TaskScheduleThread

* 1. move verifyTaskInstanceIsNull after taskInstance
2. keep verifyTenantIsNull/verifyTaskInstanceIsNull clean and readable

* fix the message

* delete before check to avoid KeeperException$NoNodeException

* fix the message

* check processInstance state before delete tenant

* check processInstance state before delete worker group

* refactor
pull/2/head
Tboy 5 years ago committed by bao liang
parent
commit
c8ee405426
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
  3. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
  4. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  5. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  6. 20
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -168,6 +168,8 @@ public enum Status {
DELETE_TENANT_BY_ID_FAIL_DEFINES(100143,"delete tenant by id fail, for there are {0} process definitions using it"),
DELETE_TENANT_BY_ID_FAIL_USERS(100144,"delete tenant by id fail, for there are {0} users using it"),
DELETE_WORKER_GROUP_BY_ID_FAIL(100143,"delete worker group by id fail, for there are {0} process instances in executing using it"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists"),
// RESOURCE_EMPTY(20003, "resource file is empty"),

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java

@ -290,17 +290,7 @@ public class TenantService extends BaseService{
}
private List<ProcessInstance> getProcessInstancesByTenant(Tenant tenant) {
int[] states = new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXEUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.WAITTING_THREAD.ordinal(),
ExecutionStatus.WAITTING_DEPEND.ordinal()
};
return processInstanceMapper.queryByTenantIdAndStatus(tenant.getId(), states);
return processInstanceMapper.queryByTenantIdAndStatus(tenant.getId(), org.apache.dolphinscheduler.common.Constants.NOT_TERMINATED_STATES);
}
/**

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java

@ -19,8 +19,12 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Constants;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -43,6 +47,9 @@ public class WorkerGroupService extends BaseService {
@Autowired
WorkerGroupMapper workerGroupMapper;
@Autowired
ProcessInstanceMapper processInstanceMapper;
/**
* create or update a worker group
* @param id
@ -142,7 +149,13 @@ public class WorkerGroupService extends BaseService {
Map<String, Object> result = new HashMap<>(5);
List<ProcessInstance> processInstances = processInstanceMapper.queryByWorkerGroupIdAndStatus(id, org.apache.dolphinscheduler.common.Constants.NOT_TERMINATED_STATES);
if(CollectionUtils.isNotEmpty(processInstances)){
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
return result;
}
workerGroupMapper.deleteById(id);
processInstanceMapper.updateProcessInstanceByWorkerGroupId(id, org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_ID);
putMsg(result, Status.SUCCESS);
return result;
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import java.util.regex.Pattern;
@ -925,4 +926,13 @@ public final class Constants {
public static final String FLINK_MAIN_CLASS = "-c";
public static final int[] NOT_TERMINATED_STATES = new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXEUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.WAITTING_THREAD.ordinal(),
ExecutionStatus.WAITTING_DEPEND.ordinal()
};
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -37,6 +37,9 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
List<ProcessInstance> queryByTenantIdAndStatus(@Param("tenantId") int tenantId,
@Param("states") int[] states);
List<ProcessInstance> queryByWorkerGroupIdAndStatus(@Param("workerGroupId") int workerGroupId,
@Param("states") int[] states);
IPage<ProcessInstance> queryProcessInstanceListPaging(Page<ProcessInstance> page,
@Param("projectId") int projectId,
@Param("processDefinitionId") Integer processDefinitionId,
@ -55,6 +58,8 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
int updateProcessInstanceByTenantId(@Param("originTenantId") int originTenantId, @Param("destTenantId") int destTenantId);
int updateProcessInstanceByWorkerGroupId(@Param("originWorkerGroupId") int originWorkerGroupId, @Param("destWorkerGroupId") int destWorkerGroupId);
List<ExecuteStatusCount> countInstanceStateByUser(
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,

20
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -34,6 +34,20 @@
order by id asc
</select>
<select id="queryByWorkerGroupIdAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select *
from t_ds_process_instance
where 1=1
<if test="workerGroupId != -1">
and worker_group_id =#{workerGroupId}
</if>
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
order by id asc
</select>
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.*
from t_ds_process_instance instance
@ -81,6 +95,12 @@
where tenant_id = #{originTenantId}
</update>
<update id="updateProcessInstanceByWorkerGroupId">
update t_ds_process_instance
set worker_group_id = #{destWorkerGroupId}
where worker_group_id = #{originWorkerGroupId}
</update>
<select id="countInstanceStateByUser" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select t.state, count(0) as count
from t_ds_process_instance t

Loading…
Cancel
Save