Browse Source

[python] Add parameter schedule for process definition (#6664)

* [python] Add parameter schedule for process definition

* Rebase and fix some code style

* May schedule work on both string and datetime

* Fix flaky test

* Add comment about freeze time

* Add edge test for schedule_json with None schedule

* Fix test function name

* Fix rebase error
3.0.0/version-upgrade
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
e76cf77040
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
  2. 1
      dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
  3. 24
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  4. 61
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  5. 81
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/date.py
  6. 146
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
  7. 18
      dolphinscheduler-python/pydolphinscheduler/tests/utils/__init__.py
  8. 71
      dolphinscheduler-python/pydolphinscheduler/tests/utils/test_date.py
  9. 118
      dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

7
dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py

@ -34,7 +34,12 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd:
with ProcessDefinition(
name="tutorial",
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
tenant="tenant_exists",
) as pd:
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
task_child_two = Shell(name="task_child_two", command="echo 'child two'")

1
dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt

@ -17,6 +17,7 @@
# testting
pytest~=6.2.5
freezegun
# code linting and formatting
flake8
flake8-docstrings

24
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -38,6 +38,7 @@ class ProcessDefinitionDefault:
USER_STATE: int = 1
QUEUE: str = "queuePythonGateway"
WORKER_GROUP: str = "default"
TIME_ZONE: str = "Asia/Shanghai"
class TaskPriority(str):
@ -85,3 +86,26 @@ class JavaGatewayDefault(str):
RESULT_STATUS_SUCCESS = "SUCCESS"
RESULT_DATA = "data"
class Delimiter(str):
"""Constants for delimiter."""
BAR = "-"
DASH = "/"
COLON = ":"
class Time(str):
"""Constants for date."""
FMT_STD_DATE = "%Y-%m-%d"
LEN_STD_DATE = 10
FMT_DASH_DATE = "%Y/%m/%d"
FMT_SHORT_DATE = "%Y%m%d"
LEN_SHORT_DATE = 8
FMT_STD_TIME = "%H:%M:%S"
FMT_NO_COLON_TIME = "%H%M%S"

61
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -18,7 +18,8 @@
"""Module process definition, core class for workflow define."""
import json
from typing import Optional, List, Dict, Set
from datetime import datetime
from typing import Optional, List, Dict, Set, Any
from pydolphinscheduler.constants import (
ProcessDefinitionReleaseState,
@ -27,6 +28,7 @@ from pydolphinscheduler.constants import (
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.side import Tenant, Project, User
from pydolphinscheduler.utils.date import conv_from_str, conv_to_schedule, MAX_DATETIME
class ProcessDefinitionContext:
@ -83,6 +85,10 @@ class ProcessDefinition(Base):
self,
name: str,
description: Optional[str] = None,
schedule: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
timezone: Optional[str] = ProcessDefinitionDefault.TIME_ZONE,
user: Optional[str] = ProcessDefinitionDefault.USER,
project: Optional[str] = ProcessDefinitionDefault.PROJECT,
tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
@ -93,6 +99,10 @@ class ProcessDefinition(Base):
param: Optional[List] = None,
):
super().__init__(name, description)
self.schedule = schedule
self._start_time = start_time
self._end_time = end_time
self.timezone = timezone
self._user = user
self._project = project
self._tenant = tenant
@ -149,6 +159,35 @@ class ProcessDefinition(Base):
ProcessDefinitionDefault.USER_STATE,
)
@staticmethod
def _parse_datetime(val: Any) -> Any:
if val is None or isinstance(val, datetime):
return val
elif isinstance(val, str):
return conv_from_str(val)
else:
raise ValueError("Do not support value type %s for now", type(val))
@property
def start_time(self) -> Any:
"""Get attribute start_time."""
return self._parse_datetime(self._start_time)
@start_time.setter
def start_time(self, val) -> None:
"""Set attribute start_time."""
self._start_time = val
@property
def end_time(self) -> Any:
"""Get attribute end_time."""
return self._parse_datetime(self._end_time)
@end_time.setter
def end_time(self, val) -> None:
"""Set attribute end_time."""
self._end_time = val
@property
def task_definition_json(self) -> List[Dict]:
"""Return all tasks definition in list of dict."""
@ -166,6 +205,25 @@ class ProcessDefinition(Base):
self._handle_root_relation()
return [tr.to_dict() for tr in self._task_relations]
@property
def schedule_json(self) -> Optional[Dict]:
"""Get schedule parameter json object. This is requests from java gateway interface."""
if not self.schedule:
return None
else:
start_time = conv_to_schedule(
self.start_time if self.start_time else datetime.now()
)
end_time = conv_to_schedule(
self.end_time if self.end_time else MAX_DATETIME
)
return {
"startTime": start_time,
"endTime": end_time,
"crontab": self.schedule,
"timezoneId": self.timezone,
}
# TODO inti DAG's tasks are in the same location with default {x: 0, y: 0}
@property
def task_location(self) -> List[Dict]:
@ -274,6 +332,7 @@ class ProcessDefinition(Base):
self.name,
str(self.description) if self.description else "",
str(self.param) if self.param else None,
json.dumps(self.schedule_json) if self.schedule_json else None,
json.dumps(self.task_location),
self.timeout,
self.worker_group,

81
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/date.py

@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Date util function collections."""
from datetime import datetime
from pydolphinscheduler.constants import Delimiter, Time
LEN_SUPPORT_DATETIME = (
15,
19,
)
FMT_SHORT = f"{Time.FMT_SHORT_DATE} {Time.FMT_NO_COLON_TIME}"
FMT_DASH = f"{Time.FMT_DASH_DATE} {Time.FMT_STD_TIME}"
FMT_STD = f"{Time.FMT_STD_DATE} {Time.FMT_STD_TIME}"
MAX_DATETIME = datetime(9999, 12, 31, 23, 59, 59)
def conv_to_schedule(src: datetime) -> str:
"""Convert given datetime to schedule date string."""
return datetime.strftime(src, FMT_STD)
def conv_from_str(src: str) -> datetime:
"""Convert given string to datetime.
This function give an ability to convert string to datetime, and for now it could handle
format like:
- %Y-%m-%d
- %Y/%m/%d
- %Y%m%d
- %Y-%m-%d %H:%M:%S
- %Y/%m/%d %H:%M:%S
- %Y%m%d %H%M%S
If pattern not like above be given will raise NotImplementedError.
"""
len_ = len(src)
if len_ == Time.LEN_SHORT_DATE:
return datetime.strptime(src, Time.FMT_SHORT_DATE)
elif len_ == Time.LEN_STD_DATE:
if Delimiter.BAR in src:
return datetime.strptime(src, Time.FMT_STD_DATE)
elif Delimiter.DASH in src:
return datetime.strptime(src, Time.FMT_DASH_DATE)
else:
raise NotImplementedError(
"%s could not be convert to datetime for now.", src
)
elif len_ in LEN_SUPPORT_DATETIME:
if Delimiter.BAR in src and Delimiter.COLON in src:
return datetime.strptime(src, FMT_STD)
elif Delimiter.DASH in src and Delimiter.COLON in src:
return datetime.strptime(src, FMT_DASH)
elif (
Delimiter.DASH not in src
and Delimiter.BAR not in src
and Delimiter.COLON not in src
):
return datetime.strptime(src, FMT_SHORT)
else:
raise NotImplementedError(
"%s could not be convert to datetime for now.", src
)
else:
raise NotImplementedError("%s could not be convert to datetime for now.", src)

146
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -17,6 +17,9 @@
"""Test process definition."""
from datetime import datetime
from pydolphinscheduler.utils.date import conv_to_schedule
import pytest
from pydolphinscheduler.constants import (
@ -27,6 +30,7 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import TaskParams
from pydolphinscheduler.side import Tenant, Project, User
from tests.testing.task import Task
from freezegun import freeze_time
TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
@ -43,6 +47,7 @@ def test_process_definition_key_attr(func):
@pytest.mark.parametrize(
"name,value",
[
("timezone", ProcessDefinitionDefault.TIME_ZONE),
("project", Project(ProcessDefinitionDefault.PROJECT)),
("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
(
@ -73,20 +78,63 @@ def test_process_definition_default_value(name, value):
@pytest.mark.parametrize(
"name,cls,expect",
[
("project", Project, "project"),
("tenant", Tenant, "tenant"),
("name", str, "name"),
("description", str, "description"),
("schedule", str, "schedule"),
("timezone", str, "timezone"),
("worker_group", str, "worker_group"),
("timeout", int, 1),
("release_state", str, "OFFLINE"),
("param", dict, {"key": "value"}),
],
)
def test_process_definition_set_attr(name, cls, expect):
"""Test process definition set specific attributes."""
def test_set_attr(name, cls, expect):
"""Test process definition set attributes which get with same type."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
setattr(pd, name, cls(expect))
assert getattr(pd, name) == cls(
expect
setattr(pd, name, expect)
assert (
getattr(pd, name) == expect
), f"ProcessDefinition set attribute `{name}` do not work expect"
@pytest.mark.parametrize(
"set_attr,set_val,get_attr,get_val",
[
("_project", "project", "project", Project("project")),
("_tenant", "tenant", "tenant", Tenant("tenant")),
("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)),
("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)),
],
)
def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
"""Test process definition set attributes which get with different type."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
setattr(pd, set_attr, set_val)
assert get_val == getattr(
pd, get_attr
), f"Set attribute {set_attr} can not get back with {get_val}."
@pytest.mark.parametrize(
"val,expect",
[
(datetime(2021, 1, 1), datetime(2021, 1, 1)),
(None, None),
("2021-01-01", datetime(2021, 1, 1)),
("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)),
],
)
def test__parse_datetime(val, expect):
"""Test process definition function _parse_datetime.
Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
"""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
assert expect == pd._parse_datetime(
val
), f"Function _parse_datetime with unexpect value by {val}."
def test_process_definition_to_dict_without_task():
"""Test process definition function to_dict without task."""
expect = {
@ -173,3 +221,87 @@ def test_set_process_definition_user_attr(user_attrs):
assert (
except_attr == actual_attr
), f"Except attribute is {except_attr} but get {actual_attr}"
def test_schedule_json_none_schedule():
"""Test function schedule_json with None as schedule."""
with ProcessDefinition(
TEST_PROCESS_DEFINITION_NAME,
schedule=None,
) as pd:
assert pd.schedule_json is None
# We freeze time here, because we test start_time with None, and if will get datetime.datetime.now. If we do
# not freeze time, it will cause flaky test here.
@freeze_time("2021-01-01")
@pytest.mark.parametrize(
"start_time,end_time,expect_date",
[
(
"20210101",
"20210201",
{"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
),
(
"2021-01-01",
"2021-02-01",
{"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
),
(
"2021/01/01",
"2021/02/01",
{"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
),
# Test mix pattern
(
"2021/01/01 01:01:01",
"2021-02-02 02:02:02",
{"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
),
(
"2021/01/01 01:01:01",
"20210202 020202",
{"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
),
(
"20210101 010101",
"2021-02-02 02:02:02",
{"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
),
# Test None value
(
"2021/01/01 01:02:03",
None,
{"start_time": "2021-01-01 01:02:03", "end_time": "9999-12-31 23:59:59"},
),
(
None,
None,
{
"start_time": conv_to_schedule(datetime(2021, 1, 1)),
"end_time": "9999-12-31 23:59:59",
},
),
],
)
def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
"""Test function schedule_json about handle start_time and end_time.
Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
"""
schedule = "0 0 0 * * ? *"
expect = {
"crontab": schedule,
"startTime": expect_date["start_time"],
"endTime": expect_date["end_time"],
"timezoneId": ProcessDefinitionDefault.TIME_ZONE,
}
with ProcessDefinition(
TEST_PROCESS_DEFINITION_NAME,
schedule=schedule,
start_time=start_time,
end_time=end_time,
timezone=ProcessDefinitionDefault.TIME_ZONE,
) as pd:
assert pd.schedule_json == expect

18
dolphinscheduler-python/pydolphinscheduler/tests/utils/__init__.py

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Init tests for utils package."""

71
dolphinscheduler-python/pydolphinscheduler/tests/utils/test_date.py

@ -0,0 +1,71 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test utils.date module."""
import pytest
from datetime import datetime
from pydolphinscheduler.utils.date import (
conv_from_str,
conv_to_schedule,
FMT_STD,
)
curr_date = datetime.now()
@pytest.mark.parametrize(
"src,expect",
[
(curr_date, curr_date.strftime(FMT_STD)),
(datetime(2021, 1, 1), "2021-01-01 00:00:00"),
(datetime(2021, 1, 1, 1), "2021-01-01 01:00:00"),
(datetime(2021, 1, 1, 1, 1), "2021-01-01 01:01:00"),
(datetime(2021, 1, 1, 1, 1, 1), "2021-01-01 01:01:01"),
(datetime(2021, 1, 1, 1, 1, 1, 1), "2021-01-01 01:01:01"),
],
)
def test_conv_to_schedule(src: datetime, expect: str) -> None:
"""Test function conv_to_schedule."""
assert expect == conv_to_schedule(src)
@pytest.mark.parametrize(
"src,expect",
[
("2021-01-01", datetime(2021, 1, 1)),
("2021/01/01", datetime(2021, 1, 1)),
("20210101", datetime(2021, 1, 1)),
("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)),
("2021/01/01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)),
("20210101 010101", datetime(2021, 1, 1, 1, 1, 1)),
],
)
def test_conv_from_str_success(src: str, expect: datetime) -> None:
"""Test function conv_from_str success case."""
assert expect == conv_from_str(
src
), f"Function conv_from_str convert {src} not expect to {expect}."
@pytest.mark.parametrize(
"src", ["2021-01-01 010101", "2021:01:01", "202111", "20210101010101"]
)
def test_conv_from_str_not_impl(src: str) -> None:
"""Test function conv_from_str fail case."""
with pytest.raises(NotImplementedError):
conv_from_str(src)

118
dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.QueueService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.service.TenantService;
import org.apache.dolphinscheduler.api.service.UsersService;
@ -39,11 +40,13 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import java.util.HashMap;
@ -73,6 +76,16 @@ import py4j.GatewayServer;
})
public class PythonGatewayServer extends SpringBootServletInitializer {
private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
private static final int DEFAULT_WARNING_GROUP_ID = 0;
private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE;
private static final Priority DEFAULT_PRIORITY = Priority.MEDIUM;
private static final Long DEFAULT_ENVIRONMENT_CODE = -1L;
private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST;
private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
private static final int DEFAULT_DRY_RUN = 0;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@ -104,6 +117,12 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private SchedulerService schedulerService;
@Autowired
private ScheduleMapper scheduleMapper;
// TODO replace this user to build in admin user if we make sure build in one could not be change
private final User dummyAdminUser = new User() {
{
@ -154,14 +173,18 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
* create or update process definition.
* If process definition do not exists in Project=`projectCode` would create a new one
* If process definition already exists in Project=`projectCode` would update it
* All requests
* <p>
*
* @param userName user name who create or update process definition
* @param projectName project name which process definition belongs to
* @param name process definition name
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param schedule schedule for process definition, will not set schedule if null,
* and if would always fresh exists schedule if not null
* @param locations locations json object about all tasks
* @param timeout timeout for process definition working, if running time longer than timeout,
* task will mark as fail
* @param workerGroup run task in which worker group
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
@ -172,8 +195,10 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
String name,
String description,
String globalParams,
String schedule,
String locations,
int timeout,
String workerGroup,
String tenantCode,
String taskRelationJson,
String taskDefinitionJson,
@ -182,28 +207,67 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
long projectCode = project.getCode();
Map<String, Object> verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, name);
Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS);
long processDefinitionCode;
// create or update process definition
if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) {
// update process definition
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name);
long processDefinitionCode = processDefinition.getCode();
processDefinitionCode = processDefinition.getCode();
// make sure process definition offline which could edit
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
return processDefinitionCode;
} else if (verifyStatus == Status.SUCCESS) {
// create process definition
Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
return processDefinition.getCode();
processDefinitionCode = processDefinition.getCode();
} else {
String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
LOGGER.error(msg);
throw new RuntimeException(msg);
}
// Fresh process definition schedule
if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup);
}
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
return processDefinitionCode;
}
/**
* create or update process definition schedule.
* It would always use latest schedule define in workflow-as-code, and set schedule online when
* it's not null
*
* @param user user who create or update schedule
* @param projectCode project which process definition belongs to
* @param processDefinitionCode process definition code
* @param schedule schedule expression
* @param workerGroup work group
*/
private void createOrUpdateSchedule(User user,
long projectCode,
long processDefinitionCode,
String schedule,
String workerGroup) {
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
// create or update schedule
int scheduleId;
if (schedules.isEmpty()) {
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
scheduleId = (int) result.get("scheduleId");
} else {
scheduleId = schedules.get(0).getId();
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
}
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}
public void execProcessInstance(String userName,
@ -217,18 +281,6 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
Project project = projectMapper.queryByName(projectName);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
// temp default value
FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
TaskDependType taskDependType = TaskDependType.TASK_POST;
WarningType warningType = WarningType.NONE;
RunMode runMode = RunMode.RUN_MODE_SERIAL;
Priority priority = Priority.MEDIUM;
int warningGroupId = 0;
Long environmentCode = -1L;
Map<String, String> startParams = null;
Integer expectedParallelismNumber = null;
String startNodeList = null;
// make sure process definition online
processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
@ -237,19 +289,19 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
processDefinition.getCode(),
cronTime,
null,
failureStrategy,
startNodeList,
taskDependType,
warningType,
warningGroupId,
runMode,
priority,
DEFAULT_FAILURE_STRATEGY,
null,
DEFAULT_TASK_DEPEND_TYPE,
DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID,
DEFAULT_RUN_MODE,
DEFAULT_PRIORITY,
workerGroup,
environmentCode,
DEFAULT_ENVIRONMENT_CODE,
timeout,
startParams,
expectedParallelismNumber,
0
null,
null,
DEFAULT_DRY_RUN
);
}

Loading…
Cancel
Save