Browse Source

Merge branch 'json_split' of https://github.com/apache/incubator-dolphinscheduler into spilit

pull/3/MERGE
lenboo 4 years ago
parent
commit
9d81c5da9f
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
  3. 33
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProjectUser.java
  4. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.java
  5. 26
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
  6. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectUserMapper.xml
  7. 78
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  8. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  9. 2
      sql/dolphinscheduler_mysql.sql

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -1505,6 +1505,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return checkResult;
}
// TODO:
// Project targetProject = projectMapper.queryDetailByCode(targetProjectCode);
Project targetProject = projectMapper.queryDetailById(targetProjectId);
if (targetProject == null) {
putMsg(result, Status.PROJECT_NOT_FOUNT, targetProjectId);
@ -1553,6 +1555,8 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return result;
}
// TODO :
// Project targetProject = projectMapper.queryDetailByCode(targetProjectCode);
Project targetProject = projectMapper.queryDetailById(targetProjectId);
if (targetProject == null) {
putMsg(result, Status.PROJECT_NOT_FOUNT, targetProjectId);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java

@ -100,7 +100,7 @@ public class ProjectServiceImpl extends BaseService implements ProjectService {
.updateTime(now)
.build();
} catch (SnowFlakeException e) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
putMsg(result, Status.CREATE_PROJECT_ERROR);
return result;
}

33
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProjectUser.java

