Browse Source

check processInstance state before delete tenant (#1065)

* move updateTaskState into try/catch block in case of exception

* fix NPE

* using conf.getInt instead of getString

* for AbstractZKClient, remove the log, for it will print the same log message in createZNodePath.
for AlertDao, correct the spelling.

* duplicate

* refactor getTaskWorkerGroupId

* add friendly log

* update hearbeat thread num = 1

* fix the bug when worker execute task using queue. and remove checking Tenant user anymore in TaskScheduleThread

* 1. move verifyTaskInstanceIsNull after taskInstance
2. keep verifyTenantIsNull/verifyTaskInstanceIsNull clean and readable

* fix the message

* delete before check to avoid KeeperException$NoNodeException

* fix the message

* check processInstance state before delete tenant
pull/2/head
Tboy 5 years ago committed by bao liang
parent
commit
1f30367586
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TenantController.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 30
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
  4. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  5. 22
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TenantController.java

@ -203,7 +203,7 @@ public class TenantController extends BaseController{
@ResponseStatus(HttpStatus.OK)
public Result deleteTenantById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id") int id) {
logger.info("login user {}, delete tenant, tenantCode: {},", loginUser.getUserName(), id);
logger.info("login user {}, delete tenant, tenantId: {},", loginUser.getUserName(), id);
try {
Map<String, Object> result = tenantService.deleteTenantById(loginUser,id);
return returnDataList(result);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -164,7 +164,7 @@ public enum Status {
PREVIEW_SCHEDULE_ERROR(10139,"preview schedule error"),
PARSE_TO_CRON_EXPRESSION_ERROR(10140,"parse cron to cron expression error"),
SCHEDULE_START_TIME_END_TIME_SAME(10141,"The start time must not be the same as the end"),
DELETE_TENANT_BY_ID_FAIL(100142,"delete tenant by id fail, for there are {0} process instances in executing using it"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists"),

30
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java

@ -16,18 +16,22 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Constants;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,6 +55,9 @@ public class TenantService extends BaseService{
@Autowired
private TenantMapper tenantMapper;
@Autowired
private ProcessInstanceMapper processInstanceMapper;
/**
* create tenant
*
@ -227,6 +234,22 @@ public class TenantService extends BaseService{
return result;
}
int[] states = new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXEUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.WAITTING_THREAD.ordinal(),
ExecutionStatus.WAITTING_DEPEND.ordinal()
};
List<ProcessInstance> processInstances = processInstanceMapper.queryByTenantIdAndStatus(tenant.getId(), states);
if(CollectionUtils.isNotEmpty(processInstances)){
putMsg(result, Status.DELETE_TENANT_BY_ID_FAIL, processInstances.size());
return result;
}
// if resource upload startup
if (PropertyUtils.getResUploadStartupState()){
String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode();
@ -249,6 +272,7 @@ public class TenantService extends BaseService{
}
tenantMapper.deleteById(id);
processInstanceMapper.updateProcessInstanceByTenantId(id, -1);
putMsg(result, Status.SUCCESS);
return result;
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -34,6 +34,9 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
List<ProcessInstance> queryByHostAndStatus(@Param("host") String host,
@Param("states") int[] stateArray);
List<ProcessInstance> queryByTenantIdAndStatus(@Param("tenantId") int tenantId,
@Param("states") int[] states);
IPage<ProcessInstance> queryProcessInstanceListPaging(Page<ProcessInstance> page,
@Param("projectId") int projectId,
@Param("processDefinitionId") Integer processDefinitionId,
@ -50,6 +53,8 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
int updateProcessInstanceByState(@Param("originState") ExecutionStatus originState,
@Param("destState") ExecutionStatus destState);
int updateProcessInstanceByTenantId(@Param("originTenantId") int originTenantId, @Param("destTenantId") int destTenantId);
List<ExecuteStatusCount> countInstanceStateByUser(
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,

22
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -19,6 +19,21 @@
</foreach>
order by id asc
</select>
<select id="queryByTenantIdAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select *
from t_ds_process_instance
where 1=1
<if test="tenantId != -1">
and tenant_id =#{tenantId}
</if>
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
order by id asc
</select>
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.*
from t_ds_process_instance instance
@ -59,6 +74,13 @@
set state = #{destState}
where state = #{originState}
</update>
<update id="updateProcessInstanceByTenantId">
update t_ds_process_instance
set tenant_id = #{destTenantId}
where tenant_id = #{originTenantId}
</update>
<select id="countInstanceStateByUser" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select t.state, count(0) as count
from t_ds_process_instance t

Loading…
Cancel
Save