Browse Source

[improve][python] Specify warning group when deploy process-definition (#9773)

3.0.0/version-upgrade
陈家名 3 years ago committed by GitHub
parent
commit
828034f8b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 2
      dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst
  3. 3
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
  4. 1
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
  5. 16
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  6. 6
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py
  7. 21
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
  8. 1
      dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py

28
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -83,8 +83,6 @@ import py4j.GatewayServer;
public class PythonGateway {
private static final Logger logger = LoggerFactory.getLogger(PythonGateway.class);
private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
private static final int DEFAULT_WARNING_GROUP_ID = 0;
private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE;
private static final Priority DEFAULT_PRIORITY = Priority.MEDIUM;
private static final Long DEFAULT_ENVIRONMENT_CODE = -1L;
@ -200,6 +198,8 @@ public class PythonGateway {
* @param globalParams global params
* @param schedule schedule for process definition; no schedule is set if null,
* and any existing schedule is always refreshed if not null
* @param warningType warning type
* @param warningGroupId warning group id
* @param locations locations json object about all tasks
* @param timeout timeout for the running process definition; if the running time exceeds the timeout,
* the task will be marked as failed
@ -215,6 +215,8 @@ public class PythonGateway {
String description,
String globalParams,
String schedule,
String warningType,
int warningGroupId,
String locations,
int timeout,
String workerGroup,
@ -244,7 +246,7 @@ public class PythonGateway {
// Fresh process definition schedule
if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup);
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
}
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
return processDefinitionCode;
@ -283,25 +285,29 @@ public class PythonGateway {
* @param processDefinitionCode process definition code
* @param schedule schedule expression
* @param workerGroup worker group
* @param warningType warning type
* @param warningGroupId warning group id
*/
private void createOrUpdateSchedule(User user,
long projectCode,
long processDefinitionCode,
String schedule,
String workerGroup) {
String workerGroup,
String warningType,
int warningGroupId) {
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
// create or update schedule
int scheduleId;
if (scheduleObj == null) {
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
scheduleId = (int) result.get("scheduleId");
} else {
scheduleId = scheduleObj.getId();
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
}
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}
@ -311,6 +317,8 @@ public class PythonGateway {
String processDefinitionName,
String cronTime,
String workerGroup,
String warningType,
int warningGroupId,
Integer timeout
) {
User user = usersService.queryUser(userName);
@ -328,8 +336,8 @@ public class PythonGateway {
DEFAULT_FAILURE_STRATEGY,
null,
DEFAULT_TASK_DEPEND_TYPE,
DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID,
WarningType.valueOf(warningType),
warningGroupId,
DEFAULT_RUN_MODE,
DEFAULT_PRIORITY,
workerGroup,

2
dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst

@ -108,6 +108,8 @@ All environment variables as below, and you could modify their value via `Bash <
| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow time zone, will use its value when workflow does not specify the attribute ``timezone``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+
.. note::

3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py

@ -183,5 +183,8 @@ WORKFLOW_WORKER_GROUP = os.environ.get(
# Default workflow time zone: the ``PYDS_WORKFLOW_TIME_ZONE`` environment
# variable takes precedence; otherwise fall back to the value stored under
# the ``default.workflow.time_zone`` key of ``configs``.
WORKFLOW_TIME_ZONE = os.environ.get(
"PYDS_WORKFLOW_TIME_ZONE", configs.get("default.workflow.time_zone")
)
# Default workflow warning type: the ``PYDS_WORKFLOW_WARNING_TYPE`` environment
# variable takes precedence; otherwise fall back to the value stored under
# the ``default.workflow.warning_type`` key of ``configs``.
WORKFLOW_WARNING_TYPE = os.environ.get(
"PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type")
)
# End Common Configuration Setting

1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml

@ -49,3 +49,4 @@ default:
queue: queuePythonGateway
worker_group: default
time_zone: Asia/Shanghai
warning_type: NONE

16
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -80,6 +80,8 @@ class ProcessDefinition(Base):
"_project",
"_tenant",
"worker_group",
"warning_type",
"warning_group_id",
"timeout",
"release_state",
"param",
@ -100,6 +102,8 @@ class ProcessDefinition(Base):
project: Optional[str] = configuration.WORKFLOW_PROJECT,
tenant: Optional[str] = configuration.WORKFLOW_TENANT,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
warning_group_id: Optional[int] = 0,
timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
param: Optional[Dict] = None,
@ -113,6 +117,14 @@ class ProcessDefinition(Base):
self._project = project
self._tenant = tenant
self.worker_group = worker_group
self.warning_type = warning_type
if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"):
raise PyDSParamException(
"Parameter `warning_type` with unexpect value `%s`", warning_type
)
else:
self.warning_type = warning_type.strip().upper()
self.warning_group_id = warning_group_id
self.timeout = timeout
self.release_state = release_state
self.param = param
@ -361,6 +373,8 @@ class ProcessDefinition(Base):
str(self.description) if self.description else "",
json.dumps(self.param_json),
json.dumps(self.schedule_json) if self.schedule_json else None,
self.warning_type,
self.warning_group_id,
json.dumps(self.task_location),
self.timeout,
self.worker_group,
@ -384,5 +398,7 @@ class ProcessDefinition(Base):
self.name,
"",
self.worker_group,
self.warning_type,
self.warning_group_id,
24 * 3600,
)

6
dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py

@ -187,6 +187,7 @@ def test_get_configs_build_in():
("default.workflow.queue", "queuePythonGateway", "editQueuePythonGateway"),
("default.workflow.worker_group", "default", "specific"),
("default.workflow.time_zone", "Asia/Shanghai", "Asia/Beijing"),
("default.workflow.warning_type", "NONE", "ALL"),
],
)
def test_single_config_get_set(teardown_file_env, key: str, val: Any, new_val: Any):
@ -222,6 +223,7 @@ def test_single_config_get_set_not_exists_key():
("WORKFLOW_QUEUE", "queuePythonGateway"),
("WORKFLOW_WORKER_GROUP", "default"),
("WORKFLOW_TIME_ZONE", "Asia/Shanghai"),
("WORKFLOW_WARNING_TYPE", "NONE"),
],
)
def test_get_configuration(config_name: str, expect: Any):
@ -250,6 +252,7 @@ def test_get_configuration(config_name: str, expect: Any):
("WORKFLOW_QUEUE", "queuePythonGateway", "envQueuePythonGateway"),
("WORKFLOW_WORKER_GROUP", "default", "custom"),
("WORKFLOW_TIME_ZONE", "Asia/Shanghai", "America/Los_Angeles"),
("WORKFLOW_WARNING_TYPE", "NONE", "ALL"),
],
)
def test_get_configuration_env(config_name: str, src: Any, dest: Any):
@ -262,5 +265,8 @@ def test_get_configuration_env(config_name: str, src: Any, dest: Any):
importlib.reload(configuration)
assert getattr(configuration, config_name) == dest
# pop and reload configuration to test whether this config equal to `src` value
os.environ.pop(env_name, None)
importlib.reload(configuration)
assert getattr(configuration, config_name) == src
assert env_name not in os.environ

21
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -65,6 +65,8 @@ def test_process_definition_key_attr(func):
),
),
("worker_group", configuration.WORKFLOW_WORKER_GROUP),
("warning_type", configuration.WORKFLOW_WARNING_TYPE),
("warning_group_id", 0),
("release_state", ProcessDefinitionReleaseState.ONLINE),
],
)
@ -85,6 +87,8 @@ def test_process_definition_default_value(name, value):
("schedule", str, "schedule"),
("timezone", str, "timezone"),
("worker_group", str, "worker_group"),
("warning_type", str, "FAILURE"),
("warning_group_id", int, 1),
("timeout", int, 1),
("release_state", str, "OFFLINE"),
("param", dict, {"key": "value"}),
@ -152,6 +156,21 @@ def test__parse_datetime_not_support_type(val: Any):
pd._parse_datetime(val)
@pytest.mark.parametrize("val", ["ALLL", "nonee"])
def test_warn_type_not_support_type(val: str):
    """Check that constructing a ProcessDefinition with an unsupported ``warning_type`` raises."""
    # Build the expectation first so the ``with`` body holds only the call under test.
    expect_param_error = pytest.raises(
        PyDSParamException, match="Parameter `warning_type` with unexpect value.*?"
    )
    with expect_param_error:
        ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
@pytest.mark.parametrize(
"param, expect",
[
@ -234,6 +253,8 @@ def test_process_definition_get_define_without_task():
"project": configuration.WORKFLOW_PROJECT,
"tenant": configuration.WORKFLOW_TENANT,
"workerGroup": configuration.WORKFLOW_WORKER_GROUP,
"warningType": configuration.WORKFLOW_WARNING_TYPE,
"warningGroupId": 0,
"timeout": 0,
"releaseState": ProcessDefinitionReleaseState.ONLINE,
"param": None,

1
dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py

@ -61,6 +61,7 @@ expects = [
"default.workflow.queue": ("queuePythonGateway", "SmithEdit"),
"default.workflow.worker_group": ("default", "SmithEdit"),
"default.workflow.time_zone": ("Asia/Shanghai", "SmithEdit"),
"default.workflow.warning_type": ("NONE", "SmithEdit"),
},
]

Loading…
Cancel
Save