
[Feature][JsonSplit] refactor process definition update (#4708)

* add code in task_instance and process instance

* delete process_definition_id in t_ds_task_instance

* add task_code task_definition_version process_definition_code in task instance

* add task_code task_definition_version process_definition_code in task instance

* refactor process instance

* refactor process instance update

* refactor json-split for process definition and task definition
refactor process instance update

* refactor json-split for process definition and task definition
refactor process instance update

* code style

* code style

* code style

* code style

* refactor code
bao liang committed 4 years ago via GitHub, commit f31eee4341
1. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java (146 lines changed)
2. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java (154 lines changed)
3. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java (63 lines changed)
4. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java (2 lines changed)
5. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java (37 lines changed)
6. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java (43 lines changed)
7. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java (11 lines changed)
8. dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml (10 lines changed)
9. dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml (4 lines changed)
10. dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml (10 lines changed)
11. dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java (248 lines changed)
12. sql/dolphinscheduler-postgre.sql (2 lines changed)
13. sql/dolphinscheduler_mysql.sql (4 lines changed)

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java (146 lines changed)

@@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -190,6 +191,7 @@ public class ProcessInstanceService extends BaseService {
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
processInstance.setProcessDefinitionId(processDefinition.getId());
result.put(DATA_LIST, processInstance);
putMsg(result, Status.SUCCESS);
@@ -406,91 +408,123 @@ public class ProcessInstanceService extends BaseService {
Flag flag, String locations, String connects) throws ParseException {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
//check project permission
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
//check process instance exists
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
//check process instance status
if (!processInstance.getState().typeIsFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
Date schedule = null;
schedule = processInstance.getScheduleTime();
if (scheduleTime != null) {
schedule = DateUtils.getScheduleDate(scheduleTime);
}
processInstance.setScheduleTime(schedule);
processInstance.setLocations(locations);
processInstance.setConnects(connects);
String globalParams = null;
String originDefParams = null;
int timeout = processInstance.getTimeout();
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
if (StringUtils.isNotEmpty(processInstanceJson)) {
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
//check workflow json is valid
Map<String, Object> checkFlowJson = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
if (checkFlowJson.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
List<Property> globalParamList = processData.getGlobalParams();
Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
processInstance.getCmdTypeIfComplement(), schedule);
timeout = processData.getTimeout();
processInstance.setTimeout(timeout);
Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
processDefinition.getUserId());
if (tenant != null) {
processInstance.setTenantCode(tenant.getTenantCode());
}
// get the processinstancejson before saving,and then save the name and taskid
String oldJson = processInstance.getProcessInstanceJson();
if (StringUtils.isNotEmpty(oldJson)) {
processInstanceJson = processService.changeJson(processData,oldJson);
}
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
//check workflow json is valid
result = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
processDefinition.getUserId());
// get the processinstancejson before saving,and then save the name and taskid
String oldJson = processInstance.getProcessInstanceJson();
if (StringUtils.isNotEmpty(oldJson)) {
processInstanceJson = processService.changeJson(processData, oldJson);
}
setProcessInstance(processInstance, tenant, scheduleTime, locations,
connects, processInstanceJson, processData);
int update = processService.updateProcessInstance(processInstance);
int updateDefine = 1;
if (Boolean.TRUE.equals(syncDefine)) {
processDefinition.setProcessDefinitionJson(processInstanceJson);
processDefinition.setGlobalParams(originDefParams);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(timeout);
processDefinition.setUpdateTime(new Date());
// add process definition version
int version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
processDefinition.setVersion(version);
updateDefine = processDefineMapper.updateById(processDefinition);
updateDefine = syncDefinition(loginUser, project, processInstanceJson, locations, connects,
processInstance, processDefinition, processData);
}
if (update > 0 && updateDefine > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
}
return result;
}
/**
* sync definition according process instance
*
* @param loginUser
* @param project
* @param processInstanceJson
* @param locations
* @param connects
* @param processInstance
* @param processDefinition
* @param processData
* @return
*/
private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects,
ProcessInstance processInstance, ProcessDefinition processDefinition,
ProcessData processData) {
String originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
processDefinition.setProcessDefinitionJson(processInstanceJson);
processDefinition.setGlobalParams(originDefParams);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(processInstance.getTimeout());
processDefinition.setUpdateTime(new Date());
int updateDefine = processService.saveProcessDefinition(loginUser, project, processDefinition.getName(),
processDefinition.getDescription(), locations, connects,
processData, processDefinition);
return updateDefine;
}
/**
* update process instance attributes
*
* @param processInstance
* @param tenant
* @param scheduleTime
* @param locations
* @param connects
* @param processInstanceJson
* @param processData
* @return false if check failed or
*/
private void setProcessInstance(ProcessInstance processInstance, Tenant tenant,
String scheduleTime, String locations, String connects, String processInstanceJson,
ProcessData processData) {
Date schedule = processInstance.getScheduleTime();
if (scheduleTime != null) {
schedule = DateUtils.getScheduleDate(scheduleTime);
}
processInstance.setScheduleTime(schedule);
processInstance.setLocations(locations);
processInstance.setConnects(connects);
if (StringUtils.isNotEmpty(processInstanceJson)) {
return;
}
List<Property> globalParamList = processData.getGlobalParams();
Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
String globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
processInstance.getCmdTypeIfComplement(), schedule);
int timeout = processData.getTimeout();
processInstance.setTimeout(timeout);
if (tenant != null) {
processInstance.setTenantCode(tenant.getTenantCode());
}
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
/**
@@ -705,13 +739,9 @@ public class ProcessInstanceService extends BaseService {
private static DAG<String, TaskNode, TaskNodeRelation> processInstance2DAG(ProcessInstance processInstance) {
String processDefinitionJson = processInstance.getProcessInstanceJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
return DagHelper.buildDagGraph(processDag);
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java (154 lines changed)

@@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -48,7 +47,6 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -66,24 +64,24 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -158,12 +156,15 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Autowired
private ProcessService processService;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
/**
* create process definition
*
* @param loginUser login user
* @param projectName project name
* @param name process definition name
* @param processDefinitionName process definition name
* @param processDefinitionJson process definition json
* @param desc description
* @param locations locations for nodes
@@ -174,7 +175,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> createProcessDefinition(User loginUser,
String projectName,
String name,
String processDefinitionName,
String processDefinitionJson,
String desc,
String locations,
@@ -190,8 +191,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
}
ProcessDefinition processDefinition = new ProcessDefinition();
Date now = new Date();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) {
@@ -206,68 +205,10 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
return result;
}
processDefinition.setName(name);
processDefinition.setVersion(1);
processDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefinition.setUserId(loginUser.getId());
processDefinition.setDescription(desc);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(processData.getTimeout());
processDefinition.setTenantId(processData.getTenantId());
processDefinition.setModifyBy(loginUser.getUserName());
//custom global params
List<Property> globalParamsList = processData.getGlobalParams();
if (CollectionUtils.isNotEmpty(globalParamsList)) {
Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
globalParamsList = new ArrayList<>(globalParamsSet);
processDefinition.setGlobalParamList(globalParamsList);
}
processDefinition.setCreateTime(now);
processDefinition.setUpdateTime(now);
processDefinition.setFlag(Flag.YES);
// save the new process definition
processDefinitionMapper.insert(processDefinition);
// parse and save the taskDefinition and processTaskRelation
try {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
for (TaskNode task : taskNodeList) {
taskDefinitionService.createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
}
DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
Collection<String> beginNode = dag.getBeginNode();
Collection<String> endNode = dag.getEndNode();
// TODO: query taskCode by projectCode and taskName
processTaskRelationService.createProcessTaskRelation(
loginUser,
name,
projectName,
processDefinitionCode,
0L,
0L,
"0",
"");
} catch (Exception e) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
return result;
}
// save process definition log
ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
processDefinitionLog.setOperator(loginUser.getId());
processDefinitionLog.setOperateTime(now);
processDefinitionLogMapper.insert(processDefinitionLog);
ProcessDefinitionLog processDefinitionLog = processService.insertProcessDefinitionLog(loginUser, processDefinitionCode, processDefinitionName, processData,
project, desc, locations, connects);
processService.switchVersion(processDefinition, processDefinitionLog);
processService.createTaskAndRelation(loginUser, projectName, "", processDefinition, processData);
// return processDefinition object with ID
result.put(Constants.DATA_LIST, processDefinition.getId());
@@ -275,6 +216,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return result;
}
/**
* get resource ids
*
@@ -475,7 +417,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
return result;
}
if (!name.equals(processDefinition.getName())) {
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getId(), name);
@@ -487,55 +428,11 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
// get the processdefinitionjson before saving,and then save the name and taskid
String oldJson = processDefinition.getProcessDefinitionJson();
processDefinitionJson = processService.changeJson(processData, oldJson);
// update TaskDefinition
ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = (newProcessData.getTasks() == null) ? new ArrayList<>() : newProcessData.getTasks();
for (TaskNode task : taskNodeList) {
// TODO update by code directly
Map<String, Object> stringObjectMap = taskDefinitionService.queryTaskDefinitionByName(loginUser, projectName, task.getName());
TaskDefinition taskDefinition = (TaskDefinition) stringObjectMap.get(Constants.DATA_LIST);
taskDefinitionService.updateTaskDefinition(loginUser, projectName, taskDefinition.getCode(), JSONUtils.toJsonString(task));
}
List<ProcessDefinitionLog> processDefinitionLogs = processDefinitionLogMapper.queryByDefinitionCode(processDefinition.getCode());
int version = getNextVersion(processDefinitionLogs);
Date now = new Date();
processDefinition.setVersion(version);
processDefinition.setName(name);
processDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefinition.setProjectCode(project.getCode());
processDefinition.setDescription(desc);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(processData.getTimeout());
processDefinition.setTenantId(processData.getTenantId());
//custom global params
List<Property> globalParamsList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) {
Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
globalParamsList = new ArrayList<>(userDefParamsSet);
}
processDefinition.setGlobalParamList(globalParamsList);
processDefinition.setUpdateTime(now);
processDefinition.setFlag(Flag.YES);
processDefinition.setVersion(version);
int update = processDefinitionMapper.updateById(processDefinition);
// save processDefinitionLog
ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject(
JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class);
processDefinitionLog.setOperator(loginUser.getId());
processDefinitionLog.setOperateTime(now);
int insert = processDefinitionLogMapper.insert(processDefinitionLog);
if (update > 0 && insert > 0) {
int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc,
locations, connects, newProcessData, processDefinition);
if (saveResult > 0) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
@@ -544,13 +441,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return result;
}
private int getNextVersion(List<ProcessDefinitionLog> processDefinitionLogs) {
return processDefinitionLogs
.stream()
.map(ProcessDefinitionLog::getVersion)
.max((x, y) -> x > y ? x : y)
.orElse(0) + 1;
}
/**
* verify process definition name unique
@@ -1732,18 +1622,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
, version);
return result;
}
processDefinition.setVersion(processDefinitionLog.getVersion());
processDefinition.setDescription(processDefinitionLog.getDescription());
processDefinition.setLocations(processDefinitionLog.getLocations());
processDefinition.setConnects(processDefinitionLog.getConnects());
processDefinition.setTimeout(processDefinitionLog.getTimeout());
processDefinition.setGlobalParams(processDefinitionLog.getGlobalParams());
processDefinition.setUpdateTime(new Date());
processDefinition.setWarningGroupId(processDefinitionLog.getWarningGroupId());
processDefinition.setResourceIds(processDefinitionLog.getResourceIds());
if (processDefinitionMapper.updateById(processDefinition) > 0) {
int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
if (switchVersion > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java (63 lines changed)

@@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import java.util.HashMap;
@@ -88,6 +89,9 @@ public class TaskDefinitionServiceImpl extends BaseService implements
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProcessService processService;
/**
* create task definition
*
@@ -144,7 +148,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
taskNode.getTaskTimeoutParameter().getInterval(),
now,
now);
taskDefinition.setResourceIds(getResourceIds(taskDefinition));
taskDefinition.setResourceIds(processService.getResourceIds(taskDefinition));
// save the new task definition
taskDefinitionMapper.insert(taskDefinition);
// save task definition log
@@ -159,30 +163,6 @@ public class TaskDefinitionServiceImpl extends BaseService implements
return result;
}
/**
* get resource ids
*
* @param taskDefinition taskDefinition
* @return resource ids
*/
private String getResourceIds(TaskDefinition taskDefinition) {
Set<Integer> resourceIds = null;
// TODO modify taskDefinition.getTaskType()
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList().
stream()
.filter(t -> t.getId() != 0)
.map(ResourceInfo::getId)
.collect(Collectors.toSet());
}
if (CollectionUtils.isEmpty(resourceIds)) {
return StringUtils.EMPTY;
}
return StringUtils.join(resourceIds, ",");
}
/**
* query task definition
*
@@ -276,38 +256,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
return result;
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskCode);
int version = taskDefinitionLogs
.stream()
.map(TaskDefinitionLog::getVersion)
.max((x, y) -> x > y ? x : y)
.orElse(0) + 1;
Date now = new Date();
taskDefinition.setVersion(version);
taskDefinition.setCode(taskCode);
taskDefinition.setName(taskNode.getName());
taskDefinition.setDescription(taskNode.getDesc());
taskDefinition.setProjectCode(project.getCode());
taskDefinition.setUserId(loginUser.getId());
taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
taskDefinition.setTaskParams(taskNode.getParams());
taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
taskDefinition.setUpdateTime(now);
taskDefinition.setResourceIds(getResourceIds(taskDefinition));
taskDefinitionMapper.updateById(taskDefinition);
// save task definition log
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
taskDefinitionLog.set(taskDefinition);
taskDefinitionLog.setOperator(loginUser.getId());
taskDefinitionLog.setOperateTime(now);
taskDefinitionLogMapper.insert(taskDefinitionLog);
processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java (2 lines changed)

@@ -184,6 +184,8 @@ public class ProcessDefinition {
@TableField(exist = false)
private int warningGroupId;
public ProcessDefinition(){}
public String getName() {
return name;
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java (37 lines changed)

@@ -47,10 +47,23 @@ public class ProcessInstance {
*/
@TableId(value = "id", type = IdType.AUTO)
private int id;
/**
* process definition id
* TODO delete
*/
private int processDefinitionId;
/**
* process definition code
*/
private Long processDefinitionCode;
/**
* process definition version
*/
private int processDefinitionVersion;
/**
* process state
*/
@@ -145,6 +158,7 @@ public class ProcessInstance {
/**
* process instance json
* TODO delete
*/
private String processInstanceJson;
@@ -579,6 +593,22 @@ public class ProcessInstance {
this.tenantId = tenantId;
}
public Long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionCode(Long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
@Override
public String toString() {
return "ProcessInstance{"
@@ -651,6 +681,12 @@ public class ProcessInstance {
+ timeout
+ ", tenantId="
+ tenantId
+ ", processDefinitionCode='"
+ processDefinitionCode
+ '\''
+ ", processDefinitionVersion='"
+ processDefinitionVersion
+ '\''
+ '}';
}
@@ -672,4 +708,5 @@ public class ProcessInstance {
public int hashCode() {
return Objects.hash(id);
}
}
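
Taken together with the ProcessService changes later in this diff, the new code/version pair is what lets a run be traced back to the exact definition snapshot it executed against. A minimal, illustrative sketch of that lookup, using only the accessors added above and the findProcessDefinition(code, version) method added further down (nothing beyond those is implied by the PR):

// Illustrative only: resolve the definition snapshot a given instance ran against.
// findProcessDefinition falls back to t_ds_process_definition_log when the requested
// version is no longer the current one (see ProcessService below).
ProcessDefinition snapshot = processService.findProcessDefinition(
        processInstance.getProcessDefinitionCode(),
        processInstance.getProcessDefinitionVersion());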

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java (43 lines changed)

@@ -58,7 +58,9 @@ public class TaskInstance implements Serializable {
/**
* process definition id
* TODO delete
*/
@TableField(exist = false)
private int processDefinitionId;
/**
@@ -66,6 +68,21 @@ public class TaskInstance implements Serializable {
*/
private int processInstanceId;
/**
* task code
*/
private long taskCode;
/**
* process definition code
*/
private long processDefinitionCode;
/**
* task defintion version
*/
private String taskDefinitionVersion;
/**
* process instance name
*/
@@ -74,7 +91,9 @@ public class TaskInstance implements Serializable {
/**
* task json
* TODO delete
*/
@TableField(exist = false)
private String taskJson;
/**
@@ -601,4 +620,28 @@ public class TaskInstance implements Serializable {
+ ", delayTime=" + delayTime
+ '}';
}
public long getTaskCode() {
return taskCode;
}
public void setTaskCode(long taskCode) {
this.taskCode = taskCode;
}
public long getProcessDefinitionCode() {
return processDefinitionCode;
}
public void setProcessDefinitionCode(long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}
public String getTaskDefinitionVersion() {
return taskDefinitionVersion;
}
public void setTaskDefinitionVersion(String taskDefinitionVersion) {
this.taskDefinitionVersion = taskDefinitionVersion;
}
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java (11 lines changed)

@@ -45,6 +45,13 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
*/
List<ProcessDefinitionLog> queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
* query max version for definition
* @param processDefinitionCode
* @return
*/
int queryMaxVersionForDefinition(@Param("processDefinitionCode") long processDefinitionCode);
/**
* query the certain process definition version info by process definition code and version number
*
@@ -52,6 +59,6 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
* @param version version number
* @return the process definition version info
*/
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("processDefinitionCode") Long processDefinitionCode, @Param("version") long version);
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("version") long version);
}
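
The new queryMaxVersionForDefinition query is consumed by ProcessService.insertProcessDefinitionLog further down in this diff. A rough sketch of the intended versioning pattern, with the +1 increment added here only for illustration (the PR's own bookkeeping lives in that method):

// Illustrative only, not the PR's exact code: read the highest logged version for a
// definition code, then record the next snapshot one version higher.
int maxVersion = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionCode);
processDefinitionLog.setCode(processDefinitionCode);
processDefinitionLog.setVersion(maxVersion + 1);
processDefinitionLogMapper.insert(processDefinitionLog);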

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml (10 lines changed)

@@ -25,7 +25,6 @@
warning_group_id, timeout, tenant_id,operator, operate_time, create_time,
update_time
</sql>
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code,
pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects,
@@ -37,15 +36,13 @@
WHERE p.code = #{projectCode}
and pd.name = #{processDefinitionName}
</select>
<select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select
<include refid="baseSql"/>
from t_ds_process_definition_log
WHERE pd.code = #{processDefinitionCode}
</select>
<select id="queryByDefinitionCodeAndVersion"
<select id="queryByProcessDefinitionCodeAndVersion"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select
<include refid="baseSql"/>
@@ -53,5 +50,10 @@
where code = #{processDefinitionCode}
and version = #{version}
</select>
<select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
select max(version)
from t_ds_process_definition_log
where code = #{processDefinitionCode}
</select>
</mapper>

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml (4 lines changed)

@@ -19,7 +19,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper">
<sql id="baseSql">
id, name, process_definition_id, state, recovery, start_time, end_time, run_times,host,
id, name, process_definition_id, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, process_instance_json, flag,
update_time, is_sub_process, executor_id, locations, connects, history_cmd, dependence_schedule_times,
@@ -88,7 +88,7 @@
</select>
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.process_definition_id, instance.command_type, instance.executor_id,
select instance.id, instance.process_definition_id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.name, instance.state, instance.schedule_time, instance.start_time, instance.end_time,
instance.run_times, instance.recovery, instance.host
from t_ds_process_instance instance

dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml (10 lines changed)

@@ -19,13 +19,13 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper">
<sql id="baseSql">
id, name, task_type, process_definition_id, process_instance_id, task_json, state, submit_time,
id, name, task_type, process_instance_id, task_code, task_definition_version, process_definition_code, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group, executor_id,
first_submit_time, delay_time, var_pool
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.process_definition_id, ${alias}.process_instance_id, ${alias}.task_json, ${alias}.state, ${alias}.submit_time,
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_definition_code, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group, ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.var_pool
@@ -72,7 +72,7 @@
<select id="countTaskInstanceStateByUser" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select state, count(0) as count
from t_ds_task_instance t
left join t_ds_process_definition d on d.id=t.process_definition_id
left join t_ds_process_definition d on d.code=t.process_definition_code
left join t_ds_project p on p.id=d.project_id
where 1=1
<if test="projectIds != null and projectIds.length != 0">
@@ -98,7 +98,7 @@
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_process_definition process
where task.process_definition_id=process.id
where task.process_definition_code=process.code
<if test="projectIds != null and projectIds.length != 0">
and process.project_id in
<foreach collection="projectIds" index="index" item="i" open="(" separator="," close=")">
@@ -120,7 +120,7 @@
,
process.name as process_instance_name
from t_ds_task_instance instance
left join t_ds_process_definition define on instance.process_definition_id = define.id
left join t_ds_process_definition define on instance.process_definition_code = define.code
left join t_ds_process_instance process on process.id=instance.process_instance_id
where define.project_id = #{projectId}
<if test="startTime != null">

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java (248 lines changed)

@@ -35,13 +35,17 @@ import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@@ -49,18 +53,22 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CycleDependency;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@@ -68,12 +76,15 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
@@ -125,6 +136,11 @@ public class ProcessService {
@Autowired
private ProcessDefinitionMapper processDefineMapper;
@Autowired
private ProcessDefinitionLogMapper processDefineLogMapper;
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@@ -158,6 +174,15 @@ public class ProcessService {
@Autowired
private ProjectMapper projectMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@@ -339,6 +364,37 @@ public class ProcessService {
return processDefineMapper.selectById(processDefinitionId);
}
/**
* find process define by id.
*
* @param processDefinitionCode processDefinitionCode
* @return process definition
*/
public ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version) {
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
if (processDefinition.getVersion() != version) {
ProcessDefinitionLog log = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version);
processDefinition = convertFromLog(log);
}
return processDefinition;
}
/**
* covert log to process definition
* @param processDefinitionLog
* @return
*/
public ProcessDefinition convertFromLog(ProcessDefinitionLog processDefinitionLog) {
ProcessDefinition definition = null;
if (null != processDefinitionLog) {
definition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog), ProcessDefinition.class);
}
if (null != definition) {
definition.setId(0);
}
return definition;
}
/**
* delete work process instance by id
*
@@ -2055,4 +2111,196 @@ public class ProcessService {
}
return JSONUtils.toJsonString(processData);
}
/**
* switch process definition version to process definition log version
*
* @param processDefinition
* @param processDefinitionLog
* @return
*/
public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
if (null == processDefinition || null == processDefinitionLog) {
return Constants.EXIT_CODE_FAILURE;
}
ProcessDefinition tmpDefinition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog),
ProcessDefinition.class);
tmpDefinition.setId(processDefinition.getId());
tmpDefinition.setReleaseState(ReleaseState.OFFLINE);
tmpDefinition.setFlag(Flag.YES);
int switchResult = 0;
if (0 == processDefinition.getId()) {
switchResult = processDefineMapper.insert(tmpDefinition);
} else {
switchResult = processDefineMapper.updateById(tmpDefinition);
}
//TODO... switch task relations
return switchResult;
}
/**
* update task definition
*
* @param operator
* @param projectCode
* @param taskNode
* @param taskDefinition
* @return
*/
public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskDefinition.getCode());
int version = taskDefinitionLogs
.stream()
.map(TaskDefinitionLog::getVersion)
.max((x, y) -> x > y ? x : y)
.orElse(0) + 1;
Date now = new Date();
taskDefinition.setVersion(version);
taskDefinition.setCode(taskDefinition.getCode());
taskDefinition.setName(taskNode.getName());
taskDefinition.setDescription(taskNode.getDesc());
taskDefinition.setProjectCode(projectCode);
taskDefinition.setUserId(operator.getId());
taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
taskDefinition.setTaskParams(taskNode.getParams());
taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
taskDefinition.setUpdateTime(now);
taskDefinition.setResourceIds(getResourceIds(taskDefinition));
int update = taskDefinitionMapper.updateById(taskDefinition);
// save task definition log
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
taskDefinitionLog.set(taskDefinition);
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setOperateTime(now);
int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
return insert & update;
}
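// Worked example of the version bump above, not part of this change: with existing log versions
// [1, 2] the stream max is 2, so the updated definition and its new log row are stored as
// version 3; with no prior log rows the orElse(0) fallback yields version 1.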
/**
* get resource ids
*
* @param taskDefinition taskDefinition
* @return resource ids
*/
public String getResourceIds(TaskDefinition taskDefinition) {
Set<Integer> resourceIds = null;
// TODO modify taskDefinition.getTaskType()
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList().stream()
.filter(t -> t.getId() != 0)
.map(ResourceInfo::getId)
.collect(Collectors.toSet());
}
if (CollectionUtils.isEmpty(resourceIds)) {
return StringUtils.EMPTY;
}
return StringUtils.join(resourceIds, ",");
}
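// Illustrative result sketch, not part of this change: a task whose parameters reference resource
// files with ids 3 and 7 yields "3,7" (set iteration order is not guaranteed), while a task with
// no file resources yields an empty string:
//
//   String ids = getResourceIds(taskDefinition);   // e.g. "3,7" or ""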
/**
 * save a process definition: update its task definitions, record a process definition log
 * and switch the working copy to the new version
 *
 * @param operator operator
 * @param project project
 * @param name process definition name
 * @param desc description
 * @param locations locations
 * @param connects connects
 * @param processData process data parsed from the definition json
 * @param processDefinition process definition
 * @return switch version result
 */
public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
String connects, ProcessData processData, ProcessDefinition processDefinition) {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
for (TaskNode task : taskNodeList) {
// TODO update by code directly
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(project.getCode(), task.getName());
updateTaskDefinition(operator, project.getCode(), task, taskDefinition);
}
createTaskAndRelation(operator, project.getName(), "", processDefinition, processData);
ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
name, processData, project, desc, locations, connects);
return switchVersion(processDefinition, processDefinitionLog);
}
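// Illustrative call sketch, not part of this change: an update path in the API layer could delegate
// here after parsing the submitted definition json, assuming the json maps onto ProcessData as
// elsewhere in this class (processDefinitionJson is a hypothetical variable name):
//
//   ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//   int result = saveProcessDefinition(loginUser, project, name, desc,
//           locations, connects, processData, processDefinition);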
/**
 * insert a process definition log entry for the given definition code
 *
 * @param operator operator
 * @param processDefinitionCode process definition code
 * @param processDefinitionName process definition name
 * @param processData process data
 * @param project project
 * @param desc description
 * @param locations locations
 * @param connects connects
 * @return the inserted log entry, or null if the insert failed
 */
public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName,
ProcessData processData, Project project,
String desc, String locations, String connects) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
int version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionCode) + 1;
processDefinitionLog.setCode(processDefinitionCode);
processDefinitionLog.setVersion(version);
processDefinitionLog.setName(processDefinitionName);
processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
processDefinitionLog.setProjectCode(project.getCode());
processDefinitionLog.setDescription(desc);
processDefinitionLog.setLocations(locations);
processDefinitionLog.setConnects(connects);
processDefinitionLog.setTimeout(processData.getTimeout());
processDefinitionLog.setTenantId(processData.getTenantId());
processDefinitionLog.setOperator(operator.getId());
Date updateTime = new Date();
processDefinitionLog.setOperateTime(updateTime);
processDefinitionLog.setUpdateTime(updateTime);
//custom global params
List<Property> globalParamsList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) {
Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
globalParamsList = new ArrayList<>(userDefParamsSet);
}
processDefinitionLog.setGlobalParamList(globalParamsList);
processDefinitionLog.setFlag(Flag.YES);
int insert = processDefinitionLogMapper.insert(processDefinitionLog);
if (insert > 0) {
return processDefinitionLog;
}
return null;
}
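// Illustrative read-back sketch, not part of this change: every save appends a log row keyed by
// (code, version), so a historical snapshot can later be fetched with the mapper call already used
// in findProcessDefinition:
//
//   ProcessDefinitionLog snapshot = processDefinitionLogMapper
//           .queryByDefinitionCodeAndVersion(processDefinitionCode, version);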
/**
 * create task definitions and task relations for a process definition
 *
 * @param loginUser login user
 * @param projectName project name
 * @param relationName relation name
 * @param processDefinition process definition
 * @param processData process data
 */
public void createTaskAndRelation(User loginUser, String projectName, String relationName,
ProcessDefinition processDefinition,
ProcessData processData) {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
for (TaskNode task : taskNodeList) {
//TODO... task code exists, update task
//createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
}
// TODO: query taskCode by projectCode and taskName
}
}

2
sql/dolphinscheduler-postgre.sql

@ -674,10 +674,10 @@ CREATE TABLE t_ds_task_instance (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
task_type varchar(64) DEFAULT NULL ,
task_code bigint NOT NULL,
task_definition_version int DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL ,
process_instance_id int DEFAULT NULL ,
task_json text ,
state int DEFAULT NULL ,
submit_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,

4
sql/dolphinscheduler_mysql.sql

@ -810,11 +810,11 @@ DROP TABLE IF EXISTS `t_ds_task_instance`;
CREATE TABLE `t_ds_task_instance` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`name` varchar(255) DEFAULT NULL COMMENT 'task name',
`task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
`task_type` varchar(64) DEFAULT NULL COMMENT 'task type',
`task_code` bigint(20) NOT NULL COMMENT 'task definition code',
`task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
`process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id',
`task_json` longtext COMMENT 'task content json',
`state` tinyint(4) DEFAULT NULL COMMENT 'Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
`submit_time` datetime DEFAULT NULL COMMENT 'task submit time',
`start_time` datetime DEFAULT NULL COMMENT 'task start time',
