Browse Source
* add task and relation save code * modify codestyle Co-authored-by: JinyLeeChina <297062848@qq.com>pull/3/MERGE
JinyLeeChina
4 years ago
committed by
GitHub
16 changed files with 670 additions and 53 deletions
@ -0,0 +1,52 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.service; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.User; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* process task relation service |
||||
*/ |
||||
public interface ProcessTaskRelationService { |
||||
|
||||
/** |
||||
* create process task relation |
||||
* |
||||
* @param loginUser login user |
||||
* @param name relation name |
||||
* @param projectCode process code |
||||
* @param processDefinitionCode process definition code |
||||
* @param preTaskCode pre task code |
||||
* @param postTaskCode post task code |
||||
* @param conditionType condition type |
||||
* @param conditionParams condition params |
||||
* @return create result code |
||||
*/ |
||||
Map<String, Object> createProcessTaskRelation(User loginUser, |
||||
String name, |
||||
Long projectCode, |
||||
Long processDefinitionCode, |
||||
Long preTaskCode, |
||||
Long postTaskCode, |
||||
String conditionType, |
||||
String conditionParams); |
||||
|
||||
} |
||||
|
@ -0,0 +1,44 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.service; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; |
||||
import org.apache.dolphinscheduler.dao.entity.User; |
||||
|
||||
import java.util.Map; |
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException; |
||||
|
||||
/** |
||||
* task definition service |
||||
*/ |
||||
public interface TaskDefinitionService { |
||||
|
||||
/** |
||||
* create task definition |
||||
* |
||||
* @param loginUser login user |
||||
* @param projectName project name |
||||
* @param taskDefinitionJson task definition json |
||||
* @throws JsonProcessingException JsonProcessingException |
||||
*/ |
||||
Map<String, Object> createTaskDefinition(User loginUser, |
||||
String projectName, |
||||
String taskDefinitionJson) throws JsonProcessingException, SnowFlakeException; |
||||
} |
||||
|
@ -0,0 +1,108 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.service.impl; |
||||
|
||||
import org.apache.dolphinscheduler.api.enums.Status; |
||||
import org.apache.dolphinscheduler.api.service.BaseService; |
||||
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; |
||||
import org.apache.dolphinscheduler.common.enums.ConditionType; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
||||
import org.apache.dolphinscheduler.dao.entity.User; |
||||
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; |
||||
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; |
||||
|
||||
import java.util.Date; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Service; |
||||
import org.springframework.transaction.annotation.Transactional; |
||||
|
||||
/** |
||||
* task definition service impl |
||||
*/ |
||||
@Service |
||||
public class ProcessTaskRelationServiceImpl extends BaseService implements |
||||
ProcessTaskRelationService { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class); |
||||
|
||||
//@Autowired
|
||||
//private ProjectMapper projectMapper;
|
||||
|
||||
@Autowired |
||||
private ProcessTaskRelationMapper processTaskRelationMapper; |
||||
|
||||
@Autowired |
||||
private ProcessTaskRelationLogMapper processTaskRelationLogMapper; |
||||
|
||||
/** |
||||
* create process task relation |
||||
* |
||||
* @param loginUser login user |
||||
* @param name relation name |
||||
* @param projectCode process code |
||||
* @param processDefinitionCode process definition code |
||||
* @param preTaskCode pre task code |
||||
* @param postTaskCode post task code |
||||
* @param conditionType condition type |
||||
* @param conditionParams condition params |
||||
* @return create result code |
||||
*/ |
||||
@Transactional |
||||
@Override |
||||
public Map<String, Object> createProcessTaskRelation(User loginUser, |
||||
String name, |
||||
Long projectCode, |
||||
Long processDefinitionCode, |
||||
Long preTaskCode, |
||||
Long postTaskCode, |
||||
String conditionType, |
||||
String conditionParams) { |
||||
Map<String, Object> result = new HashMap<>(); |
||||
// TODO check projectCode
|
||||
// TODO check processDefinitionCode
|
||||
// TODO check preTaskCode and postTaskCode
|
||||
Date now = new Date(); |
||||
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(name, |
||||
1, |
||||
projectCode, |
||||
processDefinitionCode, |
||||
preTaskCode, |
||||
postTaskCode, |
||||
ConditionType.of(conditionType), |
||||
conditionParams, |
||||
now, |
||||
now); |
||||
// save process task relation
|
||||
processTaskRelationMapper.insert(processTaskRelation); |
||||
// save process task relation log
|
||||
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); |
||||
processTaskRelationLog.set(processTaskRelation); |
||||
processTaskRelationLog.setOperator(loginUser.getId()); |
||||
processTaskRelationLog.setOperateTime(now); |
||||
processTaskRelationLogMapper.insert(processTaskRelationLog); |
||||
putMsg(result, Status.SUCCESS); |
||||
return result; |
||||
} |
||||
} |
||||
|
@ -0,0 +1,180 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.service.impl; |
||||
|
||||
import org.apache.dolphinscheduler.api.enums.Status; |
||||
import org.apache.dolphinscheduler.api.service.BaseService; |
||||
import org.apache.dolphinscheduler.api.service.ProjectService; |
||||
import org.apache.dolphinscheduler.api.service.TaskDefinitionService; |
||||
import org.apache.dolphinscheduler.api.utils.CheckUtils; |
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.Flag; |
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
||||
import org.apache.dolphinscheduler.common.model.TaskNode; |
||||
import org.apache.dolphinscheduler.common.process.ResourceInfo; |
||||
import org.apache.dolphinscheduler.common.task.AbstractParameters; |
||||
import org.apache.dolphinscheduler.common.utils.CollectionUtils; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; |
||||
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; |
||||
import org.apache.dolphinscheduler.common.utils.StringUtils; |
||||
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.Project; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
||||
import org.apache.dolphinscheduler.dao.entity.User; |
||||
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; |
||||
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; |
||||
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; |
||||
|
||||
import java.util.Date; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
import java.util.Set; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Service; |
||||
import org.springframework.transaction.annotation.Transactional; |
||||
|
||||
/** |
||||
* task definition service impl |
||||
*/ |
||||
@Service |
||||
public class TaskDefinitionServiceImpl extends BaseService implements |
||||
TaskDefinitionService { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(TaskDefinitionServiceImpl.class); |
||||
|
||||
@Autowired |
||||
private ProjectMapper projectMapper; |
||||
|
||||
@Autowired |
||||
private ProjectService projectService; |
||||
|
||||
@Autowired |
||||
private TaskDefinitionMapper taskDefinitionMapper; |
||||
|
||||
@Autowired |
||||
private TaskDefinitionLogMapper taskDefinitionLogMapper; |
||||
|
||||
|
||||
/** |
||||
* create task definition |
||||
* |
||||
* @param loginUser login user |
||||
* @param projectName project name |
||||
* @param taskDefinitionJson task definition json |
||||
*/ |
||||
@Transactional |
||||
@Override |
||||
public Map<String, Object> createTaskDefinition(User loginUser, |
||||
String projectName, |
||||
String taskDefinitionJson) { |
||||
|
||||
Map<String, Object> result = new HashMap<>(); |
||||
Project project = projectMapper.queryByName(projectName); |
||||
// check project auth
|
||||
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); |
||||
Status resultStatus = (Status) checkResult.get(Constants.STATUS); |
||||
if (resultStatus != Status.SUCCESS) { |
||||
return checkResult; |
||||
} |
||||
|
||||
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); |
||||
if (taskNode == null) { |
||||
logger.error("taskDefinitionJson is not valid json"); |
||||
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); |
||||
return result; |
||||
} |
||||
if (!CheckUtils.checkTaskNodeParameters(taskNode.getParams(), taskNode.getName())) { |
||||
logger.error("task node {} parameter invalid", taskNode.getName()); |
||||
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName()); |
||||
return result; |
||||
} |
||||
long code = 0L; |
||||
try { |
||||
code = SnowFlakeUtils.getInstance().nextId(); |
||||
} catch (SnowFlakeException e) { |
||||
logger.error("Task code get error, ", e); |
||||
} |
||||
if (code == 0L) { |
||||
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);// TODO code message
|
||||
return result; |
||||
} |
||||
Date now = new Date(); |
||||
TaskDefinition taskDefinition = new TaskDefinition(code, |
||||
taskNode.getName(), |
||||
1, |
||||
taskNode.getDesc(), |
||||
0L, // TODO project.getCode()
|
||||
loginUser.getId(), |
||||
TaskType.of(taskNode.getType()), |
||||
taskNode.getParams(), |
||||
taskNode.isForbidden() ? Flag.NO : Flag.YES, taskNode.getTaskInstancePriority(), |
||||
taskNode.getWorkerGroup(), taskNode.getMaxRetryTimes(), |
||||
taskNode.getRetryInterval(), |
||||
taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE, |
||||
taskNode.getTaskTimeoutParameter().getStrategy(), |
||||
taskNode.getTaskTimeoutParameter().getInterval(), |
||||
now, |
||||
now); |
||||
taskDefinition.setResourceIds(getResourceIds(taskDefinition)); |
||||
// save the new task definition
|
||||
taskDefinitionMapper.insert(taskDefinition); |
||||
// save task definition log
|
||||
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); |
||||
taskDefinitionLog.set(taskDefinition); |
||||
taskDefinitionLog.setOperator(loginUser.getId()); |
||||
taskDefinitionLog.setOperateTime(now); |
||||
taskDefinitionLogMapper.insert(taskDefinitionLog); |
||||
// return taskDefinition object with code
|
||||
result.put(Constants.DATA_LIST, code); |
||||
putMsg(result, Status.SUCCESS); |
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* get resource ids |
||||
* |
||||
* @param taskDefinition taskDefinition |
||||
* @return resource ids |
||||
*/ |
||||
private String getResourceIds(TaskDefinition taskDefinition) { |
||||
Set<Integer> resourceIds = null; |
||||
// TODO modify taskDefinition.getTaskType()
|
||||
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams()); |
||||
|
||||
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) { |
||||
resourceIds = params.getResourceFilesList(). |
||||
stream() |
||||
.filter(t -> t.getId() != 0) |
||||
.map(ResourceInfo::getId) |
||||
.collect(Collectors.toSet()); |
||||
} |
||||
if (CollectionUtils.isEmpty(resourceIds)) { |
||||
return StringUtils.EMPTY; |
||||
} |
||||
return StringUtils.join(resourceIds, ","); |
||||
} |
||||
|
||||
} |
||||
|
@ -0,0 +1,29 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
||||
/** |
||||
* process task relation log mapper interface
|
||||
*/ |
||||
public interface ProcessTaskRelationLogMapper extends BaseMapper<ProcessTaskRelationLog> { |
||||
|
||||
} |
@ -0,0 +1,40 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; |
||||
|
||||
import org.apache.ibatis.annotations.Param; |
||||
|
||||
import java.util.List; |
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
||||
/** |
||||
* process task relation mapper interface
|
||||
*/ |
||||
public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelation> { |
||||
|
||||
/** |
||||
* process task relation by processDefinitionCode |
||||
* |
||||
* @param processDefinitionCode processDefinitionCode |
||||
* @return task definition |
||||
*/ |
||||
List<ProcessTaskRelation> queryByProcessDefinitionCode(@Param("processDefinitionCode") String processDefinitionCode); |
||||
} |
@ -0,0 +1,22 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > |
||||
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper"> |
||||
|
||||
</mapper> |
@ -0,0 +1,31 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > |
||||
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper"> |
||||
<sql id="baseSql"> |
||||
id, `name`, version, project_code, process_definition_code, pre_project_code, pre_task_code, post_project_code, |
||||
post_task_code, condition_type, condition_params, create_time, update_time |
||||
</sql> |
||||
<select id="queryByProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation"> |
||||
select |
||||
<include refid="baseSql"/> |
||||
from t_ds_process_task_relation |
||||
WHERE process_definition_code = #{processDefinitionCode} |
||||
</select> |
||||
</mapper> |
Loading…
Reference in new issue