Browse Source

[Feature][JsonSplit] add task query (#4619)

* add task query

* modify codestyle

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
47e620fbc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
  3. 17
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  5. 60
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  6. 34
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  7. 18
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  8. 19
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  9. 21
      dolphinscheduler-dist/release-docs/licenses/ui-licenses/LICENSE-@form-create-element-ui

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -252,7 +252,7 @@ public enum Status {
EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"), BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/** /**

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java

@ -31,7 +31,7 @@ public interface ProcessTaskRelationService {
* *
* @param loginUser login user * @param loginUser login user
* @param name relation name * @param name relation name
* @param projectCode process code * @param projectName process name
* @param processDefinitionCode process definition code * @param processDefinitionCode process definition code
* @param preTaskCode pre task code * @param preTaskCode pre task code
* @param postTaskCode post task code * @param postTaskCode post task code
@ -41,7 +41,7 @@ public interface ProcessTaskRelationService {
*/ */
Map<String, Object> createProcessTaskRelation(User loginUser, Map<String, Object> createProcessTaskRelation(User loginUser,
String name, String name,
Long projectCode, String projectName,
Long processDefinitionCode, Long processDefinitionCode,
Long preTaskCode, Long preTaskCode,
Long postTaskCode, Long postTaskCode,

17
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@ -17,13 +17,10 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map; import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
/** /**
* task definition service * task definition service
*/ */
@ -35,10 +32,20 @@ public interface TaskDefinitionService {
* @param loginUser login user * @param loginUser login user
* @param projectName project name * @param projectName project name
* @param taskDefinitionJson task definition json * @param taskDefinitionJson task definition json
* @throws JsonProcessingException JsonProcessingException
*/ */
Map<String, Object> createTaskDefinition(User loginUser, Map<String, Object> createTaskDefinition(User loginUser,
String projectName, String projectName,
String taskDefinitionJson) throws JsonProcessingException, SnowFlakeException; String taskDefinitionJson);
/**
* query task definition
*
* @param loginUser login user
* @param projectName project name
* @param taskName task name
*/
Map<String, Object> queryTaskDefinitionByName(User loginUser,
String projectName,
String taskName);
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -253,7 +253,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
processTaskRelationService.createProcessTaskRelation( processTaskRelationService.createProcessTaskRelation(
loginUser, loginUser,
name, name,
project.getCode(), projectName,
processDefinitionCode, processDefinitionCode,
0L, 0L,
0L, 0L,

60
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -20,12 +20,20 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
@ -46,8 +54,14 @@ public class ProcessTaskRelationServiceImpl extends BaseService implements
private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class);
//@Autowired @Autowired
//private ProjectMapper projectMapper; private ProjectMapper projectMapper;
@Autowired
private ProjectService projectService;
@Autowired
private ProcessDefinitionMapper processDefineMapper;
@Autowired @Autowired
private ProcessTaskRelationMapper processTaskRelationMapper; private ProcessTaskRelationMapper processTaskRelationMapper;
@ -55,12 +69,15 @@ public class ProcessTaskRelationServiceImpl extends BaseService implements
@Autowired @Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper; private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
/** /**
* create process task relation * create process task relation
* *
* @param loginUser login user * @param loginUser login user
* @param name relation name * @param name relation name
* @param projectCode process code * @param projectName process name
* @param processDefinitionCode process definition code * @param processDefinitionCode process definition code
* @param preTaskCode pre task code * @param preTaskCode pre task code
* @param postTaskCode post task code * @param postTaskCode post task code
@ -72,20 +89,39 @@ public class ProcessTaskRelationServiceImpl extends BaseService implements
@Override @Override
public Map<String, Object> createProcessTaskRelation(User loginUser, public Map<String, Object> createProcessTaskRelation(User loginUser,
String name, String name,
Long projectCode, String projectName,
Long processDefinitionCode, Long processDefinitionCode,
Long preTaskCode, Long preTaskCode,
Long postTaskCode, Long postTaskCode,
String conditionType, String conditionType,
String conditionParams) { String conditionParams) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
// TODO check projectCode Project project = projectMapper.queryByName(projectName);
// TODO check processDefinitionCode // check project auth
// TODO check preTaskCode and postTaskCode Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
return checkResult;
}
// check processDefinitionCode
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}
// check preTaskCode and postTaskCode
checkTaskDefinitionCode(result, preTaskCode);
if (postTaskCode > 0) {
checkTaskDefinitionCode(result, postTaskCode);
}
resultStatus = (Status) result.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
return result;
}
Date now = new Date(); Date now = new Date();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(name, ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(name,
1, 1,
projectCode, project.getCode(),
processDefinitionCode, processDefinitionCode,
preTaskCode, preTaskCode,
postTaskCode, postTaskCode,
@ -104,5 +140,11 @@ public class ProcessTaskRelationServiceImpl extends BaseService implements
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
}
private void checkTaskDefinitionCode(Map<String, Object> result, Long taskCode) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
}
}
}

34
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -84,7 +84,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
* @param projectName project name * @param projectName project name
* @param taskDefinitionJson task definition json * @param taskDefinitionJson task definition json
*/ */
@Transactional @Transactional(rollbackFor = Exception.class)
@Override @Override
public Map<String, Object> createTaskDefinition(User loginUser, public Map<String, Object> createTaskDefinition(User loginUser,
String projectName, String projectName,
@ -117,7 +117,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
logger.error("Task code get error, ", e); logger.error("Task code get error, ", e);
} }
if (code == 0L) { if (code == 0L) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);// TODO code message putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return result; return result;
} }
Date now = new Date(); Date now = new Date();
@ -125,7 +125,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
taskNode.getName(), taskNode.getName(),
1, 1,
taskNode.getDesc(), taskNode.getDesc(),
0L, // TODO project.getCode() project.getCode(),
loginUser.getId(), loginUser.getId(),
TaskType.of(taskNode.getType()), TaskType.of(taskNode.getType()),
taskNode.getParams(), taskNode.getParams(),
@ -176,5 +176,33 @@ public class TaskDefinitionServiceImpl extends BaseService implements
return StringUtils.join(resourceIds, ","); return StringUtils.join(resourceIds, ",");
} }
/**
* query task definition
*
* @param loginUser login user
* @param projectName project name
* @param taskName task name
*/
@Override
public Map<String, Object> queryTaskDefinitionByName(User loginUser, String projectName, String taskName) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
return checkResult;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(project.getCode(), taskName);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName);
} else {
result.put(Constants.DATA_LIST, taskDefinition);
putMsg(result, Status.SUCCESS);
}
return result;
}
} }

