Browse Source

[chore][python] Change name from process definition to workflow (#12918)

only change its name in python gateway server code, incluing

* Function name: all related to process definition
* Parameter name and comment related

ref: apache/dolphinscheduler-sdk-python#22
3.2.0-release
Jay Chung 2 years ago committed by GitHub
parent
commit
f20c9b3102
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 108
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

108
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -189,7 +189,7 @@ public class PythonGateway {
ProcessDefinition processDefinition = ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
// In the case project exists, but current process definition still not created, we should also return the init // In the case project exists, but current workflow still not created, we should also return the init
// version of it // version of it
if (processDefinition == null) { if (processDefinition == null) {
result.put("code", CodeGenerateUtils.getInstance().genCode()); result.put("code", CodeGenerateUtils.getInstance().genCode());
@ -210,20 +210,20 @@ public class PythonGateway {
} }
/** /**
* create or update process definition. * create or update workflow.
* If process definition do not exists in Project=`projectCode` would create a new one * If workflow do not exists in Project=`projectCode` would create a new one
* If process definition already exists in Project=`projectCode` would update it * If workflow already exists in Project=`projectCode` would update it
* *
* @param userName user name who create or update process definition * @param userName user name who create or update workflow
* @param projectName project name which process definition belongs to * @param projectName project name which workflow belongs to
* @param name process definition name * @param name workflow name
* @param description description * @param description description
* @param globalParams global params * @param globalParams global params
* @param schedule schedule for process definition, will not set schedule if null, * @param schedule schedule for workflow, will not set schedule if null,
* and if would always fresh exists schedule if not null * and if would always fresh exists schedule if not null
* @param warningType warning type * @param warningType warning type
* @param warningGroupId warning group id * @param warningGroupId warning group id
* @param timeout timeout for process definition working, if running time longer than timeout, * @param timeout timeout for workflow working, if running time longer than timeout,
* task will mark as fail * task will mark as fail
* @param workerGroup run task in which worker group * @param workerGroup run task in which worker group
* @param tenantCode tenantCode * @param tenantCode tenantCode
@ -232,7 +232,7 @@ public class PythonGateway {
* @param otherParamsJson otherParamsJson handle other params * @param otherParamsJson otherParamsJson handle other params
* @return create result code * @return create result code
*/ */
public Long createOrUpdateProcessDefinition(String userName, public Long createOrUpdateWorkflow(String userName,
String projectName, String projectName,
String name, String name,
String description, String description,
@ -252,13 +252,13 @@ public class PythonGateway {
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
long projectCode = project.getCode(); long projectCode = project.getCode();
ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name); ProcessDefinition processDefinition = getWorkflow(user, projectCode, name);
ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType); ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType);
long processDefinitionCode; long processDefinitionCode;
// create or update process definition // create or update workflow
if (processDefinition != null) { if (processDefinition != null) {
processDefinitionCode = processDefinition.getCode(); processDefinitionCode = processDefinition.getCode();
// make sure process definition offline which could edit // make sure workflow offline which could edit
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
ReleaseState.OFFLINE); ReleaseState.OFFLINE);
processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionService.updateProcessDefinition(user, projectCode, name,
@ -274,7 +274,7 @@ public class PythonGateway {
processDefinitionCode = processDefinition.getCode(); processDefinitionCode = processDefinition.getCode();
} }
// Fresh process definition schedule // Fresh workflow schedule
if (schedule != null) { if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType,
warningGroupId); warningGroupId);
@ -285,23 +285,23 @@ public class PythonGateway {
} }
/** /**
* get process definition * get workflow
* *
* @param user user who create or update schedule * @param user user who create or update schedule
* @param projectCode project which process definition belongs to * @param projectCode project which workflow belongs to
* @param processDefinitionName process definition name * @param workflowName workflow name
*/ */
private ProcessDefinition getProcessDefinition(User user, long projectCode, String processDefinitionName) { private ProcessDefinition getWorkflow(User user, long projectCode, String workflowName) {
Map<String, Object> verifyProcessDefinitionExists = Map<String, Object> verifyProcessDefinitionExists =
processDefinitionService.verifyProcessDefinitionName(user, projectCode, processDefinitionName, 0); processDefinitionService.verifyProcessDefinitionName(user, projectCode, workflowName, 0);
Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS); Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS);
ProcessDefinition processDefinition = null; ProcessDefinition processDefinition = null;
if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) { if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) {
processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); processDefinition = processDefinitionMapper.queryByDefineName(projectCode, workflowName);
} else if (verifyStatus != Status.SUCCESS) { } else if (verifyStatus != Status.SUCCESS) {
String msg = String msg =
"Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST."; "Verify workflow exists status is invalid, neither SUCCESS or WORKFLOW_NAME_EXIST.";
logger.error(msg); logger.error(msg);
throw new RuntimeException(msg); throw new RuntimeException(msg);
} }
@ -310,13 +310,13 @@ public class PythonGateway {
} }
/** /**
* create or update process definition schedule. * create or update workflow schedule.
* It would always use latest schedule define in workflow-as-code, and set schedule online when * It would always use latest schedule define in workflow-as-code, and set schedule online when
* it's not null * it's not null
* *
* @param user user who create or update schedule * @param user user who create or update schedule
* @param projectCode project which process definition belongs to * @param projectCode project which workflow belongs to
* @param processDefinitionCode process definition code * @param workflowCode workflow code
* @param schedule schedule expression * @param schedule schedule expression
* @param workerGroup work group * @param workerGroup work group
* @param warningType warning type * @param warningType warning type
@ -324,24 +324,24 @@ public class PythonGateway {
*/ */
private void createOrUpdateSchedule(User user, private void createOrUpdateSchedule(User user,
long projectCode, long projectCode,
long processDefinitionCode, long workflowCode,
String schedule, String schedule,
String workerGroup, String workerGroup,
String warningType, String warningType,
int warningGroupId) { int warningGroupId) {
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode); Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(workflowCode);
// create or update schedule // create or update schedule
int scheduleId; int scheduleId;
if (scheduleObj == null) { if (scheduleObj == null) {
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
ReleaseState.ONLINE); ReleaseState.ONLINE);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, workflowCode,
schedule, WarningType.valueOf(warningType), schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
scheduleId = (int) result.get("scheduleId"); scheduleId = (int) result.get("scheduleId");
} else { } else {
scheduleId = scheduleObj.getId(); scheduleId = scheduleObj.getId();
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
ReleaseState.OFFLINE); ReleaseState.OFFLINE);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType), schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
@ -349,9 +349,9 @@ public class PythonGateway {
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE); schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
} }
public void execProcessInstance(String userName, public void execWorkflowInstance(String userName,
String projectName, String projectName,
String processDefinitionName, String workflowName,
String cronTime, String cronTime,
String workerGroup, String workerGroup,
String warningType, String warningType,
@ -360,9 +360,9 @@ public class PythonGateway {
User user = usersService.queryUser(userName); User user = usersService.queryUser(userName);
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
ProcessDefinition processDefinition = ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); processDefinitionMapper.queryByDefineName(project.getCode(), workflowName);
// make sure process definition online // make sure workflow online
processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(),
ReleaseState.ONLINE); ReleaseState.ONLINE);
@ -391,7 +391,7 @@ public class PythonGateway {
// side object // side object
/* /*
* Grant project's permission to user. Use when project's created user not current but Python API use it to change * Grant project's permission to user. Use when project's created user not current but Python API use it to change
* process definition. * workflow.
*/ */
private Integer grantProjectToUser(Project project, User user) { private Integer grantProjectToUser(Project project, User user) {
Date now = new Date(); Date now = new Date();
@ -512,31 +512,31 @@ public class PythonGateway {
} }
/** /**
* Get processDefinition by given processDefinitionName name. It return map contain processDefinition id, name, code. * Get workflow object by given workflow name. It returns map contain workflow id, name, code.
* Useful in Python API create subProcess task which need processDefinition information. * Useful in Python API create subProcess task which need workflow information.
* *
* @param userName user who create or update schedule * @param userName user who create or update schedule
* @param projectName project name which process definition belongs to * @param projectName project name which workflow belongs to
* @param processDefinitionName process definition name * @param workflowName workflow name
*/ */
public Map<String, Object> getProcessDefinitionInfo(String userName, String projectName, public Map<String, Object> getWorkflowInfo(String userName, String projectName,
String processDefinitionName) { String workflowName) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
User user = usersService.queryUser(userName); User user = usersService.queryUser(userName);
Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST); Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
long projectCode = project.getCode(); long projectCode = project.getCode();
ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, processDefinitionName); ProcessDefinition processDefinition = getWorkflow(user, projectCode, workflowName);
// get process definition info // get workflow info
if (processDefinition != null) { if (processDefinition != null) {
// make sure process definition online // make sure workflow online
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(), processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(),
ReleaseState.ONLINE); ReleaseState.ONLINE);
result.put("id", processDefinition.getId()); result.put("id", processDefinition.getId());
result.put("name", processDefinition.getName()); result.put("name", processDefinition.getName());
result.put("code", processDefinition.getCode()); result.put("code", processDefinition.getCode());
} else { } else {
String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); String msg = String.format("Can not find valid workflow by name %s", workflowName);
logger.error(msg); logger.error(msg);
throw new IllegalArgumentException(msg); throw new IllegalArgumentException(msg);
} }
@ -545,14 +545,14 @@ public class PythonGateway {
} }
/** /**
* Get project, process definition, task code. * Get project, workflow, task code.
* Useful in Python API create dependent task which need processDefinition information. * Useful in Python API create dependent task which need workflow information.
* *
* @param projectName project name which process definition belongs to * @param projectName project name which workflow belongs to
* @param processDefinitionName process definition name * @param workflowName workflow name
* @param taskName task name * @param taskName task name
*/ */
public Map<String, Object> getDependentInfo(String projectName, String processDefinitionName, String taskName) { public Map<String, Object> getDependentInfo(String projectName, String workflowName, String taskName) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
@ -565,9 +565,9 @@ public class PythonGateway {
result.put("projectCode", projectCode); result.put("projectCode", projectCode);
ProcessDefinition processDefinition = ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); processDefinitionMapper.queryByDefineName(projectCode, workflowName);
if (processDefinition == null) { if (processDefinition == null) {
String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); String msg = String.format("Can not find valid workflow by name %s", workflowName);
logger.error(msg); logger.error(msg);
throw new IllegalArgumentException(msg); throw new IllegalArgumentException(msg);
} }
@ -582,8 +582,8 @@ public class PythonGateway {
} }
/** /**
* Get resource by given program type and full name. It return map contain resource id, name. * Get resource by given program type and full name. It returns map contain resource id, name.
* Useful in Python API create flink or spark task which need processDefinition information. * Useful in Python API create flink or spark task which need workflow information.
* *
* @param programType program type one of SCALA, JAVA and PYTHON * @param programType program type one of SCALA, JAVA and PYTHON
* @param fullName full name of the resource * @param fullName full name of the resource
@ -628,7 +628,7 @@ public class PythonGateway {
/** /**
* Get resource by given resource type and full name. It return map contain resource id, name. * Get resource by given resource type and full name. It return map contain resource id, name.
* Useful in Python API create task which need processDefinition information. * Useful in Python API create task which need workflow information.
* *
* @param userName user who query resource * @param userName user who query resource
* @param fullName full name of the resource * @param fullName full name of the resource

Loading…
Cancel
Save