From e462918ac9a5af8fe60c3785ed1549f6f186ec37 Mon Sep 17 00:00:00 2001 From: JieguangZhou Date: Sat, 17 Sep 2022 10:24:07 +0800 Subject: [PATCH] [Feat][python] Add parameter environment to task (#11763) (#11989) (cherry picked from commit 8988492c4329c93b4872ba3a3059a92582e99bda) --- .../api/python/PythonGateway.java | 24 +++++++++++++++++++ .../src/pydolphinscheduler/core/task.py | 11 +++++++++ .../src/pydolphinscheduler/java_gateway.py | 4 ++++ .../tests/core/test_engine.py | 1 + .../tests/core/test_task.py | 1 + .../tests/tasks/test_condition.py | 1 + .../tests/tasks/test_datax.py | 2 ++ .../tests/tasks/test_dependent.py | 1 + .../tests/tasks/test_flink.py | 1 + .../tests/tasks/test_http.py | 1 + .../tests/tasks/test_map_reduce.py | 1 + .../tests/tasks/test_procedure.py | 1 + .../tests/tasks/test_python.py | 1 + .../tests/tasks/test_sagemaker.py | 1 + .../tests/tasks/test_shell.py | 3 +++ .../tests/tasks/test_spark.py | 1 + .../tests/tasks/test_sql.py | 1 + .../tests/tasks/test_sub_process.py | 1 + .../tests/tasks/test_switch.py | 1 + 19 files changed, 58 insertions(+) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index bcb62f7333..5d0873e6c9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -30,8 +30,10 @@ import javax.annotation.PostConstruct; import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration; +import org.apache.dolphinscheduler.api.dto.EnvironmentDto; import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.EnvironmentService; import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProjectService; @@ -98,6 +100,9 @@ public class PythonGateway { @Autowired private TenantService tenantService; + @Autowired + private EnvironmentService environmentService; + @Autowired private ExecutorService executorService; @@ -562,6 +567,25 @@ public class PythonGateway { return result; } + /** + * Get environment info by given environment name. It return environment code. + * Useful in Python API create task which need environment information. + * + * @param environmentName name of the environment + */ + public Long getEnvironmentInfo(String environmentName) { + Map result = environmentService.queryEnvironmentByName(environmentName); + + if (result.get("data") == null) { + String msg = String.format("Can not find valid environment by name %s", environmentName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + EnvironmentDto environmentDto = EnvironmentDto.class.cast(result.get("data")); + return environmentDto.getCode(); + } + + /** * Get resource by given resource type and full name. It return map contain resource id, name. * Useful in Python API create task which need processDefinition information. diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 93c5f28342..d1a2eae9dd 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -90,6 +90,7 @@ class Task(Base): "flag", "task_priority", "worker_group", + "environment_code", "delay_time", "fail_retry_times", "fail_retry_interval", @@ -110,6 +111,7 @@ class Task(Base): flag: Optional[str] = TaskFlag.YES, task_priority: Optional[str] = TaskPriority.MEDIUM, worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP, + environment_name: Optional[str] = None, delay_time: Optional[int] = 0, fail_retry_times: Optional[int] = 0, fail_retry_interval: Optional[int] = 1, @@ -129,6 +131,7 @@ class Task(Base): self.flag = flag self.task_priority = task_priority self.worker_group = worker_group + self._environment_name = environment_name self.fail_retry_times = fail_retry_times self.fail_retry_interval = fail_retry_interval self.delay_time = delay_time @@ -145,6 +148,7 @@ class Task(Base): # move attribute code and version after _process_definition and process_definition declare self.code, self.version = self.gen_code_and_version() # Add task to process definition, maybe we could put into property process_definition latter + if ( self.process_definition is not None and self.code not in self.process_definition.tasks @@ -306,3 +310,10 @@ class Task(Base): # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT) # gateway_result_checker(result) return result.get("code"), result.get("version") + + @property + def environment_code(self) -> str: + """Convert environment name to code.""" + if self._environment_name is None: + return None + return JavaGate().query_environment_info(self._environment_name) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py index 59d411b62c..0ff74ba655 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py @@ -98,6 +98,10 @@ class JavaGate: """Get resources file info through java gateway.""" return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name) + def query_environment_info(self, name: str): + """Get environment info through java gateway.""" + return self.java_gateway.entry_point.getEnvironmentInfo(name) + def get_code_and_version( self, project_name: str, process_definition_name: str, task_name: str ): diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py index e36c47ba1b..ba44fad669 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py @@ -124,6 +124,7 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py index 201bdb30cd..3909077b98 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py @@ -161,6 +161,7 @@ def test_task_get_define(): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py index 523264034a..72eec28ed7 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py @@ -381,6 +381,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py index 9473f57321..5d1890e83d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py @@ -67,6 +67,7 @@ def test_datax_get_define(mock_datasource): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", @@ -108,6 +109,7 @@ def test_custom_datax_get_define(json_template): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py index f16e291c82..f55700e04b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py @@ -782,6 +782,7 @@ def test_dependent_get_define(mock_code_version, mock_dep_code): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py index 92ae3ba91f..2f30a494b9 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py @@ -68,6 +68,7 @@ def test_flink_get_define(mock_resource): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py index 060cdec0b0..399829b68c 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py @@ -130,6 +130,7 @@ def test_http_get_define(): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py index dbe9e513f5..5d38e93aa4 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py @@ -61,6 +61,7 @@ def test_mr_get_define(mock_resource): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py index 1782593955..80afe7b879 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py @@ -96,6 +96,7 @@ def test_sql_get_define(mock_datasource, mock_code_version): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py index 1cdd85d2cb..e8f7f10d77 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py @@ -132,6 +132,7 @@ def test_python_get_define(name, script_code, raw): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py index 8838eaf497..20edc22805 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py @@ -87,6 +87,7 @@ def test_sagemaker_get_define(): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py index e42f6dc0fb..e2c87d8e7d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py @@ -62,6 +62,7 @@ def test_shell_get_define(): "name": name, "version": 1, "description": None, + "environmentCode": None, "delayTime": 0, "taskType": "SHELL", "taskParams": { @@ -75,6 +76,7 @@ def test_shell_get_define(): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", @@ -86,4 +88,5 @@ def test_shell_get_define(): return_value=(code, version), ): shell = Shell(name, command) + print(shell.get_define()) assert shell.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py index 3b0672f963..ed83f9f953 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py @@ -68,6 +68,7 @@ def test_spark_get_define(mock_resource): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py index 50ccd946a9..ba9daa9b2d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py @@ -152,6 +152,7 @@ def test_sql_get_define(mock_datasource): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py index 7f471a1b8b..126ab1015e 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py @@ -99,6 +99,7 @@ def test_sub_process_get_define(mock_process_definition): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE", diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py index 1f6ff5bfa2..3206b12f7e 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py @@ -250,6 +250,7 @@ def test_switch_get_define(mock_task_code_version): "flag": "YES", "taskPriority": "MEDIUM", "workerGroup": "default", + "environmentCode": None, "failRetryTimes": 0, "failRetryInterval": 1, "timeoutFlag": "CLOSE",