18
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -33,16 +33,6 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
*/ */
public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> { public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
/**
* verify task definition by name
*
* @param projectCode projectCode
* @param name name
* @return task definition
*/
TaskDefinition verifyByDefineName(@Param("projectCode") Long projectCode,
@Param("taskDefinitionName") String name);
/** /**
* query task definition by name * query task definition by name
* *
@ -61,6 +51,14 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
*/ */
TaskDefinition queryByDefinitionId(@Param("taskDefinitionId") int taskDefinitionId); TaskDefinition queryByDefinitionId(@Param("taskDefinitionId") int taskDefinitionId);
/**
* query task definition by code
*
* @param taskDefinitionCode taskDefinitionCode
* @return task definition
*/
TaskDefinition queryByDefinitionCode(@Param("taskDefinitionCode") Long taskDefinitionCode);
/** /**
* query all task definition list * query all task definition list
* *

19
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -23,24 +23,13 @@
worker_group, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, worker_group, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout,
resource_ids, create_time, update_time resource_ids, create_time, update_time
</sql> </sql>
<select id="verifyByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"> <select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select select
<include refid="baseSql"/> <include refid="baseSql"/>
from t_ds_task_definition from t_ds_task_definition
WHERE projectCode = #{projectCode} WHERE projectCode = #{projectCode}
and `name` = #{taskDefinitionName} and `name` = #{taskDefinitionName}
</select> </select>
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params,
td.flag, td.task_priority, td.worker_group, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag,
td.timeout_notify_strategy, td.timeout, td.resource_ids, td.create_time, td.update_time, u.user_name,p.name as project_name
from t_ds_task_definition td
JOIN t_ds_user u ON td.user_id = u.id
JOIN t_ds_project p ON td.project_code = p.code
WHERE p.code = #{projectCode}
and td.name = #{taskDefinitionName}
</select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"> <select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select select
<include refid="baseSql"/> <include refid="baseSql"/>
@ -81,6 +70,12 @@
JOIN t_ds_project p ON td.project_code = p.code JOIN t_ds_project p ON td.project_code = p.code
WHERE td.id = #{taskDefinitionId} WHERE td.id = #{taskDefinitionId}
</select> </select>
<select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
<include refid="baseSql"/>
from t_ds_task_definition
where code = #{taskDefinitionCode}
</select>
<select id="listResources" resultType="java.util.HashMap"> <select id="listResources" resultType="java.util.HashMap">
SELECT id,resource_ids SELECT id,resource_ids
FROM t_ds_task_definition FROM t_ds_task_definition

21
dolphinscheduler-dist/release-docs/licenses/ui-licenses/LICENSE-@form-create-element-ui vendored

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2018 xaboy
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Loading…
Cancel
Save