@ -37,6 +37,9 @@ public class ProjectUser {
@TableField("project_id")
private int projectId;
@TableField("project_code")
private int projectCode;
/**
* project name
*/
@ -60,6 +63,14 @@ public class ProjectUser {
@TableField("update_time")
private Date updateTime;
public int getProjectCode() {
return projectCode;
}
public void setProjectCode(int projectCode) {
this.projectCode = projectCode;
}
public int getId() {
return id;
}
@ -123,17 +134,19 @@ public class ProjectUser {
public void setPerm(int perm) {
this.perm = perm;
}
@Override
public String toString() {
return "ProjectUser{" +
"id=" + id +
", projectId=" + projectId +
", projectName='" + projectName + '\'' +
", userId=" + userId +
", userName='" + userName + '\'' +
", perm=" + perm +
", createTime=" + createTime +
", updateTime=" + updateTime +
'}';
return "ProjectUser{"
+ "id=" + id
+ ", userId=" + userId
+ ", projectId=" + projectId
+ ", projectCode=" + projectCode
+ ", projectName='" + projectName + '\''
+ ", userName='" + userName + '\''
+ ", perm=" + perm
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ '}';
}
}

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.java

@ -31,14 +31,28 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
* project mapper interface
*/
public interface ProjectMapper extends BaseMapper<Project> {
/**
* query project detail by code
* @param projectCode projectCode
* @return project
*/
Project queryByCode(@Param("projectCode") Long projectCode);
/**
* TODO: delete
* query project detail by id
* @param projectId projectId
* @return project
*/
Project queryDetailById(@Param("projectId") int projectId);
/**
* query project detail by code
* @param projectCode projectCode
* @return project
*/
Project queryDetailByCode(@Param("projectCode") Long projectCode);
/**
* query project by name
* @param projectName projectName
@ -91,5 +105,4 @@ public interface ProjectMapper extends BaseMapper<Project> {
* @return projectName and userName
*/
ProjectUser queryProjectWithUserByProcessInstanceId(@Param("processInstanceId") int processInstanceId);
}

26
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml

@ -25,6 +25,26 @@
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.code, ${alias}.description, ${alias}.user_id, ${alias}.flag, ${alias}.create_time, ${alias}.update_time
</sql>
<select id="queryByCode" resultType="org.apache.dolphinscheduler.dao.entity.Project">
select
<include refid="baseSql"/>
from t_ds_project
where code = #{code}
</select>
<select id="queryDetailByCode" resultType="org.apache.dolphinscheduler.dao.entity.Project">
select
<include refid="baseSqlV2">
<property name="alias" value="p"/>
</include>
,
u.user_name as user_name
from t_ds_project p
join t_ds_user u on p.user_id = u.id
where p.code = #{projectCode}
</select>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.Project">
select
<include refid="baseSqlV2">
@ -100,12 +120,12 @@
<select id="queryProjectWithUserByProcessInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.ProjectUser">
select
dp.id projectId,
dp.code projectCode,
dp.name projectName,
u.user_name userName
from t_ds_process_instance di
join t_ds_process_definition dpd on di.process_definition_id = dpd.id
join t_ds_project dp on dpd.project_id = dp.id
join t_ds_process_definition dpd on di.process_definition_code = dpd.code
join t_ds_project dp on dpd.project_code = dp.code
join t_ds_user u on dp.user_id = u.id
where di.id = #{processInstanceId};
</select>

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectUserMapper.xml

@ -19,7 +19,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper">
<sql id="baseSql">
id, user_id, project_id, perm, create_time, update_time
id, user_id, project_code, perm, create_time, update_time
</sql>
<delete id="deleteProjectRelation">
delete from t_ds_relation_project_user

78
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@ -97,8 +96,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@ -2278,11 +2277,12 @@ public class ProcessService {
/**
* create task definition and task relations
*/
public int createTaskAndRelation(User operator,
Long projectCode,
ProcessDefinition processDefinition,
ProcessData processData) {
public void createTaskAndRelation(User operator,
Long projectCode,
ProcessDefinition processDefinition,
ProcessData processData) {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
Map<String, Long> taskNameAndCode = new HashMap<>();
for (TaskNode taskNode : taskNodeList) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName());
if (taskDefinition == null) {
@ -2292,44 +2292,60 @@ public class ProcessService {
taskDefinition = new TaskDefinition();
taskDefinition.setCode(code);
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
return -1;
throw new ServiceException("Task code get error", e);
}
saveTaskDefinition(operator, projectCode, taskNode, taskDefinition);
} else {
if (isTaskOnline(taskDefinition.getCode())) {
// TODO return something for fail
return -1;
throw new ServiceException(String.format("The task %s is on line in process", taskNode.getName()));
}
updateTaskDefinition(operator, projectCode, taskNode, taskDefinition);
}
taskNameAndCode.put(taskNode.getName(), taskDefinition.getCode());
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(projectCode, processDefinition.getCode());
}
// TODO parse taskNodeList for preTaskCode and postTaskCode
List<TaskNodeRelation> taskNodeRelationList = DagHelper.getProcessDag(taskNodeList).getEdges();
List<ProcessTaskRelation> builderRelationList = new ArrayList<>();
Date now = new Date();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation("",// todo relation name
processDefinition.getVersion(),
projectCode,
processDefinition.getCode(),
0L, // todo pre task code
0L, // todo post task code
ConditionType.of(""), // todo conditionType
"", // todo conditionParams
now,
now);
// save process task relation
int insert = processTaskRelationMapper.insert(processTaskRelation);
// save process task relation log
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.set(processTaskRelation);
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now);
int logInsert = processTaskRelationLogMapper.insert(processTaskRelationLog);
return insert & logInsert;
for (TaskNode taskNode : taskNodeList) {
List<String> depList = taskNode.getDepList();
if (CollectionUtils.isNotEmpty(depList)) {
for (String preTaskName : depList) {
builderRelationList.add(new ProcessTaskRelation("",// todo relation name
processDefinition.getVersion(),
projectCode,
processDefinition.getCode(),
taskNameAndCode.get(preTaskName),
taskNameAndCode.get(taskNode.getName()),
ConditionType.of("none"), // todo conditionType
taskNode.getConditionResult(),
now,
now));
}
} else {
builderRelationList.add(new ProcessTaskRelation("",// todo relation name
processDefinition.getVersion(),
projectCode,
processDefinition.getCode(),
0L,
taskNameAndCode.get(taskNode.getName()),
ConditionType.of("none"), // todo conditionType
taskNode.getConditionResult(),
now,
now));
}
}
for (ProcessTaskRelation processTaskRelation : builderRelationList) {
processTaskRelationMapper.insert(processTaskRelation);
// save process task relation log
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.set(processTaskRelation);
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLogMapper.insert(processTaskRelationLog);
}
}
public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -439,7 +439,7 @@ public class ProcessServiceTest {
String expect = JSONUtils.toJsonString(exceptProcessData);
String oldJson = JSONUtils.toJsonString(oldProcessData);
Assert.assertEquals(expect, processService.changeJson(newProcessData,oldJson));
Assert.assertEquals(expect, processService.changeJson(newProcessData, oldJson));
}
}

2
sql/dolphinscheduler_mysql.sql

@ -692,7 +692,7 @@ DROP TABLE IF EXISTS `t_ds_relation_project_user`;
CREATE TABLE `t_ds_relation_project_user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`user_id` int(11) NOT NULL COMMENT 'user id',
`project_id` int(11) DEFAULT NULL COMMENT 'project id',
`project_code` bigint(20) DEFAULT NULL COMMENT 'project code',
`perm` int(11) DEFAULT '1' COMMENT 'limits of authority',
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',

Loading…
Cancel
Save