diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index e74b2b46fc..7976882f56 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.service.process.ProcessService; + import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -190,6 +191,7 @@ public class ProcessInstanceService extends BaseService { ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); processInstance.setWarningGroupId(processDefinition.getWarningGroupId()); + processInstance.setProcessDefinitionId(processDefinition.getId()); result.put(DATA_LIST, processInstance); putMsg(result, Status.SUCCESS); @@ -406,91 +408,123 @@ public class ProcessInstanceService extends BaseService { Flag flag, String locations, String connects) throws ParseException { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); - //check project permission Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Status resultEnum = (Status) checkResult.get(Constants.STATUS); if (resultEnum != Status.SUCCESS) { return checkResult; } - //check process instance exists ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); if (processInstance == null) { putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } - //check process instance status if (!processInstance.getState().typeIsFinished()) { putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), processInstance.getState().toString(), "update"); return result; } - Date schedule = null; - schedule = processInstance.getScheduleTime(); - if (scheduleTime != null) { - schedule = DateUtils.getScheduleDate(scheduleTime); + ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class); + //check workflow json is valid + result = processDefinitionService.checkProcessNodeList(processData, processInstanceJson); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; } - processInstance.setScheduleTime(schedule); - processInstance.setLocations(locations); - processInstance.setConnects(connects); - String globalParams = null; - String originDefParams = null; - int timeout = processInstance.getTimeout(); - ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); - if (StringUtils.isNotEmpty(processInstanceJson)) { - ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class); - //check workflow json is valid - Map checkFlowJson = processDefinitionService.checkProcessNodeList(processData, processInstanceJson); - if (checkFlowJson.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - - originDefParams = JSONUtils.toJsonString(processData.getGlobalParams()); - List globalParamList = processData.getGlobalParams(); - Map globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); - globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, - processInstance.getCmdTypeIfComplement(), schedule); - timeout = processData.getTimeout(); - processInstance.setTimeout(timeout); - Tenant tenant = processService.getTenantForProcess(processData.getTenantId(), - processDefinition.getUserId()); - if (tenant != null) { - processInstance.setTenantCode(tenant.getTenantCode()); - } - // get the processinstancejson before saving,and then save the name and taskid - String oldJson = processInstance.getProcessInstanceJson(); - if (StringUtils.isNotEmpty(oldJson)) { - processInstanceJson = processService.changeJson(processData,oldJson); - } - processInstance.setProcessInstanceJson(processInstanceJson); - processInstance.setGlobalParams(globalParams); + Tenant tenant = processService.getTenantForProcess(processData.getTenantId(), + processDefinition.getUserId()); + // get the processinstancejson before saving,and then save the name and taskid + String oldJson = processInstance.getProcessInstanceJson(); + if (StringUtils.isNotEmpty(oldJson)) { + processInstanceJson = processService.changeJson(processData, oldJson); } - + setProcessInstance(processInstance, tenant, scheduleTime, locations, + connects, processInstanceJson, processData); int update = processService.updateProcessInstance(processInstance); int updateDefine = 1; if (Boolean.TRUE.equals(syncDefine)) { - processDefinition.setProcessDefinitionJson(processInstanceJson); - processDefinition.setGlobalParams(originDefParams); - processDefinition.setLocations(locations); - processDefinition.setConnects(connects); - processDefinition.setTimeout(timeout); - processDefinition.setUpdateTime(new Date()); - - // add process definition version - int version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); - processDefinition.setVersion(version); - updateDefine = processDefineMapper.updateById(processDefinition); + updateDefine = syncDefinition(loginUser, project, processInstanceJson, locations, connects, + processInstance, processDefinition, processData); } if (update > 0 && updateDefine > 0) { putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR); } - return result; + } + /** + * sync definition according process instance + * + * @param loginUser + * @param project + * @param processInstanceJson + * @param locations + * @param connects + * @param processInstance + * @param processDefinition + * @param processData + * @return + */ + private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects, + ProcessInstance processInstance, ProcessDefinition processDefinition, + ProcessData processData) { + + String originDefParams = JSONUtils.toJsonString(processData.getGlobalParams()); + processDefinition.setProcessDefinitionJson(processInstanceJson); + processDefinition.setGlobalParams(originDefParams); + processDefinition.setLocations(locations); + processDefinition.setConnects(connects); + processDefinition.setTimeout(processInstance.getTimeout()); + processDefinition.setUpdateTime(new Date()); + + int updateDefine = processService.saveProcessDefinition(loginUser, project, processDefinition.getName(), + processDefinition.getDescription(), locations, connects, + processData, processDefinition); + return updateDefine; + } + + /** + * update process instance attributes + * + * @param processInstance + * @param tenant + * @param scheduleTime + * @param locations + * @param connects + * @param processInstanceJson + * @param processData + * @return false if check failed or + */ + private void setProcessInstance(ProcessInstance processInstance, Tenant tenant, + String scheduleTime, String locations, String connects, String processInstanceJson, + ProcessData processData) { + + Date schedule = processInstance.getScheduleTime(); + if (scheduleTime != null) { + schedule = DateUtils.getScheduleDate(scheduleTime); + } + processInstance.setScheduleTime(schedule); + processInstance.setLocations(locations); + processInstance.setConnects(connects); + if (StringUtils.isNotEmpty(processInstanceJson)) { + return; + } + List globalParamList = processData.getGlobalParams(); + Map globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); + String globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, + processInstance.getCmdTypeIfComplement(), schedule); + int timeout = processData.getTimeout(); + processInstance.setTimeout(timeout); + if (tenant != null) { + processInstance.setTenantCode(tenant.getTenantCode()); + } + processInstance.setProcessInstanceJson(processInstanceJson); + processInstance.setGlobalParams(globalParams); } /** @@ -705,13 +739,9 @@ public class ProcessInstanceService extends BaseService { private static DAG processInstance2DAG(ProcessInstance processInstance) { String processDefinitionJson = processInstance.getProcessInstanceJson(); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - List taskNodeList = processData.getTasks(); - ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); - return DagHelper.buildDagGraph(processDag); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index f7049716d5..03515e325d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; -import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -48,7 +47,6 @@ import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; -import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.thread.Stopper; @@ -66,24 +64,24 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; + import java.io.BufferedOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -158,12 +156,15 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Autowired private ProcessService processService; + @Autowired + private TaskDefinitionMapper taskDefinitionMapper; + /** * create process definition * * @param loginUser login user * @param projectName project name - * @param name process definition name + * @param processDefinitionName process definition name * @param processDefinitionJson process definition json * @param desc description * @param locations locations for nodes @@ -174,7 +175,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Transactional(rollbackFor = Exception.class) public Map createProcessDefinition(User loginUser, String projectName, - String name, + String processDefinitionName, String processDefinitionJson, String desc, String locations, @@ -190,8 +191,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } ProcessDefinition processDefinition = new ProcessDefinition(); - Date now = new Date(); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); Map checkProcessJson = checkProcessNodeList(processData, processDefinitionJson); if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) { @@ -206,68 +205,10 @@ public class ProcessDefinitionServiceImpl extends BaseService implements putMsg(result, Status.CREATE_PROCESS_DEFINITION); return result; } - - processDefinition.setName(name); - processDefinition.setVersion(1); - processDefinition.setReleaseState(ReleaseState.OFFLINE); - processDefinition.setUserId(loginUser.getId()); - processDefinition.setDescription(desc); - processDefinition.setLocations(locations); - processDefinition.setConnects(connects); - processDefinition.setTimeout(processData.getTimeout()); - processDefinition.setTenantId(processData.getTenantId()); - processDefinition.setModifyBy(loginUser.getUserName()); - - //custom global params - List globalParamsList = processData.getGlobalParams(); - if (CollectionUtils.isNotEmpty(globalParamsList)) { - Set globalParamsSet = new HashSet<>(globalParamsList); - globalParamsList = new ArrayList<>(globalParamsSet); - processDefinition.setGlobalParamList(globalParamsList); - } - processDefinition.setCreateTime(now); - processDefinition.setUpdateTime(now); - processDefinition.setFlag(Flag.YES); - - // save the new process definition - processDefinitionMapper.insert(processDefinition); - - // parse and save the taskDefinition and processTaskRelation - try { - List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); - - for (TaskNode task : taskNodeList) { - taskDefinitionService.createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task)); - } - - DAG dag = genDagGraph(processDefinition); - Collection beginNode = dag.getBeginNode(); - Collection endNode = dag.getEndNode(); - - // TODO: query taskCode by projectCode and taskName - - processTaskRelationService.createProcessTaskRelation( - loginUser, - name, - projectName, - processDefinitionCode, - 0L, - 0L, - "0", - ""); - - } catch (Exception e) { - putMsg(result, Status.CREATE_PROCESS_DEFINITION); - return result; - } - - // save process definition log - ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject( - JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class); - - processDefinitionLog.setOperator(loginUser.getId()); - processDefinitionLog.setOperateTime(now); - processDefinitionLogMapper.insert(processDefinitionLog); + ProcessDefinitionLog processDefinitionLog = processService.insertProcessDefinitionLog(loginUser, processDefinitionCode, processDefinitionName, processData, + project, desc, locations, connects); + processService.switchVersion(processDefinition, processDefinitionLog); + processService.createTaskAndRelation(loginUser, projectName, "", processDefinition, processData); // return processDefinition object with ID result.put(Constants.DATA_LIST, processDefinition.getId()); @@ -275,6 +216,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return result; } + /** * get resource ids * @@ -475,7 +417,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName()); return result; } - if (!name.equals(processDefinition.getName())) { // check whether the new process define name exist ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getId(), name); @@ -487,55 +428,11 @@ public class ProcessDefinitionServiceImpl extends BaseService implements // get the processdefinitionjson before saving,and then save the name and taskid String oldJson = processDefinition.getProcessDefinitionJson(); processDefinitionJson = processService.changeJson(processData, oldJson); - - // update TaskDefinition ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - List taskNodeList = (newProcessData.getTasks() == null) ? new ArrayList<>() : newProcessData.getTasks(); + int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc, + locations, connects, newProcessData, processDefinition); - for (TaskNode task : taskNodeList) { - // TODO update by code directly - Map stringObjectMap = taskDefinitionService.queryTaskDefinitionByName(loginUser, projectName, task.getName()); - TaskDefinition taskDefinition = (TaskDefinition) stringObjectMap.get(Constants.DATA_LIST); - taskDefinitionService.updateTaskDefinition(loginUser, projectName, taskDefinition.getCode(), JSONUtils.toJsonString(task)); - } - - List processDefinitionLogs = processDefinitionLogMapper.queryByDefinitionCode(processDefinition.getCode()); - int version = getNextVersion(processDefinitionLogs); - - Date now = new Date(); - processDefinition.setVersion(version); - processDefinition.setName(name); - processDefinition.setReleaseState(ReleaseState.OFFLINE); - processDefinition.setProjectCode(project.getCode()); - processDefinition.setDescription(desc); - processDefinition.setLocations(locations); - processDefinition.setConnects(connects); - processDefinition.setTimeout(processData.getTimeout()); - processDefinition.setTenantId(processData.getTenantId()); - - //custom global params - List globalParamsList = new ArrayList<>(); - if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) { - Set userDefParamsSet = new HashSet<>(processData.getGlobalParams()); - globalParamsList = new ArrayList<>(userDefParamsSet); - } - processDefinition.setGlobalParamList(globalParamsList); - processDefinition.setUpdateTime(now); - processDefinition.setFlag(Flag.YES); - - - processDefinition.setVersion(version); - int update = processDefinitionMapper.updateById(processDefinition); - - // save processDefinitionLog - ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject( - JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class); - - processDefinitionLog.setOperator(loginUser.getId()); - processDefinitionLog.setOperateTime(now); - int insert = processDefinitionLogMapper.insert(processDefinitionLog); - - if (update > 0 && insert > 0) { + if (saveResult > 0) { putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); } else { @@ -544,13 +441,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return result; } - private int getNextVersion(List processDefinitionLogs) { - return processDefinitionLogs - .stream() - .map(ProcessDefinitionLog::getVersion) - .max((x, y) -> x > y ? x : y) - .orElse(0) + 1; - } /** * verify process definition name unique @@ -1732,18 +1622,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements , version); return result; } - - processDefinition.setVersion(processDefinitionLog.getVersion()); - processDefinition.setDescription(processDefinitionLog.getDescription()); - processDefinition.setLocations(processDefinitionLog.getLocations()); - processDefinition.setConnects(processDefinitionLog.getConnects()); - processDefinition.setTimeout(processDefinitionLog.getTimeout()); - processDefinition.setGlobalParams(processDefinitionLog.getGlobalParams()); - processDefinition.setUpdateTime(new Date()); - processDefinition.setWarningGroupId(processDefinitionLog.getWarningGroupId()); - processDefinition.setResourceIds(processDefinitionLog.getResourceIds()); - - if (processDefinitionMapper.updateById(processDefinition) > 0) { + int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog); + if (switchVersion > 0) { putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index ed50788ebc..e140052807 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Date; import java.util.HashMap; @@ -88,6 +89,9 @@ public class TaskDefinitionServiceImpl extends BaseService implements @Autowired private ProcessDefinitionMapper processDefinitionMapper; + @Autowired + private ProcessService processService; + /** * create task definition * @@ -144,7 +148,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements taskNode.getTaskTimeoutParameter().getInterval(), now, now); - taskDefinition.setResourceIds(getResourceIds(taskDefinition)); + taskDefinition.setResourceIds(processService.getResourceIds(taskDefinition)); // save the new task definition taskDefinitionMapper.insert(taskDefinition); // save task definition log @@ -159,30 +163,6 @@ public class TaskDefinitionServiceImpl extends BaseService implements return result; } - /** - * get resource ids - * - * @param taskDefinition taskDefinition - * @return resource ids - */ - private String getResourceIds(TaskDefinition taskDefinition) { - Set resourceIds = null; - // TODO modify taskDefinition.getTaskType() - AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams()); - - if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) { - resourceIds = params.getResourceFilesList(). - stream() - .filter(t -> t.getId() != 0) - .map(ResourceInfo::getId) - .collect(Collectors.toSet()); - } - if (CollectionUtils.isEmpty(resourceIds)) { - return StringUtils.EMPTY; - } - return StringUtils.join(resourceIds, ","); - } - /** * query task definition * @@ -276,38 +256,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements return result; } - List taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskCode); - int version = taskDefinitionLogs - .stream() - .map(TaskDefinitionLog::getVersion) - .max((x, y) -> x > y ? x : y) - .orElse(0) + 1; - Date now = new Date(); - taskDefinition.setVersion(version); - taskDefinition.setCode(taskCode); - taskDefinition.setName(taskNode.getName()); - taskDefinition.setDescription(taskNode.getDesc()); - taskDefinition.setProjectCode(project.getCode()); - taskDefinition.setUserId(loginUser.getId()); - taskDefinition.setTaskType(TaskType.of(taskNode.getType())); - taskDefinition.setTaskParams(taskNode.getParams()); - taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES); - taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority()); - taskDefinition.setWorkerGroup(taskNode.getWorkerGroup()); - taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes()); - taskDefinition.setFailRetryInterval(taskNode.getRetryInterval()); - taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); - taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy()); - taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval()); - taskDefinition.setUpdateTime(now); - taskDefinition.setResourceIds(getResourceIds(taskDefinition)); - taskDefinitionMapper.updateById(taskDefinition); - // save task definition log - TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); - taskDefinitionLog.set(taskDefinition); - taskDefinitionLog.setOperator(loginUser.getId()); - taskDefinitionLog.setOperateTime(now); - taskDefinitionLogMapper.insert(taskDefinitionLog); + processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition); result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS); return result; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java index 677925845d..52713921e5 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java @@ -184,6 +184,8 @@ public class ProcessDefinition { @TableField(exist = false) private int warningGroupId; + public ProcessDefinition(){} + public String getName() { return name; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index f1d43a353b..aa6b4b8671 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -47,10 +47,23 @@ public class ProcessInstance { */ @TableId(value = "id", type = IdType.AUTO) private int id; + /** * process definition id + * TODO delete */ private int processDefinitionId; + + /** + * process definition code + */ + private Long processDefinitionCode; + + /** + * process definition version + */ + private int processDefinitionVersion; + /** * process state */ @@ -145,6 +158,7 @@ public class ProcessInstance { /** * process instance json + * TODO delete */ private String processInstanceJson; @@ -579,6 +593,22 @@ public class ProcessInstance { this.tenantId = tenantId; } + public Long getProcessDefinitionCode() { + return processDefinitionCode; + } + + public void setProcessDefinitionCode(Long processDefinitionCode) { + this.processDefinitionCode = processDefinitionCode; + } + + public int getProcessDefinitionVersion() { + return processDefinitionVersion; + } + + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; + } + @Override public String toString() { return "ProcessInstance{" @@ -651,6 +681,12 @@ public class ProcessInstance { + timeout + ", tenantId=" + tenantId + + ", processDefinitionCode='" + + processDefinitionCode + + '\'' + + ", processDefinitionVersion='" + + processDefinitionVersion + + '\'' + '}'; } @@ -672,4 +708,5 @@ public class ProcessInstance { public int hashCode() { return Objects.hash(id); } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index b13ca87e38..e35cf8f03d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -58,7 +58,9 @@ public class TaskInstance implements Serializable { /** * process definition id + * TODO delete */ + @TableField(exist = false) private int processDefinitionId; /** @@ -66,6 +68,21 @@ public class TaskInstance implements Serializable { */ private int processInstanceId; + /** + * task code + */ + private long taskCode; + + /** + * process definition code + */ + private long processDefinitionCode; + + /** + * task defintion version + */ + private String taskDefinitionVersion; + /** * process instance name */ @@ -74,7 +91,9 @@ public class TaskInstance implements Serializable { /** * task json + * TODO delete */ + @TableField(exist = false) private String taskJson; /** @@ -601,4 +620,28 @@ public class TaskInstance implements Serializable { + ", delayTime=" + delayTime + '}'; } + + public long getTaskCode() { + return taskCode; + } + + public void setTaskCode(long taskCode) { + this.taskCode = taskCode; + } + + public long getProcessDefinitionCode() { + return processDefinitionCode; + } + + public void setProcessDefinitionCode(long processDefinitionCode) { + this.processDefinitionCode = processDefinitionCode; + } + + public String getTaskDefinitionVersion() { + return taskDefinitionVersion; + } + + public void setTaskDefinitionVersion(String taskDefinitionVersion) { + this.taskDefinitionVersion = taskDefinitionVersion; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java index 80a426ad3d..18c3b969b1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java @@ -45,6 +45,13 @@ public interface ProcessDefinitionLogMapper extends BaseMapper queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); + /** + * query max version for definition + * @param processDefinitionCode + * @return + */ + int queryMaxVersionForDefinition(@Param("processDefinitionCode") long processDefinitionCode); + /** * query the certain process definition version info by process definition code and version number * @@ -52,6 +59,6 @@ public interface ProcessDefinitionLogMapper extends BaseMapper - - - - + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index f66163541b..39ccdebb22 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -19,7 +19,7 @@ - id, name, process_definition_id, state, recovery, start_time, end_time, run_times,host, + id, name, process_definition_id, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host, command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type, warning_group_id, schedule_time, command_start_time, global_params, process_instance_json, flag, update_time, is_sub_process, executor_id, locations, connects, history_cmd, dependence_schedule_times, @@ -88,7 +88,7 @@ select state, count(0) as count from t_ds_task_instance t - left join t_ds_process_definition d on d.id=t.process_definition_id + left join t_ds_process_definition d on d.code=t.process_definition_code left join t_ds_project p on p.id=d.project_id where 1=1 @@ -98,7 +98,7 @@