diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py index c223836549..451bb75b22 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py +++ b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py @@ -34,7 +34,12 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.tasks.shell import Shell -with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd: +with ProcessDefinition( + name="tutorial", + schedule="0 0 0 * * ? *", + start_time="2021-01-01", + tenant="tenant_exists", +) as pd: task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler") task_child_one = Shell(name="task_child_one", command="echo 'child one'") task_child_two = Shell(name="task_child_two", command="echo 'child two'") diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt index 2c3d409f7b..49b4005954 100644 --- a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt +++ b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt @@ -17,6 +17,7 @@ # testting pytest~=6.2.5 +freezegun # code linting and formatting flake8 flake8-docstrings diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index bdf0d9cf7f..d0d94c6725 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -38,6 +38,7 @@ class ProcessDefinitionDefault: USER_STATE: int = 1 QUEUE: str = "queuePythonGateway" WORKER_GROUP: str = "default" + TIME_ZONE: str = "Asia/Shanghai" class TaskPriority(str): @@ -85,3 +86,26 @@ class JavaGatewayDefault(str): RESULT_STATUS_SUCCESS = "SUCCESS" RESULT_DATA = "data" + + +class Delimiter(str): + """Constants for delimiter.""" + + BAR = "-" + DASH = "/" + COLON = ":" + + +class Time(str): + """Constants for date.""" + + FMT_STD_DATE = "%Y-%m-%d" + LEN_STD_DATE = 10 + + FMT_DASH_DATE = "%Y/%m/%d" + + FMT_SHORT_DATE = "%Y%m%d" + LEN_SHORT_DATE = 8 + + FMT_STD_TIME = "%H:%M:%S" + FMT_NO_COLON_TIME = "%H%M%S" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index 3acfa8f34e..152119bf5d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -18,7 +18,8 @@ """Module process definition, core class for workflow define.""" import json -from typing import Optional, List, Dict, Set +from datetime import datetime +from typing import Optional, List, Dict, Set, Any from pydolphinscheduler.constants import ( ProcessDefinitionReleaseState, @@ -27,6 +28,7 @@ from pydolphinscheduler.constants import ( from pydolphinscheduler.core.base import Base from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.side import Tenant, Project, User +from pydolphinscheduler.utils.date import conv_from_str, conv_to_schedule, MAX_DATETIME class ProcessDefinitionContext: @@ -83,6 +85,10 @@ class ProcessDefinition(Base): self, name: str, description: Optional[str] = None, + schedule: Optional[str] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + timezone: Optional[str] = ProcessDefinitionDefault.TIME_ZONE, user: Optional[str] = ProcessDefinitionDefault.USER, project: Optional[str] = ProcessDefinitionDefault.PROJECT, tenant: Optional[str] = ProcessDefinitionDefault.TENANT, @@ -93,6 +99,10 @@ class ProcessDefinition(Base): param: Optional[List] = None, ): super().__init__(name, description) + self.schedule = schedule + self._start_time = start_time + self._end_time = end_time + self.timezone = timezone self._user = user self._project = project self._tenant = tenant @@ -149,6 +159,35 @@ class ProcessDefinition(Base): ProcessDefinitionDefault.USER_STATE, ) + @staticmethod + def _parse_datetime(val: Any) -> Any: + if val is None or isinstance(val, datetime): + return val + elif isinstance(val, str): + return conv_from_str(val) + else: + raise ValueError("Do not support value type %s for now", type(val)) + + @property + def start_time(self) -> Any: + """Get attribute start_time.""" + return self._parse_datetime(self._start_time) + + @start_time.setter + def start_time(self, val) -> None: + """Set attribute start_time.""" + self._start_time = val + + @property + def end_time(self) -> Any: + """Get attribute end_time.""" + return self._parse_datetime(self._end_time) + + @end_time.setter + def end_time(self, val) -> None: + """Set attribute end_time.""" + self._end_time = val + @property def task_definition_json(self) -> List[Dict]: """Return all tasks definition in list of dict.""" @@ -166,6 +205,25 @@ class ProcessDefinition(Base): self._handle_root_relation() return [tr.to_dict() for tr in self._task_relations] + @property + def schedule_json(self) -> Optional[Dict]: + """Get schedule parameter json object. This is requests from java gateway interface.""" + if not self.schedule: + return None + else: + start_time = conv_to_schedule( + self.start_time if self.start_time else datetime.now() + ) + end_time = conv_to_schedule( + self.end_time if self.end_time else MAX_DATETIME + ) + return { + "startTime": start_time, + "endTime": end_time, + "crontab": self.schedule, + "timezoneId": self.timezone, + } + # TODO inti DAG's tasks are in the same location with default {x: 0, y: 0} @property def task_location(self) -> List[Dict]: @@ -274,6 +332,7 @@ class ProcessDefinition(Base): self.name, str(self.description) if self.description else "", str(self.param) if self.param else None, + json.dumps(self.schedule_json) if self.schedule_json else None, json.dumps(self.task_location), self.timeout, self.worker_group, diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/date.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/date.py new file mode 100644 index 0000000000..e2a4cd1cd4 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/date.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Date util function collections.""" + +from datetime import datetime +from pydolphinscheduler.constants import Delimiter, Time + +LEN_SUPPORT_DATETIME = ( + 15, + 19, +) + +FMT_SHORT = f"{Time.FMT_SHORT_DATE} {Time.FMT_NO_COLON_TIME}" +FMT_DASH = f"{Time.FMT_DASH_DATE} {Time.FMT_STD_TIME}" +FMT_STD = f"{Time.FMT_STD_DATE} {Time.FMT_STD_TIME}" + +MAX_DATETIME = datetime(9999, 12, 31, 23, 59, 59) + + +def conv_to_schedule(src: datetime) -> str: + """Convert given datetime to schedule date string.""" + return datetime.strftime(src, FMT_STD) + + +def conv_from_str(src: str) -> datetime: + """Convert given string to datetime. + + This function give an ability to convert string to datetime, and for now it could handle + format like: + - %Y-%m-%d + - %Y/%m/%d + - %Y%m%d + - %Y-%m-%d %H:%M:%S + - %Y/%m/%d %H:%M:%S + - %Y%m%d %H%M%S + If pattern not like above be given will raise NotImplementedError. + """ + len_ = len(src) + if len_ == Time.LEN_SHORT_DATE: + return datetime.strptime(src, Time.FMT_SHORT_DATE) + elif len_ == Time.LEN_STD_DATE: + if Delimiter.BAR in src: + return datetime.strptime(src, Time.FMT_STD_DATE) + elif Delimiter.DASH in src: + return datetime.strptime(src, Time.FMT_DASH_DATE) + else: + raise NotImplementedError( + "%s could not be convert to datetime for now.", src + ) + elif len_ in LEN_SUPPORT_DATETIME: + if Delimiter.BAR in src and Delimiter.COLON in src: + return datetime.strptime(src, FMT_STD) + elif Delimiter.DASH in src and Delimiter.COLON in src: + return datetime.strptime(src, FMT_DASH) + elif ( + Delimiter.DASH not in src + and Delimiter.BAR not in src + and Delimiter.COLON not in src + ): + return datetime.strptime(src, FMT_SHORT) + else: + raise NotImplementedError( + "%s could not be convert to datetime for now.", src + ) + else: + raise NotImplementedError("%s could not be convert to datetime for now.", src) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index f0c0a1d5b0..0a028e8c0c 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -17,6 +17,9 @@ """Test process definition.""" +from datetime import datetime +from pydolphinscheduler.utils.date import conv_to_schedule + import pytest from pydolphinscheduler.constants import ( @@ -27,6 +30,7 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.task import TaskParams from pydolphinscheduler.side import Tenant, Project, User from tests.testing.task import Task +from freezegun import freeze_time TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition" @@ -43,6 +47,7 @@ def test_process_definition_key_attr(func): @pytest.mark.parametrize( "name,value", [ + ("timezone", ProcessDefinitionDefault.TIME_ZONE), ("project", Project(ProcessDefinitionDefault.PROJECT)), ("tenant", Tenant(ProcessDefinitionDefault.TENANT)), ( @@ -73,20 +78,63 @@ def test_process_definition_default_value(name, value): @pytest.mark.parametrize( "name,cls,expect", [ - ("project", Project, "project"), - ("tenant", Tenant, "tenant"), + ("name", str, "name"), + ("description", str, "description"), + ("schedule", str, "schedule"), + ("timezone", str, "timezone"), ("worker_group", str, "worker_group"), + ("timeout", int, 1), + ("release_state", str, "OFFLINE"), + ("param", dict, {"key": "value"}), ], ) -def test_process_definition_set_attr(name, cls, expect): - """Test process definition set specific attributes.""" +def test_set_attr(name, cls, expect): + """Test process definition set attributes which get with same type.""" with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: - setattr(pd, name, cls(expect)) - assert getattr(pd, name) == cls( - expect + setattr(pd, name, expect) + assert ( + getattr(pd, name) == expect ), f"ProcessDefinition set attribute `{name}` do not work expect" +@pytest.mark.parametrize( + "set_attr,set_val,get_attr,get_val", + [ + ("_project", "project", "project", Project("project")), + ("_tenant", "tenant", "tenant", Tenant("tenant")), + ("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)), + ("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)), + ], +) +def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val): + """Test process definition set attributes which get with different type.""" + with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: + setattr(pd, set_attr, set_val) + assert get_val == getattr( + pd, get_attr + ), f"Set attribute {set_attr} can not get back with {get_val}." + + +@pytest.mark.parametrize( + "val,expect", + [ + (datetime(2021, 1, 1), datetime(2021, 1, 1)), + (None, None), + ("2021-01-01", datetime(2021, 1, 1)), + ("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)), + ], +) +def test__parse_datetime(val, expect): + """Test process definition function _parse_datetime. + + Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file. + """ + with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: + assert expect == pd._parse_datetime( + val + ), f"Function _parse_datetime with unexpect value by {val}." + + def test_process_definition_to_dict_without_task(): """Test process definition function to_dict without task.""" expect = { @@ -173,3 +221,87 @@ def test_set_process_definition_user_attr(user_attrs): assert ( except_attr == actual_attr ), f"Except attribute is {except_attr} but get {actual_attr}" + + +def test_schedule_json_none_schedule(): + """Test function schedule_json with None as schedule.""" + with ProcessDefinition( + TEST_PROCESS_DEFINITION_NAME, + schedule=None, + ) as pd: + assert pd.schedule_json is None + + +# We freeze time here, because we test start_time with None, and if will get datetime.datetime.now. If we do +# not freeze time, it will cause flaky test here. +@freeze_time("2021-01-01") +@pytest.mark.parametrize( + "start_time,end_time,expect_date", + [ + ( + "20210101", + "20210201", + {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"}, + ), + ( + "2021-01-01", + "2021-02-01", + {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"}, + ), + ( + "2021/01/01", + "2021/02/01", + {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"}, + ), + # Test mix pattern + ( + "2021/01/01 01:01:01", + "2021-02-02 02:02:02", + {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"}, + ), + ( + "2021/01/01 01:01:01", + "20210202 020202", + {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"}, + ), + ( + "20210101 010101", + "2021-02-02 02:02:02", + {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"}, + ), + # Test None value + ( + "2021/01/01 01:02:03", + None, + {"start_time": "2021-01-01 01:02:03", "end_time": "9999-12-31 23:59:59"}, + ), + ( + None, + None, + { + "start_time": conv_to_schedule(datetime(2021, 1, 1)), + "end_time": "9999-12-31 23:59:59", + }, + ), + ], +) +def test_schedule_json_start_and_end_time(start_time, end_time, expect_date): + """Test function schedule_json about handle start_time and end_time. + + Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file. + """ + schedule = "0 0 0 * * ? *" + expect = { + "crontab": schedule, + "startTime": expect_date["start_time"], + "endTime": expect_date["end_time"], + "timezoneId": ProcessDefinitionDefault.TIME_ZONE, + } + with ProcessDefinition( + TEST_PROCESS_DEFINITION_NAME, + schedule=schedule, + start_time=start_time, + end_time=end_time, + timezone=ProcessDefinitionDefault.TIME_ZONE, + ) as pd: + assert pd.schedule_json == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/utils/__init__.py b/dolphinscheduler-python/pydolphinscheduler/tests/utils/__init__.py new file mode 100644 index 0000000000..119f825bc0 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/utils/__init__.py @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Init tests for utils package.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_date.py b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_date.py new file mode 100644 index 0000000000..53ba4784be --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_date.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test utils.date module.""" + +import pytest +from datetime import datetime +from pydolphinscheduler.utils.date import ( + conv_from_str, + conv_to_schedule, + FMT_STD, +) + +curr_date = datetime.now() + + +@pytest.mark.parametrize( + "src,expect", + [ + (curr_date, curr_date.strftime(FMT_STD)), + (datetime(2021, 1, 1), "2021-01-01 00:00:00"), + (datetime(2021, 1, 1, 1), "2021-01-01 01:00:00"), + (datetime(2021, 1, 1, 1, 1), "2021-01-01 01:01:00"), + (datetime(2021, 1, 1, 1, 1, 1), "2021-01-01 01:01:01"), + (datetime(2021, 1, 1, 1, 1, 1, 1), "2021-01-01 01:01:01"), + ], +) +def test_conv_to_schedule(src: datetime, expect: str) -> None: + """Test function conv_to_schedule.""" + assert expect == conv_to_schedule(src) + + +@pytest.mark.parametrize( + "src,expect", + [ + ("2021-01-01", datetime(2021, 1, 1)), + ("2021/01/01", datetime(2021, 1, 1)), + ("20210101", datetime(2021, 1, 1)), + ("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)), + ("2021/01/01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)), + ("20210101 010101", datetime(2021, 1, 1, 1, 1, 1)), + ], +) +def test_conv_from_str_success(src: str, expect: datetime) -> None: + """Test function conv_from_str success case.""" + assert expect == conv_from_str( + src + ), f"Function conv_from_str convert {src} not expect to {expect}." + + +@pytest.mark.parametrize( + "src", ["2021-01-01 010101", "2021:01:01", "202111", "20210101010101"] +) +def test_conv_from_str_not_impl(src: str) -> None: + """Test function conv_from_str fail case.""" + with pytest.raises(NotImplementedError): + conv_from_str(src) diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java index 4e15fbb1a9..4131e8da93 100644 --- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java +++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.QueueService; +import org.apache.dolphinscheduler.api.service.SchedulerService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.service.TenantService; import org.apache.dolphinscheduler.api.service.UsersService; @@ -39,11 +40,13 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Queue; +import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import java.util.HashMap; @@ -73,6 +76,16 @@ import py4j.GatewayServer; }) public class PythonGatewayServer extends SpringBootServletInitializer { private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class); + + private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE; + private static final int DEFAULT_WARNING_GROUP_ID = 0; + private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE; + private static final Priority DEFAULT_PRIORITY = Priority.MEDIUM; + private static final Long DEFAULT_ENVIRONMENT_CODE = -1L; + + private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST; + private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL; + private static final int DEFAULT_DRY_RUN = 0; @Autowired private ProcessDefinitionMapper processDefinitionMapper; @@ -104,6 +117,12 @@ public class PythonGatewayServer extends SpringBootServletInitializer { @Autowired private TaskDefinitionMapper taskDefinitionMapper; + @Autowired + private SchedulerService schedulerService; + + @Autowired + private ScheduleMapper scheduleMapper; + // TODO replace this user to build in admin user if we make sure build in one could not be change private final User dummyAdminUser = new User() { { @@ -154,14 +173,18 @@ public class PythonGatewayServer extends SpringBootServletInitializer { * create or update process definition. * If process definition do not exists in Project=`projectCode` would create a new one * If process definition already exists in Project=`projectCode` would update it - * All requests - *

* + * @param userName user name who create or update process definition + * @param projectName project name which process definition belongs to * @param name process definition name * @param description description * @param globalParams global params - * @param locations locations for nodes - * @param timeout timeout + * @param schedule schedule for process definition, will not set schedule if null, + * and if would always fresh exists schedule if not null + * @param locations locations json object about all tasks + * @param timeout timeout for process definition working, if running time longer than timeout, + * task will mark as fail + * @param workerGroup run task in which worker group * @param tenantCode tenantCode * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson @@ -172,8 +195,10 @@ public class PythonGatewayServer extends SpringBootServletInitializer { String name, String description, String globalParams, + String schedule, String locations, int timeout, + String workerGroup, String tenantCode, String taskRelationJson, String taskDefinitionJson, @@ -182,28 +207,67 @@ public class PythonGatewayServer extends SpringBootServletInitializer { Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST); long projectCode = project.getCode(); Map verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, name); - Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS); + + long processDefinitionCode; + // create or update process definition if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) { - // update process definition ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name); - long processDefinitionCode = processDefinition.getCode(); + processDefinitionCode = processDefinition.getCode(); // make sure process definition offline which could edit processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); Map result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams, locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType); - return processDefinitionCode; } else if (verifyStatus == Status.SUCCESS) { - // create process definition Map result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams, locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType); ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST); - return processDefinition.getCode(); + processDefinitionCode = processDefinition.getCode(); } else { String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST."; LOGGER.error(msg); throw new RuntimeException(msg); } + + // Fresh process definition schedule + if (schedule != null) { + createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup); + } + processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE); + return processDefinitionCode; + } + + /** + * create or update process definition schedule. + * It would always use latest schedule define in workflow-as-code, and set schedule online when + * it's not null + * + * @param user user who create or update schedule + * @param projectCode project which process definition belongs to + * @param processDefinitionCode process definition code + * @param schedule schedule expression + * @param workerGroup work group + */ + private void createOrUpdateSchedule(User user, + long projectCode, + long processDefinitionCode, + String schedule, + String workerGroup) { + List schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode); + // create or update schedule + int scheduleId; + if (schedules.isEmpty()) { + processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE); + Map result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE, + DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); + scheduleId = (int) result.get("scheduleId"); + } else { + scheduleId = schedules.get(0).getId(); + processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); + schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE, + DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); + } + schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE); } public void execProcessInstance(String userName, @@ -217,18 +281,6 @@ public class PythonGatewayServer extends SpringBootServletInitializer { Project project = projectMapper.queryByName(projectName); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); - // temp default value - FailureStrategy failureStrategy = FailureStrategy.CONTINUE; - TaskDependType taskDependType = TaskDependType.TASK_POST; - WarningType warningType = WarningType.NONE; - RunMode runMode = RunMode.RUN_MODE_SERIAL; - Priority priority = Priority.MEDIUM; - int warningGroupId = 0; - Long environmentCode = -1L; - Map startParams = null; - Integer expectedParallelismNumber = null; - String startNodeList = null; - // make sure process definition online processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE); @@ -237,19 +289,19 @@ public class PythonGatewayServer extends SpringBootServletInitializer { processDefinition.getCode(), cronTime, null, - failureStrategy, - startNodeList, - taskDependType, - warningType, - warningGroupId, - runMode, - priority, + DEFAULT_FAILURE_STRATEGY, + null, + DEFAULT_TASK_DEPEND_TYPE, + DEFAULT_WARNING_TYPE, + DEFAULT_WARNING_GROUP_ID, + DEFAULT_RUN_MODE, + DEFAULT_PRIORITY, workerGroup, - environmentCode, + DEFAULT_ENVIRONMENT_CODE, timeout, - startParams, - expectedParallelismNumber, - 0 + null, + null, + DEFAULT_DRY_RUN ); }