Browse Source

[Feature][JsonSplit] modify processdefinition create/delete method (#4579)

* processdefinition create/delete method

* init

* add relation parse

* delete process_definition_json
pull/3/MERGE
Simon 4 years ago committed by GitHub
parent
commit
e1e48d7e03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 112
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 51
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
  3. 16
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
  4. 48
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
  5. 23
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml

112
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService; import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils; import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -54,17 +56,21 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StreamUtils; import org.apache.dolphinscheduler.common.utils.StreamUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionVersion; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionVersion;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
@ -78,6 +84,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -131,6 +138,15 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Autowired @Autowired
private ProcessDefinitionVersionService processDefinitionVersionService; private ProcessDefinitionVersionService processDefinitionVersionService;
@Autowired
private TaskDefinitionService taskDefinitionService;
@Autowired
private ProcessTaskRelationService processTaskRelationService;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Autowired @Autowired
private ProcessDefinitionMapper processDefineMapper; private ProcessDefinitionMapper processDefineMapper;
@ -159,6 +175,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
* @return create result code * @return create result code
*/ */
@Override @Override
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> createProcessDefinition(User loginUser, public Map<String, Object> createProcessDefinition(User loginUser,
String projectName, String projectName,
String name, String name,
@ -176,7 +193,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult; return checkResult;
} }
ProcessDefinition processDefine = new ProcessDefinition(); ProcessDefinition processDefinition = new ProcessDefinition();
Date now = new Date(); Date now = new Date();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
@ -185,42 +202,84 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkProcessJson; return checkProcessJson;
} }
processDefine.setName(name); Long processDefinitionCode;
processDefine.setReleaseState(ReleaseState.OFFLINE); try {
processDefine.setProjectId(project.getId()); processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
processDefine.setUserId(loginUser.getId()); processDefinition.setCode(processDefinitionCode);
processDefine.setProcessDefinitionJson(processDefinitionJson); } catch (SnowFlakeException e) {
processDefine.setDescription(desc); putMsg(result, Status.CREATE_PROCESS_DEFINITION);
processDefine.setLocations(locations); return result;
processDefine.setConnects(connects); }
processDefine.setTimeout(processData.getTimeout());
processDefine.setTenantId(processData.getTenantId()); processDefinition.setName(name);
processDefine.setModifyBy(loginUser.getUserName()); processDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefine.setResourceIds(getResourceIds(processData)); processDefinition.setUserId(loginUser.getId());
processDefinition.setDescription(desc);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(processData.getTimeout());
processDefinition.setTenantId(processData.getTenantId());
processDefinition.setModifyBy(loginUser.getUserName());
processDefinition.setResourceIds(getResourceIds(processData));
//custom global params //custom global params
List<Property> globalParamsList = processData.getGlobalParams(); List<Property> globalParamsList = processData.getGlobalParams();
if (CollectionUtils.isNotEmpty(globalParamsList)) { if (CollectionUtils.isNotEmpty(globalParamsList)) {
Set<Property> globalParamsSet = new HashSet<>(globalParamsList); Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
globalParamsList = new ArrayList<>(globalParamsSet); globalParamsList = new ArrayList<>(globalParamsSet);
processDefine.setGlobalParamList(globalParamsList); processDefinition.setGlobalParamList(globalParamsList);
} }
processDefine.setCreateTime(now); processDefinition.setCreateTime(now);
processDefine.setUpdateTime(now); processDefinition.setUpdateTime(now);
processDefine.setFlag(Flag.YES); processDefinition.setFlag(Flag.YES);
// save the new process definition // save the new process definition
processDefineMapper.insert(processDefine); processDefineMapper.insert(processDefinition);
// add process definition version // parse and save the taskDefinition and processTaskRelation
long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefine); try {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
processDefine.setVersion(version); for (TaskNode task : taskNodeList) {
taskDefinitionService.createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
}
DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
Collection<String> beginNode = dag.getBeginNode();
Collection<String> endNode = dag.getEndNode();
processDefineMapper.updateVersionByProcessDefinitionId(processDefine.getId(), version); // TODO: query taskCode by projectCode and taskName
processTaskRelationService.createProcessTaskRelation(
loginUser,
name,
project.getCode(),
processDefinitionCode,
0L,
0L,
"0",
"");
} catch (Exception e) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
return result;
}
// save process definition log
ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
processDefinitionLog.setOperator(loginUser.getId());
processDefinitionLog.setOperateTime(now);
processDefinitionLogMapper.insert(processDefinitionLog);
// add process definition version
long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
processDefinition.setVersion(version);
processDefineMapper.updateVersionByProcessDefinitionId(processDefinition.getId(), version);
// return processDefinition object with ID // return processDefinition object with ID
result.put(Constants.DATA_LIST, processDefine.getId()); result.put(Constants.DATA_LIST, processDefinition.getId());
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
@ -367,7 +426,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult; return checkResult;
} }
ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(),processDefinitionName); ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), processDefinitionName);
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName);
} else { } else {
@ -523,6 +582,9 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId); ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId);
// TODO: replace id to code
// ProcessDefinition processDefinition = processDefineMapper.selectByCode(processDefinitionCode);
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId);
return result; return result;
@ -562,6 +624,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
} }
} }
// TODO: replace id to code
// ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode);
int delete = processDefineMapper.deleteById(processDefinitionId); int delete = processDefineMapper.deleteById(processDefinitionId);
if (delete > 0) { if (delete > 0) {

51
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* process definition log mapper interface
*/
public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinitionLog> {
/**
* query process definition log by name
*
* @param projectCode projectCode
* @param name process name
* @return process definition log list
*/
List<ProcessDefinitionLog> queryByDefinitionName(@Param("projectCode") Long projectCode,
@Param("processDefinitionName") String name);
/**
* query process definition log list
*
* @param processDefinitionCode processDefinitionCode
* @return process definition log list
*/
List<ProcessDefinitionLog> queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
}

16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java

@ -34,6 +34,22 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
*/ */
public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> { public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
/**
* query process definition by code
*
* @param code code
* @return process definition
*/
ProcessDefinition queryByCode(@Param("code") Long code);
/**
* delete process definition by code
*
* @param code code
* @return delete result
*/
int deleteByCode(@Param("code") Long code);
/** /**
* verify process definition by name * verify process definition by name
* *

48
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper">
<sql id="baseSql">
pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code,
pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects,
pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator, pd.operate_time, pd.create_time,
pd.update_time, u.user_name,p.name as project_name
</sql>
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select
<include refid="baseSql"/>
from t_ds_process_definition_log pd
JOIN t_ds_user u ON pd.user_id = u.id
JOIN t_ds_project p ON pd.project_code = p.code
WHERE p.code = #{projectCode}
and pd.name = #{processDefinitionName}
</select>
<select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select
<include refid="baseSql"/>
from t_ds_process_definition_log pd
JOIN t_ds_user u ON pd.user_id = u.id
JOIN t_ds_project p ON pd.project_code = p.code
WHERE pd.code = #{processDefinitionCode}
</select>
</mapper>

23
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml

@ -19,21 +19,34 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper"> <mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper">
<sql id="baseSql"> <sql id="baseSql">
id id, code, name, version, release_state, project_id, project_code, user_id, description,
, name, version, release_state, project_id, user_id, process_definition_json, description,
global_params, flag, locations, connects, warning_group_id, create_time, timeout, global_params, flag, locations, connects, warning_group_id, create_time, timeout,
tenant_id, update_time, modify_by, resource_ids tenant_id, update_time, modify_by, resource_ids
</sql> </sql>
<select id="verifyByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition"> <select id="verifyByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.process_definition_json, pd.description, select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout, pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout,
pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids
from t_ds_process_definition pd from t_ds_process_definition pd
WHERE pd.project_id = #{projectId} WHERE pd.project_id = #{projectId}
and pd.name = #{processDefinitionName} and pd.name = #{processDefinitionName}
</select> </select>
<delete id="deleteByCode">
delete from t_ds_process_definition
where code = #{code}
</delete>
<select id="queryByCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
select
<include refid="baseSql"/>
from t_ds_process_definition
where code = #{code}
</select>
<select id="queryByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition"> <select id="queryByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.process_definition_json, pd.description, select pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout, pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout,
pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids, pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids,
u.user_name,p.name as project_name,t.tenant_code,q.queue,q.queue_name u.user_name,p.name as project_name,t.tenant_code,q.queue,q.queue_name
@ -101,7 +114,7 @@
</select> </select>
<select id="queryByDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition"> <select id="queryByDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
SELECT SELECT
pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.process_definition_json, pd.description, pd.id, pd.name, pd.version, pd.release_state, pd.project_id, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout, pd.global_params, pd.flag, pd.locations, pd.connects, pd.warning_group_id, pd.create_time, pd.timeout,
pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids, pd.tenant_id, pd.update_time, pd.modify_by, pd.resource_ids,
u.user_name, u.user_name,

Loading…
Cancel
Save