Browse Source

[chore][python] Change name from process definition to workflow (#12918)

only change its name in python gateway server code, incluing

* Function name: all related to process definition
* Parameter name and comment related

ref: apache/dolphinscheduler-sdk-python#22

(cherry picked from commit f20c9b3102)
3.1.2-release
Jay Chung 2 years ago
parent
commit
6ed1605680
  1. 132
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

132
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -82,7 +82,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import py4j.GatewayServer;
@Component
public class PythonGateway {
@ -183,8 +182,10 @@ public class PythonGateway {
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
// In the case project exists, but current process definition still not created, we should also return the init version of it
ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
// In the case project exists, but current workflow still not created, we should also return the init
// version of it
if (processDefinition == null) {
result.put("code", CodeGenerateUtils.getInstance().genCode());
result.put("version", 0L);
@ -203,20 +204,20 @@ public class PythonGateway {
}
/**
* create or update process definition.
* If process definition do not exists in Project=`projectCode` would create a new one
* If process definition already exists in Project=`projectCode` would update it
* create or update workflow.
* If workflow do not exists in Project=`projectCode` would create a new one
* If workflow already exists in Project=`projectCode` would update it
*
* @param userName user name who create or update process definition
* @param projectName project name which process definition belongs to
* @param name process definition name
* @param userName user name who create or update workflow
* @param projectName project name which workflow belongs to
* @param name workflow name
* @param description description
* @param globalParams global params
* @param schedule schedule for process definition, will not set schedule if null,
* @param schedule schedule for workflow, will not set schedule if null,
* and if would always fresh exists schedule if not null
* @param warningType warning type
* @param warningGroupId warning group id
* @param timeout timeout for process definition working, if running time longer than timeout,
* @param timeout timeout for workflow working, if running time longer than timeout,
* task will mark as fail
* @param workerGroup run task in which worker group
* @param tenantCode tenantCode
@ -225,7 +226,7 @@ public class PythonGateway {
* @param otherParamsJson otherParamsJson handle other params
* @return create result code
*/
public Long createOrUpdateProcessDefinition(String userName,
public Long createOrUpdateWorkflow(String userName,
String projectName,
String name,
String description,
@ -245,13 +246,13 @@ public class PythonGateway {
Project project = projectMapper.queryByName(projectName);
long projectCode = project.getCode();
ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
ProcessDefinition processDefinition = getWorkflow(user, projectCode, name);
ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType);
long processDefinitionCode;
// create or update process definition
// create or update workflow
if (processDefinition != null) {
processDefinitionCode = processDefinition.getCode();
// make sure process definition offline which could edit
// make sure workflow offline which could edit
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
ReleaseState.OFFLINE);
processDefinitionService.updateProcessDefinition(user, projectCode, name,
@ -267,7 +268,7 @@ public class PythonGateway {
processDefinitionCode = processDefinition.getCode();
}
// Fresh process definition schedule
// Fresh workflow schedule
if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
}
@ -276,21 +277,23 @@ public class PythonGateway {
}
/**
* get process definition
* get workflow
*
* @param user user who create or update schedule
* @param projectCode project which process definition belongs to
* @param processDefinitionName process definition name
* @param projectCode project which workflow belongs to
* @param workflowName workflow name
*/
private ProcessDefinition getProcessDefinition(User user, long projectCode, String processDefinitionName) {
Map<String, Object> verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, processDefinitionName, 0);
private ProcessDefinition getWorkflow(User user, long projectCode, String workflowName) {
Map<String, Object> verifyProcessDefinitionExists =
processDefinitionService.verifyProcessDefinitionName(user, projectCode, workflowName, 0);
Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS);
ProcessDefinition processDefinition = null;
if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) {
processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
processDefinition = processDefinitionMapper.queryByDefineName(projectCode, workflowName);
} else if (verifyStatus != Status.SUCCESS) {
String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
String msg =
"Verify workflow exists status is invalid, neither SUCCESS or WORKFLOW_NAME_EXIST.";
logger.error(msg);
throw new RuntimeException(msg);
}
@ -299,13 +302,13 @@ public class PythonGateway {
}
/**
* create or update process definition schedule.
* create or update workflow schedule.
* It would always use latest schedule define in workflow-as-code, and set schedule online when
* it's not null
*
* @param user user who create or update schedule
* @param projectCode project which process definition belongs to
* @param processDefinitionCode process definition code
* @param projectCode project which workflow belongs to
* @param workflowCode workflow code
* @param schedule schedule expression
* @param workerGroup work group
* @param warningType warning type
@ -313,43 +316,47 @@ public class PythonGateway {
*/
private void createOrUpdateSchedule(User user,
long projectCode,
long processDefinitionCode,
long workflowCode,
String schedule,
String workerGroup,
String warningType,
int warningGroupId) {
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(workflowCode);
// create or update schedule
int scheduleId;
if (scheduleObj == null) {
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, WarningType.valueOf(warningType),
processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
ReleaseState.ONLINE);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, workflowCode,
schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
scheduleId = (int) result.get("scheduleId");
} else {
scheduleId = scheduleObj.getId();
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
ReleaseState.OFFLINE);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
}
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}
public void execProcessInstance(String userName,
public void execWorkflowInstance(String userName,
String projectName,
String processDefinitionName,
String workflowName,
String cronTime,
String workerGroup,
String warningType,
int warningGroupId,
Integer timeout
) {
Integer warningGroupId,
Integer timeout) {
User user = usersService.queryUser(userName);
Project project = projectMapper.queryByName(projectName);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(project.getCode(), workflowName);
// make sure process definition online
processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
// make sure workflow online
processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(),
ReleaseState.ONLINE);
executorService.execProcessInstance(user,
project.getCode(),
@ -375,8 +382,8 @@ public class PythonGateway {
// side object
/*
Grant project's permission to user. Use when project's created user not current but
Python API use it to change process definition.
* Grant project's permission to user. Use when project's created user not current but Python API use it to change
* workflow.
*/
private Integer grantProjectToUser(Project project, User user) {
Date now = new Date();
@ -492,29 +499,31 @@ public class PythonGateway {
}
/**
* Get processDefinition by given processDefinitionName name. It return map contain processDefinition id, name, code.
* Useful in Python API create subProcess task which need processDefinition information.
* Get workflow object by given workflow name. It returns map contain workflow id, name, code.
* Useful in Python API create subProcess task which need workflow information.
*
* @param userName user who create or update schedule
* @param projectName project name which process definition belongs to
* @param processDefinitionName process definition name
* @param projectName project name which workflow belongs to
* @param workflowName workflow name
*/
public Map<String, Object> getProcessDefinitionInfo(String userName, String projectName, String processDefinitionName) {
public Map<String, Object> getWorkflowInfo(String userName, String projectName,
String workflowName) {
Map<String, Object> result = new HashMap<>();
User user = usersService.queryUser(userName);
Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
long projectCode = project.getCode();
ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, processDefinitionName);
// get process definition info
ProcessDefinition processDefinition = getWorkflow(user, projectCode, workflowName);
// get workflow info
if (processDefinition != null) {
// make sure process definition online
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(), ReleaseState.ONLINE);
// make sure workflow online
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(),
ReleaseState.ONLINE);
result.put("id", processDefinition.getId());
result.put("name", processDefinition.getName());
result.put("code", processDefinition.getCode());
} else {
String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
String msg = String.format("Can not find valid workflow by name %s", workflowName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
@ -523,14 +532,14 @@ public class PythonGateway {
}
/**
* Get project, process definition, task code.
* Useful in Python API create dependent task which need processDefinition information.
* Get project, workflow, task code.
* Useful in Python API create dependent task which need workflow information.
*
* @param projectName project name which process definition belongs to
* @param processDefinitionName process definition name
* @param projectName project name which workflow belongs to
* @param workflowName workflow name
* @param taskName task name
*/
public Map<String, Object> getDependentInfo(String projectName, String processDefinitionName, String taskName) {
public Map<String, Object> getDependentInfo(String projectName, String workflowName, String taskName) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
@ -542,9 +551,10 @@ public class PythonGateway {
long projectCode = project.getCode();
result.put("projectCode", projectCode);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(projectCode, workflowName);
if (processDefinition == null) {
String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
String msg = String.format("Can not find valid workflow by name %s", workflowName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
@ -558,8 +568,8 @@ public class PythonGateway {
}
/**
* Get resource by given program type and full name. It return map contain resource id, name.
* Useful in Python API create flink or spark task which need processDefinition information.
* Get resource by given program type and full name. It returns map contain resource id, name.
* Useful in Python API create flink or spark task which need workflow information.
*
* @param programType program type one of SCALA, JAVA and PYTHON
* @param fullName full name of the resource
@ -602,7 +612,7 @@ public class PythonGateway {
/**
* Get resource by given resource type and full name. It return map contain resource id, name.
* Useful in Python API create task which need processDefinition information.
* Useful in Python API create task which need workflow information.
*
* @param userName user who query resource
* @param fullName full name of the resource

Loading…
Cancel
Save