Browse Source

[Fix-6139][API]fix envCode in task_definition and process switchVersion (#6231)

* fix envCode in task_definition and process switchVersion

* fix conflicts

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
9d0b00c1b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  3. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
  4. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  5. 51
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -1342,9 +1342,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result; return result;
} }
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
.queryByDefinitionCodeAndVersion(code, version);
if (Objects.isNull(processDefinitionLog)) { if (Objects.isNull(processDefinitionLog)) {
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR, processDefinition.getCode(), version); putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR, processDefinition.getCode(), version);
return result; return result;

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -137,7 +137,7 @@ public class TaskDefinition {
/** /**
* environment code * environment code
*/ */
private Long environmentCode; private long environmentCode;
/** /**
* fail retry times * fail retry times
@ -424,11 +424,11 @@ public class TaskDefinition {
this.modifyBy = modifyBy; this.modifyBy = modifyBy;
} }
public Long getEnvironmentCode() { public long getEnvironmentCode() {
return this.environmentCode; return this.environmentCode;
} }
public void setEnvironmentCode(Long environmentCode) { public void setEnvironmentCode(long environmentCode) {
this.environmentCode = environmentCode; this.environmentCode = environmentCode;
} }
@ -451,7 +451,8 @@ public class TaskDefinition {
&& Objects.equals(workerGroup, that.workerGroup) && Objects.equals(workerGroup, that.workerGroup)
&& timeoutFlag == that.timeoutFlag && timeoutFlag == that.timeoutFlag
&& timeoutNotifyStrategy == that.timeoutNotifyStrategy && timeoutNotifyStrategy == that.timeoutNotifyStrategy
&& Objects.equals(resourceIds, that.resourceIds); && Objects.equals(resourceIds, that.resourceIds)
&& environmentCode == that.environmentCode;
} }
@Override @Override

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

@ -56,10 +56,10 @@
<foreach collection="taskDefinitionLogs" item="taskDefinitionLog" separator=","> <foreach collection="taskDefinitionLogs" item="taskDefinitionLog" separator=",">
(#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description}, (#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description},
#{taskDefinitionLog.projectCode},#{taskDefinitionLog.userId},#{taskDefinitionLog.taskType},#{taskDefinitionLog.taskParams}, #{taskDefinitionLog.projectCode},#{taskDefinitionLog.userId},#{taskDefinitionLog.taskType},#{taskDefinitionLog.taskParams},
#{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode},#{taskDefinitionLog.failRetryTimes}, #{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinition.environmentCode},
#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},#{taskDefinitionLog.timeout}, #{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},
#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime}, #{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime},
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}) #{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime})
</foreach> </foreach>
</insert> </insert>
<delete id="deleteByCodeAndVersion"> <delete id="deleteByCodeAndVersion">

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -80,15 +80,15 @@
values values
<foreach collection="taskDefinitions" item="taskDefinition" separator=","> <foreach collection="taskDefinitions" item="taskDefinition" separator=",">
(#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description}, (#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description},
#{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams}, #{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag},
#{taskDefinition.flag},#{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.environmentCode},#{taskDefinition.failRetryTimes}, #{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.environmentCode},#{taskDefinition.failRetryTimes},
#{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout}, #{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout},
#{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime}) #{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime})
</foreach> </foreach>
</insert> </insert>
<select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"> <select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params, select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params, td.flag,
td.flag, td.task_priority, td.worker_group,td.environment_code, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag, td.timeout_notify_strategy, td.task_priority, td.worker_group, td.environment_code, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag, td.timeout_notify_strategy,
td.timeout, td.delay_time, td.resource_ids, td.create_time, td.update_time, u.user_name,p.name as project_name td.timeout, td.delay_time, td.resource_ids, td.create_time, td.update_time, u.user_name,p.name as project_name
from t_ds_task_definition td from t_ds_task_definition td
JOIN t_ds_user u ON td.user_id = u.id JOIN t_ds_user u ON td.user_id = u.id

51
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2101,64 +2101,31 @@ public class ProcessService {
/** /**
* switch process definition version to process definition log version * switch process definition version to process definition log version
*/ */
public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog, Boolean isFromProcessDefine) { public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
if (null == processDefinition || null == processDefinitionLog) { if (null == processDefinition || null == processDefinitionLog) {
return Constants.DEFINITION_FAILURE; return Constants.DEFINITION_FAILURE;
} }
processDefinitionLog.setId(processDefinition.getId()); processDefinitionLog.setId(processDefinition.getId());
processDefinitionLog.setReleaseState(isFromProcessDefine ? ReleaseState.OFFLINE : ReleaseState.ONLINE); processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
processDefinitionLog.setFlag(Flag.YES); processDefinitionLog.setFlag(Flag.YES);
int result; int result = processDefineMapper.updateById(processDefinitionLog);
if (0 == processDefinition.getId()) { if (result > 0) {
result = processDefineMapper.insert(processDefinitionLog); result = switchProcessTaskRelationVersion(processDefinition);
} else { if (result <= 0) {
result = processDefineMapper.updateById(processDefinitionLog); return Constants.DEFINITION_FAILURE;
}
} }
return result; return result;
} }
/**
* switch process definition version to process definition log version
*/
public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog, true);
if (switchResult != Constants.DEFINITION_FAILURE) {
switchResult = switchProcessTaskRelationVersion(processDefinition);
}
return switchResult;
}
public int switchProcessTaskRelationVersion(ProcessDefinition processDefinition) { public int switchProcessTaskRelationVersion(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) { if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode()); processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode());
} }
int result = 0;
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogList) { return processTaskRelationMapper.batchInsert(processTaskRelationLogList);
result += processTaskRelationMapper.insert(processTaskRelationLog);
}
return result;
}
private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) {
taskDefinition.setName(taskNode.getName());
taskDefinition.setDescription(taskNode.getDesc());
taskDefinition.setTaskType(taskNode.getType().toUpperCase());
taskDefinition.setTaskParams(taskNode.getTaskParams());
taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
taskDefinition.setEnvironmentCode(Objects.isNull(taskNode.getEnvironmentCode()) ? -1 : taskNode.getEnvironmentCode());
taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinition.setTimeoutNotifyStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
taskDefinition.setDelayTime(taskNode.getDelayTime());
taskDefinition.setResourceIds(getResourceIds(taskDefinition));
} }
/** /**

Loading…
Cancel
Save