|
|
|
@ -59,6 +59,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
|
|
|
|
|
import org.apache.dolphinscheduler.server.config.PythonGatewayConfig; |
|
|
|
|
import org.apache.dolphinscheduler.spi.enums.ResourceType; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
|
|
|
|
|
|
|
import java.net.InetAddress; |
|
|
|
|
import java.net.UnknownHostException; |
|
|
|
|
import java.util.Date; |
|
|
|
@ -66,7 +68,6 @@ import java.util.HashMap;
|
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Objects; |
|
|
|
|
import java.util.TimeZone; |
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
@ -74,7 +75,6 @@ import javax.annotation.PostConstruct;
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
|
import org.springframework.boot.SpringApplication; |
|
|
|
|
import org.springframework.boot.autoconfigure.SpringBootApplication; |
|
|
|
|
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; |
|
|
|
@ -82,8 +82,6 @@ import org.springframework.context.annotation.ComponentScan;
|
|
|
|
|
|
|
|
|
|
import py4j.GatewayServer; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
|
|
|
|
|
|
|
@SpringBootApplication |
|
|
|
|
@ComponentScan(value = "org.apache.dolphinscheduler") |
|
|
|
|
public class PythonGatewayServer extends SpringBootServletInitializer { |
|
|
|
@ -148,14 +146,6 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
@Autowired |
|
|
|
|
private ProjectUserMapper projectUserMapper; |
|
|
|
|
|
|
|
|
|
@Value("${spring.jackson.time-zone:UTC}") |
|
|
|
|
private String timezone; |
|
|
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
|
public void init() { |
|
|
|
|
TimeZone.setDefault(TimeZone.getTimeZone(timezone)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO replace this user to build in admin user if we make sure build in one could not be change
|
|
|
|
|
private final User dummyAdminUser = new User() { |
|
|
|
|
{ |
|
|
|
@ -207,19 +197,19 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
* If process definition do not exists in Project=`projectCode` would create a new one |
|
|
|
|
* If process definition already exists in Project=`projectCode` would update it |
|
|
|
|
* |
|
|
|
|
* @param userName user name who create or update process definition |
|
|
|
|
* @param projectName project name which process definition belongs to |
|
|
|
|
* @param name process definition name |
|
|
|
|
* @param description description |
|
|
|
|
* @param globalParams global params |
|
|
|
|
* @param schedule schedule for process definition, will not set schedule if null, |
|
|
|
|
* and if would always fresh exists schedule if not null |
|
|
|
|
* @param locations locations json object about all tasks |
|
|
|
|
* @param timeout timeout for process definition working, if running time longer than timeout, |
|
|
|
|
* task will mark as fail |
|
|
|
|
* @param workerGroup run task in which worker group |
|
|
|
|
* @param tenantCode tenantCode |
|
|
|
|
* @param taskRelationJson relation json for nodes |
|
|
|
|
* @param userName user name who create or update process definition |
|
|
|
|
* @param projectName project name which process definition belongs to |
|
|
|
|
* @param name process definition name |
|
|
|
|
* @param description description |
|
|
|
|
* @param globalParams global params |
|
|
|
|
* @param schedule schedule for process definition, will not set schedule if null, |
|
|
|
|
* and if would always fresh exists schedule if not null |
|
|
|
|
* @param locations locations json object about all tasks |
|
|
|
|
* @param timeout timeout for process definition working, if running time longer than timeout, |
|
|
|
|
* task will mark as fail |
|
|
|
|
* @param workerGroup run task in which worker group |
|
|
|
|
* @param tenantCode tenantCode |
|
|
|
|
* @param taskRelationJson relation json for nodes |
|
|
|
|
* @param taskDefinitionJson taskDefinitionJson |
|
|
|
|
* @return create result code |
|
|
|
|
*/ |
|
|
|
@ -248,10 +238,10 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
// make sure process definition offline which could edit
|
|
|
|
|
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); |
|
|
|
|
Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams, |
|
|
|
|
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson, executionType); |
|
|
|
|
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson, executionType); |
|
|
|
|
} else { |
|
|
|
|
Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams, |
|
|
|
|
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson, executionType); |
|
|
|
|
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson, executionType); |
|
|
|
|
processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST); |
|
|
|
|
processDefinitionCode = processDefinition.getCode(); |
|
|
|
|
} |
|
|
|
@ -266,8 +256,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* get process definition |
|
|
|
|
* @param user user who create or update schedule |
|
|
|
|
* @param projectCode project which process definition belongs to |
|
|
|
|
* |
|
|
|
|
* @param user user who create or update schedule |
|
|
|
|
* @param projectCode project which process definition belongs to |
|
|
|
|
* @param processDefinitionName process definition name |
|
|
|
|
*/ |
|
|
|
|
private ProcessDefinition getProcessDefinition(User user, long projectCode, String processDefinitionName) { |
|
|
|
@ -291,11 +282,11 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
* It would always use latest schedule define in workflow-as-code, and set schedule online when |
|
|
|
|
* it's not null |
|
|
|
|
* |
|
|
|
|
* @param user user who create or update schedule |
|
|
|
|
* @param projectCode project which process definition belongs to |
|
|
|
|
* @param user user who create or update schedule |
|
|
|
|
* @param projectCode project which process definition belongs to |
|
|
|
|
* @param processDefinitionCode process definition code |
|
|
|
|
* @param schedule schedule expression |
|
|
|
|
* @param workerGroup work group |
|
|
|
|
* @param schedule schedule expression |
|
|
|
|
* @param workerGroup work group |
|
|
|
|
*/ |
|
|
|
|
private void createOrUpdateSchedule(User user, |
|
|
|
|
long projectCode, |
|
|
|
@ -308,13 +299,13 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
if (scheduleObj == null) { |
|
|
|
|
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE); |
|
|
|
|
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE, |
|
|
|
|
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); |
|
|
|
|
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); |
|
|
|
|
scheduleId = (int) result.get("scheduleId"); |
|
|
|
|
} else { |
|
|
|
|
scheduleId = scheduleObj.getId(); |
|
|
|
|
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); |
|
|
|
|
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE, |
|
|
|
|
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); |
|
|
|
|
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); |
|
|
|
|
} |
|
|
|
|
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE); |
|
|
|
|
} |
|
|
|
@ -334,24 +325,24 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE); |
|
|
|
|
|
|
|
|
|
executorService.execProcessInstance(user, |
|
|
|
|
project.getCode(), |
|
|
|
|
processDefinition.getCode(), |
|
|
|
|
cronTime, |
|
|
|
|
null, |
|
|
|
|
DEFAULT_FAILURE_STRATEGY, |
|
|
|
|
null, |
|
|
|
|
DEFAULT_TASK_DEPEND_TYPE, |
|
|
|
|
DEFAULT_WARNING_TYPE, |
|
|
|
|
DEFAULT_WARNING_GROUP_ID, |
|
|
|
|
DEFAULT_RUN_MODE, |
|
|
|
|
DEFAULT_PRIORITY, |
|
|
|
|
workerGroup, |
|
|
|
|
DEFAULT_ENVIRONMENT_CODE, |
|
|
|
|
timeout, |
|
|
|
|
null, |
|
|
|
|
null, |
|
|
|
|
DEFAULT_DRY_RUN, |
|
|
|
|
COMPLEMENT_DEPENDENT_MODE |
|
|
|
|
project.getCode(), |
|
|
|
|
processDefinition.getCode(), |
|
|
|
|
cronTime, |
|
|
|
|
null, |
|
|
|
|
DEFAULT_FAILURE_STRATEGY, |
|
|
|
|
null, |
|
|
|
|
DEFAULT_TASK_DEPEND_TYPE, |
|
|
|
|
DEFAULT_WARNING_TYPE, |
|
|
|
|
DEFAULT_WARNING_GROUP_ID, |
|
|
|
|
DEFAULT_RUN_MODE, |
|
|
|
|
DEFAULT_PRIORITY, |
|
|
|
|
workerGroup, |
|
|
|
|
DEFAULT_ENVIRONMENT_CODE, |
|
|
|
|
timeout, |
|
|
|
|
null, |
|
|
|
|
null, |
|
|
|
|
DEFAULT_DRY_RUN, |
|
|
|
|
COMPLEMENT_DEPENDENT_MODE |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -468,8 +459,8 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
* Get processDefinition by given processDefinitionName name. It return map contain processDefinition id, name, code. |
|
|
|
|
* Useful in Python API create subProcess task which need processDefinition information. |
|
|
|
|
* |
|
|
|
|
* @param userName user who create or update schedule |
|
|
|
|
* @param projectName project name which process definition belongs to |
|
|
|
|
* @param userName user who create or update schedule |
|
|
|
|
* @param projectName project name which process definition belongs to |
|
|
|
|
* @param processDefinitionName process definition name |
|
|
|
|
*/ |
|
|
|
|
public Map<String, Object> getProcessDefinitionInfo(String userName, String projectName, String processDefinitionName) { |
|
|
|
@ -499,9 +490,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
* Get project, process definition, task code. |
|
|
|
|
* Useful in Python API create dependent task which need processDefinition information. |
|
|
|
|
* |
|
|
|
|
* @param projectName project name which process definition belongs to |
|
|
|
|
* @param projectName project name which process definition belongs to |
|
|
|
|
* @param processDefinitionName process definition name |
|
|
|
|
* @param taskName task name |
|
|
|
|
* @param taskName task name |
|
|
|
|
*/ |
|
|
|
|
public Map<String, Object> getDependentInfo(String projectName, String processDefinitionName, String taskName) { |
|
|
|
|
Map<String, Object> result = new HashMap<>(); |
|
|
|
@ -535,7 +526,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
* Useful in Python API create flink or spark task which need processDefinition information. |
|
|
|
|
* |
|
|
|
|
* @param programType program type one of SCALA, JAVA and PYTHON |
|
|
|
|
* @param fullName full name of the resource |
|
|
|
|
* @param fullName full name of the resource |
|
|
|
|
*/ |
|
|
|
|
public Map<String, Object> getResourcesFileInfo(String programType, String fullName) { |
|
|
|
|
Map<String, Object> result = new HashMap<>(); |
|
|
|
@ -561,14 +552,14 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
|
|
|
|
|
InetAddress gatewayHost = InetAddress.getByName(pythonGatewayConfig.getGatewayServerAddress()); |
|
|
|
|
InetAddress pythonHost = InetAddress.getByName(pythonGatewayConfig.getPythonAddress()); |
|
|
|
|
server = new GatewayServer( |
|
|
|
|
this, |
|
|
|
|
pythonGatewayConfig.getGatewayServerPort(), |
|
|
|
|
pythonGatewayConfig.getPythonPort(), |
|
|
|
|
gatewayHost, |
|
|
|
|
pythonHost, |
|
|
|
|
pythonGatewayConfig.getConnectTimeout(), |
|
|
|
|
pythonGatewayConfig.getReadTimeout(), |
|
|
|
|
null |
|
|
|
|
this, |
|
|
|
|
pythonGatewayConfig.getGatewayServerPort(), |
|
|
|
|
pythonGatewayConfig.getPythonPort(), |
|
|
|
|
gatewayHost, |
|
|
|
|
pythonHost, |
|
|
|
|
pythonGatewayConfig.getConnectTimeout(), |
|
|
|
|
pythonGatewayConfig.getReadTimeout(), |
|
|
|
|
null |
|
|
|
|
); |
|
|
|
|
GatewayServer.turnLoggingOn(); |
|
|
|
|
logger.info("PythonGatewayServer started on: " + gatewayHost.toString()); |
|
|
|
|