Browse Source

[fix][Python] Support same task name in project defferent process definition (#10428)

close: #10431
k8s/config
陈家名 2 years ago committed by GitHub
parent
commit
b86dc53ad1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
  3. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  4. 123
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
  5. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  6. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  7. 20
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  8. 25
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
  9. 2
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -166,7 +166,7 @@ public class PythonGateway {
return taskDefinitionService.genTaskCodeList(genNum);
}
public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
public Map<String, Long> getCodeAndVersion(String projectName, String processDefinitionName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
Project project = projectMapper.queryByName(projectName);
Map<String, Long> result = new HashMap<>();
// project do not exists, mean task not exists too, so we should directly return init value
@ -175,7 +175,15 @@ public class PythonGateway {
result.put("version", 0L);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
if (processDefinition == null) {
String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName);
if (taskDefinition == null) {
result.put("code", CodeGenerateUtils.getInstance().genCode());
result.put("version", 0L);
@ -520,7 +528,7 @@ public class PythonGateway {
result.put("processDefinitionCode", processDefinition.getCode());
if (taskName != null) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskName);
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, processDefinition.getCode(), taskName);
result.put("taskDefinitionCode", taskDefinition.getCode());
}
return result;

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@ -60,10 +60,12 @@ public interface TaskDefinitionService {
*
* @param loginUser login user
* @param projectCode project code
* @param processCode process code
* @param taskName task name
*/
Map<String, Object> queryTaskDefinitionByName(User loginUser,
long projectCode,
long processCode,
String taskName);
/**

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -272,10 +272,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
*
* @param loginUser login user
* @param projectCode project code
* @param processCode process code
* @param taskName task name
*/
@Override
public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, String taskName) {
public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, String taskName) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION);
@ -283,7 +284,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName);
} else {

123
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.python;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Date;
import java.util.Map;
/**
* python gate test
*/
@RunWith(MockitoJUnitRunner.class)
public class PythonGatewayTest {
@InjectMocks
private PythonGateway pythonGateway;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Test
public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException {
Project project = getTestProject();
Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
ProcessDefinition processDefinition = getTestProcessDefinition();
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition);
TaskDefinition taskDefinition = getTestTaskDefinition();
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition);
Map<String, Long> result = pythonGateway.getCodeAndVersion(project.getName(), processDefinition.getName(), taskDefinition.getName());
Assert.assertEquals(result.get("code").longValue(), taskDefinition.getCode());
}
@Test
public void testGetDependentInfo() {
Project project = getTestProject();
Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
ProcessDefinition processDefinition = getTestProcessDefinition();
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition);
TaskDefinition taskDefinition = getTestTaskDefinition();
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition);
Map<String, Object> result = pythonGateway.getDependentInfo(project.getName(), processDefinition.getName(), taskDefinition.getName());
Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
}
private Project getTestProject() {
Project project = new Project();
project.setName("ut-project");
project.setUserId(111);
project.setCode(1L);
project.setCreateTime(new Date());
project.setUpdateTime(new Date());
return project;
}
private ProcessDefinition getTestProcessDefinition() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
processDefinition.setName("ut-process-definition");
processDefinition.setProjectCode(1L);
processDefinition.setUserId(111);
processDefinition.setUpdateTime(new Date());
processDefinition.setCreateTime(new Date());
return processDefinition;
}
private TaskDefinition getTestTaskDefinition() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setCode(888888L);
taskDefinition.setName("ut-task-definition");
taskDefinition.setProjectCode(1L);
taskDefinition.setTaskType("SHELL");
taskDefinition.setUserId(111);
taskDefinition.setResourceIds("1");
taskDefinition.setWorkerGroup("default");
taskDefinition.setEnvironmentCode(1L);
taskDefinition.setVersion(1);
taskDefinition.setCreateTime(new Date());
taskDefinition.setUpdateTime(new Date());
return taskDefinition;
}
}

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -146,7 +146,7 @@ public class TaskDefinitionServiceImplTest {
public void queryTaskDefinitionByName() {
String taskName = "task";
long projectCode = 1L;
long processCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
@ -158,11 +158,11 @@ public class TaskDefinitionServiceImplTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION )).thenReturn(result);
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), taskName))
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName))
.thenReturn(new TaskDefinition());
Map<String, Object> relation = taskDefinitionService
.queryTaskDefinitionByName(loginUser, projectCode, taskName);
.queryTaskDefinitionByName(loginUser, projectCode, processCode, taskName);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -41,10 +41,12 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
* query task definition by name
*
* @param projectCode projectCode
* @param processCode processCode
* @param name name
* @return task definition
*/
TaskDefinition queryByName(@Param("projectCode") long projectCode,
@Param("processCode") long processCode,
@Param("name") String name);
/**

20
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -23,12 +23,24 @@
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id,
${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout,
${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id,
${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max
</sql>
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
<include refid="baseSql"/>
from t_ds_task_definition
WHERE project_code = #{projectCode}
and name = #{name}
<include refid="baseSqlV2">
<property name="alias" value="td"/>
</include>
from t_ds_task_definition td
join t_ds_process_task_relation ptr on ptr.project_code = td.project_code
where td.project_code = #{projectCode}
and td.name = #{name}
and ptr.process_definition_code = #{processCode}
and td.code = ptr.post_task_code
</select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select

25
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
@ -36,6 +37,9 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private UserMapper userMapper;
@ -60,6 +64,24 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
return taskDefinition;
}
/**
* insert
*
* @return ProcessDefinition
*/
private ProcessTaskRelation insertTaskRelation(long postTaskCode) {
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setName("def 1");
processTaskRelation.setProjectCode(1L);
processTaskRelation.setProcessDefinitionCode(1L);
processTaskRelation.setPostTaskCode(postTaskCode);
processTaskRelation.setPreTaskCode(0L);
processTaskRelation.setUpdateTime(new Date());
processTaskRelation.setCreateTime(new Date());
processTaskRelationMapper.insert(processTaskRelation);
return processTaskRelation;
}
@Test
public void testInsert() {
TaskDefinition taskDefinition = insertOne();
@ -69,7 +91,8 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
@Test
public void testQueryByDefinitionName() {
TaskDefinition taskDefinition = insertOne();
TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode()
ProcessTaskRelation processTaskRelation = insertTaskRelation(taskDefinition.getCode());
TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode(), processTaskRelation.getProcessDefinitionCode()
, taskDefinition.getName());
Assert.assertNotNull(result);

2
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -273,7 +273,7 @@ class Task(Base):
# TODO get code from specific project process definition and task name
gateway = launch_gateway()
result = gateway.entry_point.getCodeAndVersion(
self.process_definition._project, self.name
self.process_definition._project, self.process_definition.name, self.name
)
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)

Loading…
Cancel
Save