Browse Source

[python] Fix tasks with multiple upstream and workflow query error (#10941)

* when task with more than one upstreams, mapper
   TaskDefinitionMapper method queryByName will return
   more than one record, and failed the mybatis result
   type, so we have to add `limit 1` to it to
* add multiple runs of example in integrate test
* Change from subprocess.Popen to subprocess.call_check
  in integrating test which will raise an error when failed

(cherry picked from commit 81930e5420)
3.0.0/version-upgrade
Jiajie Zhong 2 years ago
parent
commit
e4ab488426
  1. 4
      .github/workflows/py-ci.yml
  2. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  3. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
  4. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
  5. 15
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
  6. 29
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
  7. 1
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  8. 14
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  9. 22
      dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py

4
.github/workflows/py-ci.yml

@ -51,7 +51,7 @@ jobs:
not-docs:
- '!(docs/**)'
py-change:
- 'dolphinscheduler-python/pydolphinscheduler'
- 'dolphinscheduler-python/pydolphinscheduler/**'
lint:
name: Lint
if: ${{ (needs.paths-filter.outputs.py-change == 'true') || (github.event_name == 'push') }}
@ -163,7 +163,7 @@ jobs:
- name: Install Dependences
run: |
python -m pip install --upgrade ${{ env.DEPENDENCES }}
- name: Run Tests Build Docs
- name: Run Integrate Tests
run: |
python -m tox -vv -e integrate-test
result:

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -174,10 +174,11 @@ public class PythonGateway {
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
// In the case project exists, but current process definition still not created, we should also return the init version of it
if (processDefinition == null) {
String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
logger.error(msg);
throw new IllegalArgumentException(msg);
result.put("code", CodeGenerateUtils.getInstance().genCode());
result.put("version", 0L);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName);

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java

@ -282,14 +282,14 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
*/
@Override
public Queue createQueueIfNotExists(String queue, String queueName) {
Queue existsQueue = queueMapper.queryQueueName(queue, queueName);
if (!Objects.isNull(existsQueue)) {
return existsQueue;
}
Queue queueObj = new Queue(queueName, queue);
createQueueValid(queueObj);
Queue existsQueue = queueMapper.queryQueueName(queue, queueName);
if (Objects.isNull(existsQueue)) {
queueMapper.insert(queueObj);
return queueObj;
}
return existsQueue;
}
}

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java

@ -366,8 +366,8 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
return tenantMapper.queryByTenantCode(tenantCode);
}
Queue newQueue = queueService.createQueueIfNotExists(queue, queueName);
Tenant tenant = new Tenant(tenantCode, desc, newQueue.getId());
Queue queueObj = queueService.createQueueIfNotExists(queue, queueName);
Tenant tenant = new Tenant(tenantCode, desc, queueObj.getId());
createTenantValid(tenant);
tenantMapper.insert(tenant);
return tenant;

15
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java

@ -212,6 +212,21 @@ public class QueueServiceTest {
Assert.assertEquals(result.getCode().intValue(), Status.SUCCESS.getCode());
}
@Test
public void testCreateQueueIfNotExists() {
Queue queue;
// queue exists
Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(getQUEUE());
queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
Assert.assertEquals(getQUEUE(), queue);
// queue not exists
Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(null);
queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
Assert.assertEquals(new Queue(QUEUE_NAME, QUEUE), queue);
}
/**
* create admin user
*/

29
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
@ -77,6 +78,9 @@ public class TenantServiceTest {
@InjectMocks
private TenantServiceImpl tenantService;
@Mock
private QueueService queueService;
@Mock
private TenantMapper tenantMapper;
@ -94,6 +98,8 @@ public class TenantServiceTest {
private static final String tenantCode = "hayden";
private static final String tenantDesc = "This is the tenant desc";
private static final String queue = "queue";
private static final String queueName = "queue_name";
@Test
public void testCreateTenant() throws Exception {
@ -229,6 +235,23 @@ public class TenantServiceTest {
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
@Test
public void testCreateTenantIfNotExists() {
Tenant tenant;
// Tenant exists
Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(true);
Mockito.when(tenantMapper.queryByTenantCode(tenantCode)).thenReturn(getTenant());
tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName);
Assert.assertEquals(getTenant(), tenant);
// Tenant not exists
Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(false);
Mockito.when(queueService.createQueueIfNotExists(queue, queueName)).thenReturn(getQueue());
tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName);
Assert.assertEquals(new Tenant(tenantCode, tenantDesc, getQueue().getId()), tenant);
}
/**
* get user
*/
@ -284,4 +307,10 @@ public class TenantServiceTest {
return processDefinitions;
}
private Queue getQueue() {
Queue queue = new Queue();
queue.setId(1);
return queue;
}
}

1
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -41,6 +41,7 @@
and td.name = #{name}
and ptr.process_definition_code = #{processCode}
and td.code = ptr.post_task_code
limit 1
</select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select

14
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -63,6 +63,9 @@ class ProcessDefinition(Base):
thought Web UI after it :func:`submit` or :func:`run`. It will create a new project belongs to
``user`` if it does not exists. And when ``project`` exists but project's create do not belongs
to ``user``, will grant `project` to ``user`` automatically.
:param resource_list: Resource files required by the current process definition.You can create and modify
resource files from this field. When the process definition is submitted, these resource files are
also submitted along with it.
"""
# key attribute for identify ProcessDefinition object
@ -88,6 +91,7 @@ class ProcessDefinition(Base):
"tasks",
"task_definition_json",
"task_relation_json",
"resource_list",
}
def __init__(
@ -107,6 +111,7 @@ class ProcessDefinition(Base):
timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
resource_list: Optional[List] = None,
):
super().__init__(name, description)
self.schedule = schedule
@ -132,6 +137,7 @@ class ProcessDefinition(Base):
# TODO how to fix circle import
self._task_relations: set["TaskRelation"] = set() # noqa: F821
self._process_definition_code = None
self.resource_list = resource_list or []
def __enter__(self) -> "ProcessDefinition":
ProcessDefinitionContext.set(self)
@ -407,6 +413,14 @@ class ProcessDefinition(Base):
None,
None,
)
if len(self.resource_list) > 0:
for res in self.resource_list:
gateway.entry_point.createOrUpdateResource(
self._user,
res.name,
res.description,
res.content,
)
return self._process_definition_code
def start(self) -> None:

22
dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py

@ -17,8 +17,8 @@
"""Test whether success submit examples DAG to PythonGatewayService."""
import subprocess
from pathlib import Path
from subprocess import Popen
import pytest
@ -38,7 +38,19 @@ def test_exec_white_list_example(example_path: Path):
"""Test execute examples and submit DAG to PythonGatewayService."""
try:
# Because our task decorator used module ``inspect`` to get the source, and it will
# raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.Popen``
Popen(["python", str(example_path)])
except Exception:
raise Exception("Run example %s failed.", example_path.stem)
# raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.check_call``
subprocess.check_call(["python", str(example_path)])
except subprocess.CalledProcessError:
raise RuntimeError("Run example %s failed.", example_path.stem)
def test_exec_multiple_times():
"""Test whether process definition can be executed more than one times."""
tutorial_path = path_example.joinpath("tutorial.py")
time = 0
while time < 3:
try:
subprocess.check_call(["python", str(tutorial_path)])
except subprocess.CalledProcessError:
raise RuntimeError("Run example %s failed.", tutorial_path.stem)
time += 1

Loading…
Cancel
Save