diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 9810674863..c40a2cbba1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; +import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.SchedulerService; +import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.FileUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -54,17 +56,21 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; import org.apache.dolphinscheduler.common.utils.StreamUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionVersion; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; @@ -78,6 +84,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -131,6 +138,15 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Autowired private ProcessDefinitionVersionService processDefinitionVersionService; + @Autowired + private TaskDefinitionService taskDefinitionService; + + @Autowired + private ProcessTaskRelationService processTaskRelationService; + + @Autowired + private ProcessDefinitionLogMapper processDefinitionLogMapper; + @Autowired private ProcessDefinitionMapper processDefineMapper; @@ -159,6 +175,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements * @return create result code */ @Override + @Transactional(rollbackFor = Exception.class) public Map createProcessDefinition(User loginUser, String projectName, String name, @@ -176,7 +193,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return checkResult; } - ProcessDefinition processDefine = new ProcessDefinition(); + ProcessDefinition processDefinition = new ProcessDefinition(); Date now = new Date(); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); @@ -185,42 +202,84 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return checkProcessJson; } - processDefine.setName(name); - processDefine.setReleaseState(ReleaseState.OFFLINE); - processDefine.setProjectId(project.getId()); - processDefine.setUserId(loginUser.getId()); - processDefine.setProcessDefinitionJson(processDefinitionJson); - processDefine.setDescription(desc); - processDefine.setLocations(locations); - processDefine.setConnects(connects); - processDefine.setTimeout(processData.getTimeout()); - processDefine.setTenantId(processData.getTenantId()); - processDefine.setModifyBy(loginUser.getUserName()); - processDefine.setResourceIds(getResourceIds(processData)); + Long processDefinitionCode; + try { + processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); + processDefinition.setCode(processDefinitionCode); + } catch (SnowFlakeException e) { + putMsg(result, Status.CREATE_PROCESS_DEFINITION); + return result; + } + + processDefinition.setName(name); + processDefinition.setReleaseState(ReleaseState.OFFLINE); + processDefinition.setUserId(loginUser.getId()); + processDefinition.setDescription(desc); + processDefinition.setLocations(locations); + processDefinition.setConnects(connects); + processDefinition.setTimeout(processData.getTimeout()); + processDefinition.setTenantId(processData.getTenantId()); + processDefinition.setModifyBy(loginUser.getUserName()); + processDefinition.setResourceIds(getResourceIds(processData)); //custom global params List globalParamsList = processData.getGlobalParams(); if (CollectionUtils.isNotEmpty(globalParamsList)) { Set globalParamsSet = new HashSet<>(globalParamsList); globalParamsList = new ArrayList<>(globalParamsSet); - processDefine.setGlobalParamList(globalParamsList); + processDefinition.setGlobalParamList(globalParamsList); } - processDefine.setCreateTime(now); - processDefine.setUpdateTime(now); - processDefine.setFlag(Flag.YES); + processDefinition.setCreateTime(now); + processDefinition.setUpdateTime(now); + processDefinition.setFlag(Flag.YES); // save the new process definition - processDefineMapper.insert(processDefine); + processDefineMapper.insert(processDefinition); - // add process definition version - long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefine); + // parse and save the taskDefinition and processTaskRelation + try { + List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); - processDefine.setVersion(version); + for (TaskNode task : taskNodeList) { + taskDefinitionService.createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task)); + } + + DAG dag = genDagGraph(processDefinition); + Collection beginNode = dag.getBeginNode(); + Collection endNode = dag.getEndNode(); + + // TODO: query taskCode by projectCode and taskName - processDefineMapper.updateVersionByProcessDefinitionId(processDefine.getId(), version); + processTaskRelationService.createProcessTaskRelation( + loginUser, + name, + project.getCode(), + processDefinitionCode, + 0L, + 0L, + "0", + ""); + + } catch (Exception e) { + putMsg(result, Status.CREATE_PROCESS_DEFINITION); + return result; + } + + // save process definition log + ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject( + JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class); + + processDefinitionLog.setOperator(loginUser.getId()); + processDefinitionLog.setOperateTime(now); + processDefinitionLogMapper.insert(processDefinitionLog); + + // add process definition version + long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); + processDefinition.setVersion(version); + processDefineMapper.updateVersionByProcessDefinitionId(processDefinition.getId(), version); // return processDefinition object with ID - result.put(Constants.DATA_LIST, processDefine.getId()); + result.put(Constants.DATA_LIST, processDefinition.getId()); putMsg(result, Status.SUCCESS); return result; } @@ -367,7 +426,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return checkResult; } - ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(),processDefinitionName); + ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), processDefinitionName); if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); } else { @@ -523,6 +582,9 @@ public class ProcessDefinitionServiceImpl extends BaseService implements ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId); + // TODO: replace id to code + // ProcessDefinition processDefinition = processDefineMapper.selectByCode(processDefinitionCode); + if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId); return result; @@ -562,6 +624,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } } + // TODO: replace id to code + // ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode); int delete = processDefineMapper.deleteById(processDefinitionId); if (delete > 0) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java new file mode 100644 index 0000000000..dcf99a2651 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * process definition log mapper interface + */ +public interface ProcessDefinitionLogMapper extends BaseMapper { + + /** + * query process definition log by name + * + * @param projectCode projectCode + * @param name process name + * @return process definition log list + */ + List queryByDefinitionName(@Param("projectCode") Long projectCode, + @Param("processDefinitionName") String name); + + /** + * query process definition log list + * + * @param processDefinitionCode processDefinitionCode + * @return process definition log list + */ + List queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java index 36c988776f..4c61d68b34 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java @@ -34,6 +34,22 @@ import com.baomidou.mybatisplus.core.metadata.IPage; */ public interface ProcessDefinitionMapper extends BaseMapper { + /** + * query process definition by code + * + * @param code code + * @return process definition + */ + ProcessDefinition queryByCode(@Param("code") Long code); + + /** + * delete process definition by code + * + * @param code code + * @return delete result + */ + int deleteByCode(@Param("code") Long code); + /** * verify process definition by name * diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml new file mode 100644 index 0000000000..e722cf53a3 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml @@ -0,0 +1,48 @@ + + + + + + + + pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code, + pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects, + pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator, pd.operate_time, pd.create_time, + pd.update_time, u.user_name,p.name as project_name + + + + + + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml index a24fd294a7..e9a4888e1e 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml @@ -19,21 +19,34 @@ - id - , name, version, release_state, project_id, user_id, process_definition_json, description, + id, code, name, version, release_state, project_id, project_code, user_id, description, global_params, flag, locations, connects, warning_group_id, create_time, timeout, tenant_id, update_time, modify_by, resource_ids + + + + delete from t_ds_process_definition + where code = #{code} + + + +