Browse Source

[Improvement][API][num-7] add task delete check (#6887)

* upgrade add sub_process

* add interface method

* add interface method

* fix ut

* add task delete check

* add task delete check
3.0.0/version-upgrade
JinYong Li 3 years ago committed by GitHub
parent
commit
fc6a5116c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  3. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  4. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -284,6 +284,9 @@ public enum Status {
MOVE_PROCESS_TASK_RELATION_ERROR(50047, "move process task relation error", "移动任务到其他工作流错误"), MOVE_PROCESS_TASK_RELATION_ERROR(50047, "move process task relation error", "移动任务到其他工作流错误"),
DELETE_TASK_PROCESS_RELATION_ERROR(50048, "delete process task relation error", "删除工作流任务关系错误"), DELETE_TASK_PROCESS_RELATION_ERROR(50048, "delete process task relation error", "删除工作流任务关系错误"),
QUERY_TASK_PROCESS_RELATION_ERROR(50049, "query process task relation error", "查询工作流任务关系错误"), QUERY_TASK_PROCESS_RELATION_ERROR(50049, "query process task relation error", "查询工作流任务关系错误"),
TASK_DEFINE_STATE_ONLINE(50050, "task definition {0} is already on line", "任务定义[{0}]已上线"),
TASK_HAS_DOWNSTREAM(50051, "Task [{0}] exists downstream dependence", "任务[{0}]存在下游依赖"),
MAIN_TABLE_USING_VERSION(50052, "the version that the master table is using", "主表正在使用该版本"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/** /**

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
@ -175,14 +176,22 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
// TODO if (taskCode == 0) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
return result;
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryDownstreamByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) { if (!processTaskRelationList.isEmpty()) {
Set<Long> processDefinitionCodes = processTaskRelationList Set<Long> postTaskCodes = processTaskRelationList
.stream() .stream()
.map(ProcessTaskRelation::getProcessDefinitionCode) .map(ProcessTaskRelation::getPostTaskCode)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, StringUtils.join(processDefinitionCodes, ",")); putMsg(result, Status.TASK_HAS_DOWNSTREAM, StringUtils.join(postTaskCodes, ","));
return result; return result;
} }
int delete = taskDefinitionMapper.deleteByCode(taskCode); int delete = taskDefinitionMapper.deleteByCode(taskCode);
@ -338,6 +347,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (taskDefinition == null) { if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
} else { } else {
if (taskDefinition.getVersion() == version) {
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
return result;
}
int delete = taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version); int delete = taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version);
if (delete > 0) { if (delete > 0) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -74,4 +74,12 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @return int * @return int
*/ */
int batchInsert(@Param("taskRelationList") List<ProcessTaskRelationLog> taskRelationList); int batchInsert(@Param("taskRelationList") List<ProcessTaskRelationLog> taskRelationList);
/**
* query downstream process task relation by taskCode
*
* @param taskCode taskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByTaskCode(@Param("taskCode") long taskCode);
} }

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -69,4 +69,10 @@
#{relation.conditionType},#{relation.conditionParams},#{relation.createTime},#{relation.updateTime}) #{relation.conditionType},#{relation.conditionParams},#{relation.createTime},#{relation.updateTime})
</foreach> </foreach>
</insert> </insert>
<select id="queryDownstreamByTaskCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE pre_task_code = #{taskCode}
</select>
</mapper> </mapper>

Loading…
Cancel
Save