diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index 8aea47471b..4fb74f1e66 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -83,8 +83,6 @@ import py4j.GatewayServer; public class PythonGateway { private static final Logger logger = LoggerFactory.getLogger(PythonGateway.class); - private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE; - private static final int DEFAULT_WARNING_GROUP_ID = 0; private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE; private static final Priority DEFAULT_PRIORITY = Priority.MEDIUM; private static final Long DEFAULT_ENVIRONMENT_CODE = -1L; @@ -200,6 +198,8 @@ public class PythonGateway { * @param globalParams global params * @param schedule schedule for process definition, will not set schedule if null, * and if would always fresh exists schedule if not null + * @param warningType warning type + * @param warningGroupId warning group id * @param locations locations json object about all tasks * @param timeout timeout for process definition working, if running time longer than timeout, * task will mark as fail @@ -215,6 +215,8 @@ public class PythonGateway { String description, String globalParams, String schedule, + String warningType, + int warningGroupId, String locations, int timeout, String workerGroup, @@ -244,7 +246,7 @@ public class PythonGateway { // Fresh process definition schedule if (schedule != null) { - createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup); + createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId); } processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE); return processDefinitionCode; @@ -283,25 +285,29 @@ public class PythonGateway { * @param processDefinitionCode process definition code * @param schedule schedule expression * @param workerGroup work group + * @param warningType warning type + * @param warningGroupId warning group id */ private void createOrUpdateSchedule(User user, long projectCode, long processDefinitionCode, String schedule, - String workerGroup) { + String workerGroup, + String warningType, + int warningGroupId) { Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode); // create or update schedule int scheduleId; if (scheduleObj == null) { processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE); - Map result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE, - DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); + Map result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, WarningType.valueOf(warningType), + warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); scheduleId = (int) result.get("scheduleId"); } else { scheduleId = scheduleObj.getId(); processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); - schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE, - DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); + schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType), + warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); } schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE); } @@ -311,6 +317,8 @@ public class PythonGateway { String processDefinitionName, String cronTime, String workerGroup, + String warningType, + int warningGroupId, Integer timeout ) { User user = usersService.queryUser(userName); @@ -328,8 +336,8 @@ public class PythonGateway { DEFAULT_FAILURE_STRATEGY, null, DEFAULT_TASK_DEPEND_TYPE, - DEFAULT_WARNING_TYPE, - DEFAULT_WARNING_GROUP_ID, + WarningType.valueOf(warningType), + warningGroupId, DEFAULT_RUN_MODE, DEFAULT_PRIORITY, workerGroup, diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst index 66c7f08525..b7879dd94e 100644 --- a/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst +++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst @@ -108,6 +108,8 @@ All environment variables as below, and you could modify their value via `Bash < | | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. | + +------------------------------------+------------------------------------------------------------------------------------------------------------------+ | | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. | ++ +------------------------------------+------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. | +------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+ .. note:: diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py index 14f2fe9fce..03ac0977e4 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py @@ -183,5 +183,8 @@ WORKFLOW_WORKER_GROUP = os.environ.get( WORKFLOW_TIME_ZONE = os.environ.get( "PYDS_WORKFLOW_TIME_ZONE", configs.get("default.workflow.time_zone") ) +WORKFLOW_WARNING_TYPE = os.environ.get( + "PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type") +) # End Common Configuration Setting diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml index 410f64d6d3..e437e55c31 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml @@ -49,3 +49,4 @@ default: queue: queuePythonGateway worker_group: default time_zone: Asia/Shanghai + warning_type: NONE diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index 7615226de6..649e2ce51a 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -80,6 +80,8 @@ class ProcessDefinition(Base): "_project", "_tenant", "worker_group", + "warning_type", + "warning_group_id", "timeout", "release_state", "param", @@ -100,6 +102,8 @@ class ProcessDefinition(Base): project: Optional[str] = configuration.WORKFLOW_PROJECT, tenant: Optional[str] = configuration.WORKFLOW_TENANT, worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP, + warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE, + warning_group_id: Optional[int] = 0, timeout: Optional[int] = 0, release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE, param: Optional[Dict] = None, @@ -113,6 +117,14 @@ class ProcessDefinition(Base): self._project = project self._tenant = tenant self.worker_group = worker_group + self.warning_type = warning_type + if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"): + raise PyDSParamException( + "Parameter `warning_type` with unexpect value `%s`", warning_type + ) + else: + self.warning_type = warning_type.strip().upper() + self.warning_group_id = warning_group_id self.timeout = timeout self.release_state = release_state self.param = param @@ -361,6 +373,8 @@ class ProcessDefinition(Base): str(self.description) if self.description else "", json.dumps(self.param_json), json.dumps(self.schedule_json) if self.schedule_json else None, + self.warning_type, + self.warning_group_id, json.dumps(self.task_location), self.timeout, self.worker_group, @@ -384,5 +398,7 @@ class ProcessDefinition(Base): self.name, "", self.worker_group, + self.warning_type, + self.warning_group_id, 24 * 3600, ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py index 394fd33d92..c7e217a46f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py @@ -187,6 +187,7 @@ def test_get_configs_build_in(): ("default.workflow.queue", "queuePythonGateway", "editQueuePythonGateway"), ("default.workflow.worker_group", "default", "specific"), ("default.workflow.time_zone", "Asia/Shanghai", "Asia/Beijing"), + ("default.workflow.warning_type", "NONE", "ALL"), ], ) def test_single_config_get_set(teardown_file_env, key: str, val: Any, new_val: Any): @@ -222,6 +223,7 @@ def test_single_config_get_set_not_exists_key(): ("WORKFLOW_QUEUE", "queuePythonGateway"), ("WORKFLOW_WORKER_GROUP", "default"), ("WORKFLOW_TIME_ZONE", "Asia/Shanghai"), + ("WORKFLOW_WARNING_TYPE", "NONE"), ], ) def test_get_configuration(config_name: str, expect: Any): @@ -250,6 +252,7 @@ def test_get_configuration(config_name: str, expect: Any): ("WORKFLOW_QUEUE", "queuePythonGateway", "envQueuePythonGateway"), ("WORKFLOW_WORKER_GROUP", "default", "custom"), ("WORKFLOW_TIME_ZONE", "Asia/Shanghai", "America/Los_Angeles"), + ("WORKFLOW_WARNING_TYPE", "NONE", "ALL"), ], ) def test_get_configuration_env(config_name: str, src: Any, dest: Any): @@ -262,5 +265,8 @@ def test_get_configuration_env(config_name: str, src: Any, dest: Any): importlib.reload(configuration) assert getattr(configuration, config_name) == dest + # pop and reload configuration to test whether this config equal to `src` value os.environ.pop(env_name, None) + importlib.reload(configuration) + assert getattr(configuration, config_name) == src assert env_name not in os.environ diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index e311be2ec1..88028f72c7 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -65,6 +65,8 @@ def test_process_definition_key_attr(func): ), ), ("worker_group", configuration.WORKFLOW_WORKER_GROUP), + ("warning_type", configuration.WORKFLOW_WARNING_TYPE), + ("warning_group_id", 0), ("release_state", ProcessDefinitionReleaseState.ONLINE), ], ) @@ -85,6 +87,8 @@ def test_process_definition_default_value(name, value): ("schedule", str, "schedule"), ("timezone", str, "timezone"), ("worker_group", str, "worker_group"), + ("warning_type", str, "FAILURE"), + ("warning_group_id", int, 1), ("timeout", int, 1), ("release_state", str, "OFFLINE"), ("param", dict, {"key": "value"}), @@ -152,6 +156,21 @@ def test__parse_datetime_not_support_type(val: Any): pd._parse_datetime(val) +@pytest.mark.parametrize( + "val", + [ + "ALLL", + "nonee", + ], +) +def test_warn_type_not_support_type(val: str): + """Test process definition param warning_type not support type error.""" + with pytest.raises( + PyDSParamException, match="Parameter `warning_type` with unexpect value.*?" + ): + ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val) + + @pytest.mark.parametrize( "param, expect", [ @@ -234,6 +253,8 @@ def test_process_definition_get_define_without_task(): "project": configuration.WORKFLOW_PROJECT, "tenant": configuration.WORKFLOW_TENANT, "workerGroup": configuration.WORKFLOW_WORKER_GROUP, + "warningType": configuration.WORKFLOW_WARNING_TYPE, + "warningGroupId": 0, "timeout": 0, "releaseState": ProcessDefinitionReleaseState.ONLINE, "param": None, diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py index aa6d9eee6c..4b1b05ed33 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py @@ -61,6 +61,7 @@ expects = [ "default.workflow.queue": ("queuePythonGateway", "SmithEdit"), "default.workflow.worker_group": ("default", "SmithEdit"), "default.workflow.time_zone": ("Asia/Shanghai", "SmithEdit"), + "default.workflow.warning_type": ("NONE", "SmithEdit"), }, ]