From 81930e54208f7c0f305c0b9f8846149bfc38f428 Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Thu, 14 Jul 2022 22:06:07 +0800 Subject: [PATCH] [python] Fix tasks with multiple upstream and workflow query error (#10941) * when task with more than one upstreams, mapper TaskDefinitionMapper method queryByName will return more than one record, and failed the mybatis result type, so we have to add `limit 1` to it to * add multiple runs of example in integrate test * Change from subprocess.Popen to subprocess.call_check in integrating test which will raise an error when failed --- .github/workflows/py-ci.yml | 4 +-- .../api/python/PythonGateway.java | 7 +++-- .../api/service/impl/QueueServiceImpl.java | 12 ++++---- .../api/service/impl/TenantServiceImpl.java | 4 +-- .../api/service/QueueServiceTest.java | 15 ++++++++++ .../api/service/TenantServiceTest.java | 29 +++++++++++++++++++ .../dao/mapper/TaskDefinitionMapper.xml | 1 + .../core/process_definition.py | 4 +-- .../tests/integration/test_submit_examples.py | 22 ++++++++++---- 9 files changed, 78 insertions(+), 20 deletions(-) diff --git a/.github/workflows/py-ci.yml b/.github/workflows/py-ci.yml index 0b1304d0ef..fdc55bac10 100644 --- a/.github/workflows/py-ci.yml +++ b/.github/workflows/py-ci.yml @@ -51,7 +51,7 @@ jobs: not-docs: - '!(docs/**)' py-change: - - 'dolphinscheduler-python/pydolphinscheduler' + - 'dolphinscheduler-python/pydolphinscheduler/**' lint: name: Lint if: ${{ (needs.paths-filter.outputs.py-change == 'true') || (github.event_name == 'push') }} @@ -165,7 +165,7 @@ jobs: - name: Install Dependences run: | python -m pip install --upgrade ${{ env.DEPENDENCES }} - - name: Run Tests Build Docs + - name: Run Integrate Tests run: | python -m tox -vv -e integrate-test result: diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index ca8a107d9d..e3aecec9be 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -171,10 +171,11 @@ public class PythonGateway { } ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); + // In the case project exists, but current process definition still not created, we should also return the init version of it if (processDefinition == null) { - String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); - logger.error(msg); - throw new IllegalArgumentException(msg); + result.put("code", CodeGenerateUtils.getInstance().genCode()); + result.put("version", 0L); + return result; } TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java index b8e47a4145..9a1d1c1978 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java @@ -282,14 +282,14 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService { */ @Override public Queue createQueueIfNotExists(String queue, String queueName) { - Queue queueObj = new Queue(queueName, queue); - createQueueValid(queueObj); Queue existsQueue = queueMapper.queryQueueName(queue, queueName); - if (Objects.isNull(existsQueue)) { - queueMapper.insert(queueObj); - return queueObj; + if (!Objects.isNull(existsQueue)) { + return existsQueue; } - return existsQueue; + Queue queueObj = new Queue(queueName, queue); + createQueueValid(queueObj); + queueMapper.insert(queueObj); + return queueObj; } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index 6e7cfecb83..f07966af76 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -366,8 +366,8 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService return tenantMapper.queryByTenantCode(tenantCode); } - Queue newQueue = queueService.createQueueIfNotExists(queue, queueName); - Tenant tenant = new Tenant(tenantCode, desc, newQueue.getId()); + Queue queueObj = queueService.createQueueIfNotExists(queue, queueName); + Tenant tenant = new Tenant(tenantCode, desc, queueObj.getId()); createTenantValid(tenant); tenantMapper.insert(tenant); return tenant; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java index f2001f314f..945a30626c 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java @@ -215,6 +215,21 @@ public class QueueServiceTest { Assert.assertEquals(result.getCode().intValue(), Status.SUCCESS.getCode()); } + @Test + public void testCreateQueueIfNotExists() { + Queue queue; + + // queue exists + Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(getQUEUE()); + queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME); + Assert.assertEquals(getQUEUE(), queue); + + // queue not exists + Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(null); + queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME); + Assert.assertEquals(new Queue(QUEUE_NAME, QUEUE), queue); + } + /** * create admin user */ diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java index 0229db3c02..62fa91d2c5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; @@ -77,6 +78,9 @@ public class TenantServiceTest { @InjectMocks private TenantServiceImpl tenantService; + @Mock + private QueueService queueService; + @Mock private TenantMapper tenantMapper; @@ -94,6 +98,8 @@ public class TenantServiceTest { private static final String tenantCode = "hayden"; private static final String tenantDesc = "This is the tenant desc"; + private static final String queue = "queue"; + private static final String queueName = "queue_name"; @Test public void testCreateTenant() throws Exception { @@ -229,6 +235,23 @@ public class TenantServiceTest { Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg()); } + @Test + public void testCreateTenantIfNotExists() { + Tenant tenant; + + // Tenant exists + Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(true); + Mockito.when(tenantMapper.queryByTenantCode(tenantCode)).thenReturn(getTenant()); + tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName); + Assert.assertEquals(getTenant(), tenant); + + // Tenant not exists + Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(false); + Mockito.when(queueService.createQueueIfNotExists(queue, queueName)).thenReturn(getQueue()); + tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName); + Assert.assertEquals(new Tenant(tenantCode, tenantDesc, getQueue().getId()), tenant); + } + /** * get user */ @@ -284,4 +307,10 @@ public class TenantServiceTest { return processDefinitions; } + private Queue getQueue() { + Queue queue = new Queue(); + queue.setId(1); + return queue; + } + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index b8c49faa3a..4417e6e463 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -41,6 +41,7 @@ and td.name = #{name} and ptr.process_definition_code = #{processCode} and td.code = ptr.post_task_code + limit 1