Browse Source

[Feat][python] Add parameter environment to task (#11763) (#11989)

(cherry picked from commit 8988492c43)
3.1.0-release
JieguangZhou 2 years ago committed by GitHub
parent
commit
e462918ac9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  2. 11
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
  3. 4
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
  4. 1
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
  5. 1
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
  6. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
  7. 2
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
  8. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
  9. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
  10. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
  11. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
  12. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py
  13. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
  14. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py
  15. 3
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
  16. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
  17. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
  18. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
  19. 1
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -30,8 +30,10 @@ import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration; import org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration;
import org.apache.dolphinscheduler.api.dto.EnvironmentDto;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.EnvironmentService;
import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
@ -98,6 +100,9 @@ public class PythonGateway {
@Autowired @Autowired
private TenantService tenantService; private TenantService tenantService;
@Autowired
private EnvironmentService environmentService;
@Autowired @Autowired
private ExecutorService executorService; private ExecutorService executorService;
@ -562,6 +567,25 @@ public class PythonGateway {
return result; return result;
} }
/**
 * Resolve an environment name to its environment code.
 * Used by the Python API when creating a task that needs environment information.
 *
 * @param environmentName name of the environment
 * @return the code of the environment found under {@code environmentName}
 * @throws IllegalArgumentException if the environment service returns no "data" entry for the name
 */
public Long getEnvironmentInfo(String environmentName) {
    // The service answers with a result map; the environment itself sits under the "data" key.
    Object data = environmentService.queryEnvironmentByName(environmentName).get("data");
    if (data != null) {
        return ((EnvironmentDto) data).getCode();
    }

    // No environment matched: log the problem and surface it to the caller.
    String msg = String.format("Can not find valid environment by name %s", environmentName);
    logger.error(msg);
    throw new IllegalArgumentException(msg);
}
/** /**
* Get resource by given resource type and full name. It return map contain resource id, name. * Get resource by given resource type and full name. It return map contain resource id, name.
* Useful in Python API create task which need processDefinition information. * Useful in Python API create task which need processDefinition information.

11
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@ -90,6 +90,7 @@ class Task(Base):
"flag", "flag",
"task_priority", "task_priority",
"worker_group", "worker_group",
"environment_code",
"delay_time", "delay_time",
"fail_retry_times", "fail_retry_times",
"fail_retry_interval", "fail_retry_interval",
@ -110,6 +111,7 @@ class Task(Base):
flag: Optional[str] = TaskFlag.YES, flag: Optional[str] = TaskFlag.YES,
task_priority: Optional[str] = TaskPriority.MEDIUM, task_priority: Optional[str] = TaskPriority.MEDIUM,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP, worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
environment_name: Optional[str] = None,
delay_time: Optional[int] = 0, delay_time: Optional[int] = 0,
fail_retry_times: Optional[int] = 0, fail_retry_times: Optional[int] = 0,
fail_retry_interval: Optional[int] = 1, fail_retry_interval: Optional[int] = 1,
@ -129,6 +131,7 @@ class Task(Base):
self.flag = flag self.flag = flag
self.task_priority = task_priority self.task_priority = task_priority
self.worker_group = worker_group self.worker_group = worker_group
self._environment_name = environment_name
self.fail_retry_times = fail_retry_times self.fail_retry_times = fail_retry_times
self.fail_retry_interval = fail_retry_interval self.fail_retry_interval = fail_retry_interval
self.delay_time = delay_time self.delay_time = delay_time
@ -145,6 +148,7 @@ class Task(Base):
# move attribute code and version after _process_definition and process_definition declare # move attribute code and version after _process_definition and process_definition declare
self.code, self.version = self.gen_code_and_version() self.code, self.version = self.gen_code_and_version()
# Add task to process definition, maybe we could put into property process_definition later # Add task to process definition, maybe we could put into property process_definition later
if ( if (
self.process_definition is not None self.process_definition is not None
and self.code not in self.process_definition.tasks and self.code not in self.process_definition.tasks
@ -306,3 +310,10 @@ class Task(Base):
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT) # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result) # gateway_result_checker(result)
return result.get("code"), result.get("version") return result.get("code"), result.get("version")
@property
def environment_code(self) -> Optional[int]:
    """Return the code of the environment named by ``_environment_name``.

    The name is resolved through the Java gateway on every access; this
    property does not cache the result.

    :return: ``None`` when no environment name was given, otherwise the value
        returned by ``JavaGate.query_environment_info`` for that name (the
        gateway's ``getEnvironmentInfo`` returns the environment's ``Long``
        code, not a string).
    """
    # Skip the gateway round-trip when the task has no environment configured.
    if self._environment_name is None:
        return None
    return JavaGate().query_environment_info(self._environment_name)

4
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py

@ -98,6 +98,10 @@ class JavaGate:
"""Get resources file info through java gateway.""" """Get resources file info through java gateway."""
return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name) return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name)
def query_environment_info(self, name: str):
    """Look up an environment by name via the java gateway.

    :param name: name of the environment to look up.
    :return: whatever the gateway entry point's ``getEnvironmentInfo``
        returns for ``name``.
    """
    entry_point = self.java_gateway.entry_point
    environment_info = entry_point.getEnvironmentInfo(name)
    return environment_info
def get_code_and_version( def get_code_and_version(
self, project_name: str, process_definition_name: str, task_name: str self, project_name: str, process_definition_name: str, task_name: str
): ):

1
dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py

@ -124,6 +124,7 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

@ -161,6 +161,7 @@ def test_task_get_define():
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py

@ -381,6 +381,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

2
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py

@ -67,6 +67,7 @@ def test_datax_get_define(mock_datasource):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",
@ -108,6 +109,7 @@ def test_custom_datax_get_define(json_template):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py

@ -782,6 +782,7 @@ def test_dependent_get_define(mock_code_version, mock_dep_code):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py

@ -68,6 +68,7 @@ def test_flink_get_define(mock_resource):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py

@ -130,6 +130,7 @@ def test_http_get_define():
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py

@ -61,6 +61,7 @@ def test_mr_get_define(mock_resource):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py

@ -96,6 +96,7 @@ def test_sql_get_define(mock_datasource, mock_code_version):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py

@ -132,6 +132,7 @@ def test_python_get_define(name, script_code, raw):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sagemaker.py

@ -87,6 +87,7 @@ def test_sagemaker_get_define():
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

3
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py

@ -62,6 +62,7 @@ def test_shell_get_define():
"name": name, "name": name,
"version": 1, "version": 1,
"description": None, "description": None,
"environmentCode": None,
"delayTime": 0, "delayTime": 0,
"taskType": "SHELL", "taskType": "SHELL",
"taskParams": { "taskParams": {
@ -75,6 +76,7 @@ def test_shell_get_define():
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",
@ -86,4 +88,5 @@ def test_shell_get_define():
return_value=(code, version), return_value=(code, version),
): ):
shell = Shell(name, command) shell = Shell(name, command)
print(shell.get_define())
assert shell.get_define() == expect assert shell.get_define() == expect

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py

@ -68,6 +68,7 @@ def test_spark_get_define(mock_resource):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py

@ -152,6 +152,7 @@ def test_sql_get_define(mock_datasource):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py

@ -99,6 +99,7 @@ def test_sub_process_get_define(mock_process_definition):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py

@ -250,6 +250,7 @@ def test_switch_get_define(mock_task_code_version):
"flag": "YES", "flag": "YES",
"taskPriority": "MEDIUM", "taskPriority": "MEDIUM",
"workerGroup": "default", "workerGroup": "default",
"environmentCode": None,
"failRetryTimes": 0, "failRetryTimes": 0,
"failRetryInterval": 1, "failRetryInterval": 1,
"timeoutFlag": "CLOSE", "timeoutFlag": "CLOSE",

Loading…
Cancel
Save