|
|
|
@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
|
|
|
|
|
import org.apache.dolphinscheduler.api.utils.CheckUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.Constants; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.Flag; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.ReleaseState; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.TaskType; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
|
|
|
|
import org.apache.dolphinscheduler.common.model.TaskNode; |
|
|
|
@ -35,16 +36,21 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
|
|
|
|
|
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.StringUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.Project; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.User; |
|
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; |
|
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; |
|
|
|
|
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; |
|
|
|
|
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; |
|
|
|
|
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; |
|
|
|
|
|
|
|
|
|
import java.util.Date; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Set; |
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
@ -76,6 +82,11 @@ public class TaskDefinitionServiceImpl extends BaseService implements
|
|
|
|
|
@Autowired |
|
|
|
|
private TaskDefinitionLogMapper taskDefinitionLogMapper; |
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
|
private ProcessTaskRelationMapper processTaskRelationMapper; |
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
|
private ProcessDefinitionMapper processDefinitionMapper; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* create task definition |
|
|
|
@ -100,14 +111,8 @@ public class TaskDefinitionServiceImpl extends BaseService implements
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); |
|
|
|
|
if (taskNode == null) { |
|
|
|
|
logger.error("taskDefinitionJson is not valid json"); |
|
|
|
|
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
if (!CheckUtils.checkTaskNodeParameters(taskNode.getParams(), taskNode.getName())) { |
|
|
|
|
logger.error("task node {} parameter invalid", taskNode.getName()); |
|
|
|
|
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName()); |
|
|
|
|
checkTaskNode(result, taskNode, taskDefinitionJson); |
|
|
|
|
if (result.get(Constants.STATUS) != Status.SUCCESS) { |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
long code = 0L; |
|
|
|
@ -129,8 +134,10 @@ public class TaskDefinitionServiceImpl extends BaseService implements
|
|
|
|
|
loginUser.getId(), |
|
|
|
|
TaskType.of(taskNode.getType()), |
|
|
|
|
taskNode.getParams(), |
|
|
|
|
taskNode.isForbidden() ? Flag.NO : Flag.YES, taskNode.getTaskInstancePriority(), |
|
|
|
|
taskNode.getWorkerGroup(), taskNode.getMaxRetryTimes(), |
|
|
|
|
taskNode.isForbidden() ? Flag.NO : Flag.YES, |
|
|
|
|
taskNode.getTaskInstancePriority(), |
|
|
|
|
taskNode.getWorkerGroup(), |
|
|
|
|
taskNode.getMaxRetryTimes(), |
|
|
|
|
taskNode.getRetryInterval(), |
|
|
|
|
taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE, |
|
|
|
|
taskNode.getTaskTimeoutParameter().getStrategy(), |
|
|
|
@ -204,5 +211,188 @@ public class TaskDefinitionServiceImpl extends BaseService implements
|
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* delete task definition |
|
|
|
|
* |
|
|
|
|
* @param loginUser login user |
|
|
|
|
* @param projectName project name |
|
|
|
|
* @param taskCode task code |
|
|
|
|
*/ |
|
|
|
|
@Transactional(rollbackFor = RuntimeException.class) |
|
|
|
|
@Override |
|
|
|
|
public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, String projectName, Long taskCode) { |
|
|
|
|
Map<String, Object> result = new HashMap<>(5); |
|
|
|
|
Project project = projectMapper.queryByName(projectName); |
|
|
|
|
|
|
|
|
|
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); |
|
|
|
|
Status resultEnum = (Status) checkResult.get(Constants.STATUS); |
|
|
|
|
if (resultEnum != Status.SUCCESS) { |
|
|
|
|
return checkResult; |
|
|
|
|
} |
|
|
|
|
checkTaskRelation(result, taskCode); |
|
|
|
|
resultEnum = (Status) result.get(Constants.STATUS); |
|
|
|
|
if (resultEnum != Status.SUCCESS) { |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
int delete = taskDefinitionMapper.deleteByCode(taskCode); |
|
|
|
|
if (delete > 0) { |
|
|
|
|
putMsg(result, Status.SUCCESS); |
|
|
|
|
} else { |
|
|
|
|
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); |
|
|
|
|
} |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* update task definition |
|
|
|
|
* |
|
|
|
|
* @param loginUser login user |
|
|
|
|
* @param projectName project name |
|
|
|
|
* @param taskCode task code |
|
|
|
|
* @param taskDefinitionJson task definition json |
|
|
|
|
*/ |
|
|
|
|
@Override |
|
|
|
|
public Map<String, Object> updateTaskDefinition(User loginUser, String projectName, Long taskCode, String taskDefinitionJson) { |
|
|
|
|
Map<String, Object> result = new HashMap<>(5); |
|
|
|
|
Project project = projectMapper.queryByName(projectName); |
|
|
|
|
|
|
|
|
|
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); |
|
|
|
|
Status resultEnum = (Status) checkResult.get(Constants.STATUS); |
|
|
|
|
if (resultEnum != Status.SUCCESS) { |
|
|
|
|
return checkResult; |
|
|
|
|
} |
|
|
|
|
checkTaskRelation(result, taskCode); |
|
|
|
|
if (result.get(Constants.STATUS) != Status.SUCCESS) { |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode); |
|
|
|
|
if (taskDefinition == null) { |
|
|
|
|
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); |
|
|
|
|
checkTaskNode(result, taskNode, taskDefinitionJson); |
|
|
|
|
if (result.get(Constants.STATUS) != Status.SUCCESS) { |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskCode); |
|
|
|
|
int version = taskDefinitionLogs |
|
|
|
|
.stream() |
|
|
|
|
.map(TaskDefinitionLog::getVersion) |
|
|
|
|
.max((x, y) -> x > y ? x : y) |
|
|
|
|
.orElse(0) + 1; |
|
|
|
|
Date now = new Date(); |
|
|
|
|
taskDefinition.setVersion(version); |
|
|
|
|
taskDefinition.setCode(taskCode); |
|
|
|
|
taskDefinition.setName(taskNode.getName()); |
|
|
|
|
taskDefinition.setDescription(taskNode.getDesc()); |
|
|
|
|
taskDefinition.setProjectCode(project.getCode()); |
|
|
|
|
taskDefinition.setUserId(loginUser.getId()); |
|
|
|
|
taskDefinition.setTaskType(TaskType.of(taskNode.getType())); |
|
|
|
|
taskDefinition.setTaskParams(taskNode.getParams()); |
|
|
|
|
taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES); |
|
|
|
|
taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority()); |
|
|
|
|
taskDefinition.setWorkerGroup(taskNode.getWorkerGroup()); |
|
|
|
|
taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes()); |
|
|
|
|
taskDefinition.setFailRetryInterval(taskNode.getRetryInterval()); |
|
|
|
|
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); |
|
|
|
|
taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy()); |
|
|
|
|
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval()); |
|
|
|
|
taskDefinition.setUpdateTime(now); |
|
|
|
|
taskDefinition.setResourceIds(getResourceIds(taskDefinition)); |
|
|
|
|
taskDefinitionMapper.updateById(taskDefinition); |
|
|
|
|
// save task definition log
|
|
|
|
|
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); |
|
|
|
|
taskDefinitionLog.set(taskDefinition); |
|
|
|
|
taskDefinitionLog.setOperator(loginUser.getId()); |
|
|
|
|
taskDefinitionLog.setOperateTime(now); |
|
|
|
|
taskDefinitionLogMapper.insert(taskDefinitionLog); |
|
|
|
|
result.put(Constants.DATA_LIST, taskCode); |
|
|
|
|
putMsg(result, Status.SUCCESS); |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void checkTaskRelation(Map<String, Object> result, Long taskCode) { |
|
|
|
|
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode, taskCode); |
|
|
|
|
if (!processTaskRelationList.isEmpty()) { |
|
|
|
|
Set<Long> processDefinitionCodes = processTaskRelationList |
|
|
|
|
.stream() |
|
|
|
|
.map(ProcessTaskRelation::getProcessDefinitionCode) |
|
|
|
|
.collect(Collectors.toSet()); |
|
|
|
|
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(processDefinitionCodes); |
|
|
|
|
// check process definition is already online
|
|
|
|
|
for (ProcessDefinition processDefinition : processDefinitionList) { |
|
|
|
|
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { |
|
|
|
|
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinition.getCode()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void checkTaskNode(Map<String, Object> result, TaskNode taskNode, String taskDefinitionJson) { |
|
|
|
|
if (taskNode == null) { |
|
|
|
|
logger.error("taskDefinitionJson is not valid json"); |
|
|
|
|
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (!CheckUtils.checkTaskNodeParameters(taskNode.getParams(), taskNode.getName())) { |
|
|
|
|
logger.error("task node {} parameter invalid", taskNode.getName()); |
|
|
|
|
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* update task definition |
|
|
|
|
* |
|
|
|
|
* @param loginUser login user |
|
|
|
|
* @param projectName project name |
|
|
|
|
* @param taskCode task code |
|
|
|
|
* @param version the version user want to switch |
|
|
|
|
*/ |
|
|
|
|
@Override |
|
|
|
|
public Map<String, Object> switchVersion(User loginUser, String projectName, Long taskCode, int version) { |
|
|
|
|
Map<String, Object> result = new HashMap<>(5); |
|
|
|
|
Project project = projectMapper.queryByName(projectName); |
|
|
|
|
|
|
|
|
|
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); |
|
|
|
|
Status resultEnum = (Status) checkResult.get(Constants.STATUS); |
|
|
|
|
if (resultEnum != Status.SUCCESS) { |
|
|
|
|
return checkResult; |
|
|
|
|
} |
|
|
|
|
checkTaskRelation(result, taskCode); |
|
|
|
|
if (result.get(Constants.STATUS) != Status.SUCCESS) { |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode); |
|
|
|
|
if (taskDefinition == null) { |
|
|
|
|
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version); |
|
|
|
|
taskDefinition.setVersion(version); |
|
|
|
|
taskDefinition.setCode(taskCode); |
|
|
|
|
taskDefinition.setName(taskDefinitionLog.getName()); |
|
|
|
|
taskDefinition.setDescription(taskDefinitionLog.getDescription()); |
|
|
|
|
taskDefinition.setProjectCode(taskDefinitionLog.getProjectCode()); |
|
|
|
|
taskDefinition.setUserId(loginUser.getId()); |
|
|
|
|
taskDefinition.setTaskType(taskDefinitionLog.getTaskType()); |
|
|
|
|
taskDefinition.setTaskParams(taskDefinitionLog.getTaskParams()); |
|
|
|
|
taskDefinition.setFlag(taskDefinitionLog.getFlag()); |
|
|
|
|
taskDefinition.setTaskPriority(taskDefinitionLog.getTaskPriority()); |
|
|
|
|
taskDefinition.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); |
|
|
|
|
taskDefinition.setFailRetryTimes(taskDefinitionLog.getFailRetryTimes()); |
|
|
|
|
taskDefinition.setFailRetryInterval(taskDefinitionLog.getFailRetryInterval()); |
|
|
|
|
taskDefinition.setTimeoutFlag(taskDefinitionLog.getTimeoutFlag()); |
|
|
|
|
taskDefinition.setTaskTimeoutStrategy(taskDefinitionLog.getTaskTimeoutStrategy()); |
|
|
|
|
taskDefinition.setTimeout(taskDefinitionLog.getTimeout()); |
|
|
|
|
taskDefinition.setUpdateTime(new Date()); |
|
|
|
|
taskDefinition.setResourceIds(taskDefinitionLog.getResourceIds()); |
|
|
|
|
taskDefinitionMapper.updateById(taskDefinition); |
|
|
|
|
result.put(Constants.DATA_LIST, taskCode); |
|
|
|
|
putMsg(result, Status.SUCCESS); |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|