Browse Source

[Improvement-3369][api] Introduce executor and datasource service interface for clear code (#4759)

pull/3/MERGE
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
15a5b05883
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
  3. 538
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
  4. 499
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  5. 660
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
  6. 579
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  7. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
  8. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
  9. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  10. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import java.text.ParseException;
import java.util.Map;
import org.slf4j.Logger;
@ -121,7 +120,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams) throws ParseException {
@RequestParam(value = "startParams", required = false) String startParams) {
logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, "
+ "failure policy: {}, node name: {}, node dep: {}, notify type: {}, "
+ "notify group id: {}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}, startParams: {} ",

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java

@ -322,9 +322,7 @@ public class ResourcesController extends BaseController {
@RequestParam(value = "programType",required = false) ProgramType programType
) {
String programTypeName = programType == null ? "" : programType.name();
String userName = loginUser.getUserName();
userName = userName.replaceAll("[\n|\r|\t]", "_");
logger.info("query resource list, login user:{}, resource type:{}, program type:{}", userName,programTypeName);
logger.info("query resource list, resource type:{}, program type:{}", type, programTypeName);
Map<String, Object> result = resourceService.queryResourceByProgramType(loginUser, type,programType);
return returnDataList(result);
}
@ -641,9 +639,7 @@ public class ResourcesController extends BaseController {
@ApiException(QUERY_DATASOURCE_BY_TYPE_ERROR)
public Result<Object> queryUdfFuncList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("type") UdfType type) {
String userName = loginUser.getUserName();
userName = userName.replaceAll("[\n|\r|\t]", "_");
logger.info("query udf func list, user:{}, type:{}", userName, type);
logger.info("query udf func list, type:{}", type);
Map<String, Object> result = udfFuncService.queryUdfFuncList(loginUser, type.ordinal());
return returnDataList(result);
}

538
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java

@ -17,67 +17,17 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbConnectType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.datasource.OracleDataSource;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* datasource service
*/
@Service
public class DataSourceService extends BaseService {
private static final Logger logger = LoggerFactory.getLogger(DataSourceService.class);
public static final String NAME = "name";
public static final String NOTE = "note";
public static final String TYPE = "type";
public static final String HOST = "host";
public static final String PORT = "port";
public static final String PRINCIPAL = "principal";
public static final String DATABASE = "database";
public static final String USER_NAME = "userName";
public static final String OTHER = "other";
@Autowired
private DataSourceMapper dataSourceMapper;
@Autowired
private DataSourceUserMapper datasourceUserMapper;
public interface DataSourceService {
/**
* create data source
@ -89,37 +39,7 @@ public class DataSourceService extends BaseService {
* @param parameter datasource parameters
* @return create result code
*/
public Result<Object> createDataSource(User loginUser, String name, String desc, DbType type, String parameter) {
Result<Object> result = new Result<>();
// check name can use or not
if (checkName(name)) {
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
Result<Object> isConnection = checkConnection(type, parameter);
if (Status.SUCCESS.getCode() != isConnection.getCode()) {
return result;
}
// build datasource
DataSource dataSource = new DataSource();
Date now = new Date();
dataSource.setName(name.trim());
dataSource.setNote(desc);
dataSource.setUserId(loginUser.getId());
dataSource.setUserName(loginUser.getUserName());
dataSource.setType(type);
dataSource.setConnectionParams(parameter);
dataSource.setCreateTime(now);
dataSource.setUpdateTime(now);
dataSourceMapper.insert(dataSource);
putMsg(result, Status.SUCCESS);
return result;
}
Result<Object> createDataSource(User loginUser, String name, String desc, DbType type, String parameter);
/**
* updateProcessInstance datasource
@ -132,59 +52,7 @@ public class DataSourceService extends BaseService {
* @param id data source id
* @return update result code
*/
public Result<Object> updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter) {
Result<Object> result = new Result<>();
// determine whether the data source exists
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
if (!hasPerm(loginUser, dataSource.getUserId())) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
//check name can use or not
if (!name.trim().equals(dataSource.getName()) && checkName(name)) {
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
//check password,if the password is not updated, set to the old password.
ObjectNode paramObject = JSONUtils.parseObject(parameter);
String password = paramObject.path(Constants.PASSWORD).asText();
if (StringUtils.isBlank(password)) {
String oldConnectionParams = dataSource.getConnectionParams();
ObjectNode oldParams = JSONUtils.parseObject(oldConnectionParams);
paramObject.put(Constants.PASSWORD, oldParams.path(Constants.PASSWORD).asText());
}
// connectionParams json
String connectionParams = paramObject.toString();
Result<Object> isConnection = checkConnection(type, parameter);
if (Status.SUCCESS.getCode() != isConnection.getCode()) {
return result;
}
Date now = new Date();
dataSource.setName(name.trim());
dataSource.setNote(desc);
dataSource.setUserName(loginUser.getUserName());
dataSource.setType(type);
dataSource.setConnectionParams(connectionParams);
dataSource.setUpdateTime(now);
dataSourceMapper.updateById(dataSource);
putMsg(result, Status.SUCCESS);
return result;
}
private boolean checkName(String name) {
List<DataSource> queryDataSource = dataSourceMapper.queryDataSourceByName(name.trim());
return queryDataSource != null && queryDataSource.size() > 0;
}
Result<Object> updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter);
/**
* updateProcessInstance datasource
@ -192,91 +60,7 @@ public class DataSourceService extends BaseService {
* @param id datasource id
* @return data source detail
*/
public Map<String, Object> queryDataSource(int id) {
Map<String, Object> result = new HashMap<String, Object>(5);
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
// type
String dataSourceType = dataSource.getType().toString();
// name
String dataSourceName = dataSource.getName();
// desc
String desc = dataSource.getNote();
// parameter
String parameter = dataSource.getConnectionParams();
BaseDataSource datasourceForm = DataSourceFactory.getDatasource(dataSource.getType(), parameter);
DbConnectType connectType = null;
String hostSeperator = Constants.DOUBLE_SLASH;
if (DbType.ORACLE.equals(dataSource.getType())) {
connectType = ((OracleDataSource) datasourceForm).getConnectType();
if (DbConnectType.ORACLE_SID.equals(connectType)) {
hostSeperator = Constants.AT_SIGN;
}
}
String database = datasourceForm.getDatabase();
// jdbc connection params
String other = datasourceForm.getOther();
String address = datasourceForm.getAddress();
String[] hostsPorts = getHostsAndPort(address, hostSeperator);
// ip host
String host = hostsPorts[0];
// prot
String port = hostsPorts[1];
String separator = "";
switch (dataSource.getType()) {
case HIVE:
case SQLSERVER:
separator = ";";
break;
case MYSQL:
case POSTGRESQL:
case CLICKHOUSE:
case ORACLE:
case PRESTO:
separator = "&";
break;
default:
separator = "&";
break;
}
Map<String, String> otherMap = new LinkedHashMap<String, String>();
if (other != null) {
String[] configs = other.split(separator);
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
}
Map<String, Object> map = new HashMap<>(10);
map.put(NAME, dataSourceName);
map.put(NOTE, desc);
map.put(TYPE, dataSourceType);
if (connectType != null) {
map.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
}
map.put(HOST, host);
map.put(PORT, port);
map.put(PRINCIPAL, datasourceForm.getPrincipal());
map.put(Constants.KERBEROS_KRB5_CONF_PATH, datasourceForm.getJavaSecurityKrb5Conf());
map.put(Constants.KERBEROS_KEY_TAB_USERNAME, datasourceForm.getLoginUserKeytabUsername());
map.put(Constants.KERBEROS_KEY_TAB_PATH, datasourceForm.getLoginUserKeytabPath());
map.put(DATABASE, database);
map.put(USER_NAME, datasourceForm.getUser());
map.put(OTHER, otherMap);
result.put(Constants.DATA_LIST, map);
putMsg(result, Status.SUCCESS);
return result;
}
Map<String, Object> queryDataSource(int id);
/**
* query datasource list by keyword
@ -287,44 +71,7 @@ public class DataSourceService extends BaseService {
* @param pageSize page size
* @return data source list page
*/
public Map<String, Object> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
IPage<DataSource> dataSourceList = null;
Page<DataSource> dataSourcePage = new Page(pageNo, pageSize);
if (isAdmin(loginUser)) {
dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 0, searchVal);
} else {
dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, loginUser.getId(), searchVal);
}
List<DataSource> dataSources = dataSourceList != null ? dataSourceList.getRecords() : new ArrayList<>();
handlePasswd(dataSources);
PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
pageInfo.setTotalCount((int) (dataSourceList != null ? dataSourceList.getTotal() : 0L));
pageInfo.setLists(dataSources);
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* handle datasource connection password for safety
*
* @param dataSourceList
*/
private void handlePasswd(List<DataSource> dataSourceList) {
for (DataSource dataSource : dataSourceList) {
String connectionParams = dataSource.getConnectionParams();
ObjectNode object = JSONUtils.parseObject(connectionParams);
object.put(Constants.PASSWORD, Constants.XXXXXX);
dataSource.setConnectionParams(object.toString());
}
}
Map<String, Object> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize);
/**
* query data resource list
@ -333,22 +80,7 @@ public class DataSourceService extends BaseService {
* @param type data source type
* @return data source list page
*/
public Map<String, Object> queryDataSourceList(User loginUser, Integer type) {
Map<String, Object> result = new HashMap<>();
List<DataSource> datasourceList;
if (isAdmin(loginUser)) {
datasourceList = dataSourceMapper.listAllDataSourceByType(type);
} else {
datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type);
}
result.put(Constants.DATA_LIST, datasourceList);
putMsg(result, Status.SUCCESS);
return result;
}
Map<String, Object> queryDataSourceList(User loginUser, Integer type);
/**
* verify datasource exists
@ -356,18 +88,7 @@ public class DataSourceService extends BaseService {
* @param name datasource name
* @return true if data datasource not exists, otherwise return false
*/
public Result<Object> verifyDataSourceName(String name) {
Result<Object> result = new Result<>();
List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name);
if (dataSourceList != null && dataSourceList.size() > 0) {
logger.error("datasource name:{} has exist, can't create again.", name);
putMsg(result, Status.DATASOURCE_EXIST);
} else {
putMsg(result, Status.SUCCESS);
}
return result;
}
Result<Object> verifyDataSourceName(String name);
/**
* check connection
@ -376,25 +97,7 @@ public class DataSourceService extends BaseService {
* @param parameter data source parameters
* @return true if connect successfully, otherwise false
*/
public Result<Object> checkConnection(DbType type, String parameter) {
Result<Object> result = new Result<>();
BaseDataSource datasource = DataSourceFactory.getDatasource(type, parameter);
if (datasource == null) {
putMsg(result, Status.DATASOURCE_TYPE_NOT_EXIST, type);
return result;
}
try (Connection connection = datasource.getConnection()) {
if (connection == null) {
putMsg(result, Status.CONNECTION_TEST_FAILURE);
return result;
}
putMsg(result, Status.SUCCESS);
return result;
} catch (Exception e) {
logger.error("datasource test connection error, dbType:{}, jdbcUrl:{}, message:{}.", type, datasource.getJdbcUrl(), e.getMessage());
return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(),e.getMessage());
}
}
Result<Object> checkConnection(DbType type, String parameter);
/**
* test connection
@ -402,15 +105,7 @@ public class DataSourceService extends BaseService {
* @param id datasource id
* @return connect result code
*/
public Result<Object> connectionTest(int id) {
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
Result<Object> result = new Result<>();
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
return checkConnection(dataSource.getType(), dataSource.getConnectionParams());
}
Result<Object> connectionTest(int id);
/**
* build paramters
@ -425,116 +120,10 @@ public class DataSourceService extends BaseService {
* @param principal principal
* @return datasource parameter
*/
public String buildParameter(DbType type, String host,
String buildParameter(DbType type, String host,
String port, String database, String principal, String userName,
String password, DbConnectType connectType, String other,
String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) {
String address = buildAddress(type, host, port, connectType);
Map<String, Object> parameterMap = new LinkedHashMap<String, Object>(6);
String jdbcUrl;
if (DbType.SQLSERVER == type) {
jdbcUrl = address + ";databaseName=" + database;
} else {
jdbcUrl = address + "/" + database;
}
if (Constants.ORACLE.equals(type.name())) {
parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
}
if (CommonUtils.getKerberosStartupState()
&& (type == DbType.HIVE || type == DbType.SPARK)) {
jdbcUrl += ";principal=" + principal;
}
String separator = "";
if (Constants.MYSQL.equals(type.name())
|| Constants.POSTGRESQL.equals(type.name())
|| Constants.CLICKHOUSE.equals(type.name())
|| Constants.ORACLE.equals(type.name())
|| Constants.PRESTO.equals(type.name())) {
separator = "&";
} else if (Constants.HIVE.equals(type.name())
|| Constants.SPARK.equals(type.name())
|| Constants.DB2.equals(type.name())
|| Constants.SQLSERVER.equals(type.name())) {
separator = ";";
}
parameterMap.put(TYPE, connectType);
parameterMap.put(Constants.ADDRESS, address);
parameterMap.put(Constants.DATABASE, database);
parameterMap.put(Constants.JDBC_URL, jdbcUrl);
parameterMap.put(Constants.USER, userName);
parameterMap.put(Constants.PASSWORD, CommonUtils.encodePassword(password));
if (CommonUtils.getKerberosStartupState()
&& (type == DbType.HIVE || type == DbType.SPARK)) {
parameterMap.put(Constants.PRINCIPAL, principal);
parameterMap.put(Constants.KERBEROS_KRB5_CONF_PATH, javaSecurityKrb5Conf);
parameterMap.put(Constants.KERBEROS_KEY_TAB_USERNAME, loginUserKeytabUsername);
parameterMap.put(Constants.KERBEROS_KEY_TAB_PATH, loginUserKeytabPath);
}
Map<String, String> map = JSONUtils.toMap(other);
if (map != null) {
StringBuilder otherSb = new StringBuilder();
for (Map.Entry<String, String> entry: map.entrySet()) {
otherSb.append(String.format("%s=%s%s", entry.getKey(), entry.getValue(), separator));
}
if (!Constants.DB2.equals(type.name())) {
otherSb.deleteCharAt(otherSb.length() - 1);
}
parameterMap.put(Constants.OTHER, otherSb);
}
if (logger.isDebugEnabled()) {
logger.info("parameters map:{}", JSONUtils.toJsonString(parameterMap));
}
return JSONUtils.toJsonString(parameterMap);
}
private String buildAddress(DbType type, String host, String port, DbConnectType connectType) {
StringBuilder sb = new StringBuilder();
if (Constants.MYSQL.equals(type.name())) {
sb.append(Constants.JDBC_MYSQL);
sb.append(host).append(":").append(port);
} else if (Constants.POSTGRESQL.equals(type.name())) {
sb.append(Constants.JDBC_POSTGRESQL);
sb.append(host).append(":").append(port);
} else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) {
sb.append(Constants.JDBC_HIVE_2);
String[] hostArray = host.split(",");
if (hostArray.length > 0) {
for (String zkHost : hostArray) {
sb.append(String.format("%s:%s,", zkHost, port));
}
sb.deleteCharAt(sb.length() - 1);
}
} else if (Constants.CLICKHOUSE.equals(type.name())) {
sb.append(Constants.JDBC_CLICKHOUSE);
sb.append(host).append(":").append(port);
} else if (Constants.ORACLE.equals(type.name())) {
if (connectType == DbConnectType.ORACLE_SID) {
sb.append(Constants.JDBC_ORACLE_SID);
} else {
sb.append(Constants.JDBC_ORACLE_SERVICE_NAME);
}
sb.append(host).append(":").append(port);
} else if (Constants.SQLSERVER.equals(type.name())) {
sb.append(Constants.JDBC_SQLSERVER);
sb.append(host).append(":").append(port);
} else if (Constants.DB2.equals(type.name())) {
sb.append(Constants.JDBC_DB2);
sb.append(host).append(":").append(port);
} else if (Constants.PRESTO.equals(type.name())) {
sb.append(Constants.JDBC_PRESTO);
sb.append(host).append(":").append(port);
}
return sb.toString();
}
String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath);
/**
* delete datasource
@ -543,30 +132,7 @@ public class DataSourceService extends BaseService {
* @param datasourceId data source id
* @return delete result code
*/
@Transactional(rollbackFor = RuntimeException.class)
public Result<Object> delete(User loginUser, int datasourceId) {
Result<Object> result = new Result<>();
try {
//query datasource by id
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
if (dataSource == null) {
logger.error("resource id {} not exist", datasourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
if (!hasPerm(loginUser, dataSource.getUserId())) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
dataSourceMapper.deleteById(datasourceId);
datasourceUserMapper.deleteByDatasourceId(datasourceId);
putMsg(result, Status.SUCCESS);
} catch (Exception e) {
logger.error("delete datasource error", e);
throw new RuntimeException("delete datasource error");
}
return result;
}
Result<Object> delete(User loginUser, int datasourceId);
/**
* unauthorized datasource
@ -575,38 +141,7 @@ public class DataSourceService extends BaseService {
* @param userId user id
* @return unauthed data source result code
*/
public Map<String, Object> unauthDatasource(User loginUser, Integer userId) {
Map<String, Object> result = new HashMap<>();
//only admin operate
if (!isAdmin(loginUser)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
/**
* query all data sources except userId
*/
List<DataSource> resultList = new ArrayList<>();
List<DataSource> datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId);
Set<DataSource> datasourceSet = null;
if (datasourceList != null && datasourceList.size() > 0) {
datasourceSet = new HashSet<>(datasourceList);
List<DataSource> authedDataSourceList = dataSourceMapper.queryAuthedDatasource(userId);
Set<DataSource> authedDataSourceSet = null;
if (authedDataSourceList != null && authedDataSourceList.size() > 0) {
authedDataSourceSet = new HashSet<>(authedDataSourceList);
datasourceSet.removeAll(authedDataSourceSet);
}
resultList = new ArrayList<>(datasourceSet);
}
result.put(Constants.DATA_LIST, resultList);
putMsg(result, Status.SUCCESS);
return result;
}
Map<String, Object> unauthDatasource(User loginUser, Integer userId);
/**
* authorized datasource
@ -615,50 +150,5 @@ public class DataSourceService extends BaseService {
* @param userId user id
* @return authorized result code
*/
public Map<String, Object> authedDatasource(User loginUser, Integer userId) {
Map<String, Object> result = new HashMap<>();
if (!isAdmin(loginUser)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
List<DataSource> authedDatasourceList = dataSourceMapper.queryAuthedDatasource(userId);
result.put(Constants.DATA_LIST, authedDatasourceList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* get host and port by address
*
* @param address address
* @return sting array: [host,port]
*/
private String[] getHostsAndPort(String address) {
return getHostsAndPort(address, Constants.DOUBLE_SLASH);
}
/**
* get host and port by address
*
* @param address address
* @param separator separator
* @return sting array: [host,port]
*/
private String[] getHostsAndPort(String address, String separator) {
String[] result = new String[2];
String[] tmpArray = address.split(separator);
String hostsAndPorts = tmpArray[tmpArray.length - 1];
StringBuilder hosts = new StringBuilder();
String[] hostPortArray = hostsAndPorts.split(Constants.COMMA);
String port = hostPortArray[0].split(Constants.COLON)[1];
for (String hostPort : hostPortArray) {
hosts.append(hostPort.split(Constants.COLON)[0]).append(Constants.COMMA);
}
hosts.deleteCharAt(hosts.length() - 1);
result[0] = hosts.toString();
result[1] = port;
return result;
}
Map<String, Object> authedDatasource(User loginUser, Integer userId);
}

499
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -17,82 +17,22 @@
package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* executor service
*/
@Service
public class ExecutorService extends BaseService {
private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class);
@Autowired
private ProjectMapper projectMapper;
@Autowired
private ProjectService projectService;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private MonitorService monitorService;
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Autowired
private ProcessService processService;
public interface ExecutorService {
/**
* execute process instance
@ -113,80 +53,14 @@ public class ExecutorService extends BaseService {
* @param timeout timeout
* @param startParams the global param values which pass to new process instance
* @return execute process instance code
* @throws ParseException Parse Exception
*/
public Map<String, Object> execProcessInstance(User loginUser, String projectName,
Map<String, Object> execProcessInstance(User loginUser, String projectName,
int processDefinitionId, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Integer timeout,
Map<String, String> startParams) throws ParseException {
Map<String, Object> result = new HashMap<>();
// timeout is invalid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
return result;
}
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResultAndAuth = checkResultAndAuth(loginUser, projectName, project);
if (checkResultAndAuth != null) {
return checkResultAndAuth;
}
// check process define release state
ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId);
result = checkProcessDefinitionValid(processDefinition, processDefinitionId);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
}
// check master exists
if (!checkMasterExists(result)) {
return result;
}
/**
* create command
*/
int create = this.createCommand(commandType, processDefinitionId,
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
}
return result;
}
/**
* check whether master exists
*
* @param result result
* @return master exists return true , otherwise return false
*/
private boolean checkMasterExists(Map<String, Object> result) {
// check master server exists
List<Server> masterServers = monitorService.getServerListFromZK(true);
// no master
if (masterServers.size() == 0) {
putMsg(result, Status.MASTER_NOT_EXISTS);
return false;
}
return true;
}
Map<String, String> startParams);
/**
* check whether the process definition can be executed
@ -195,19 +69,7 @@ public class ExecutorService extends BaseService {
* @param processDefineId process definition id
* @return check result code
*/
public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId) {
Map<String, Object> result = new HashMap<>();
if (processDefinition == null) {
// check process definition exists
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
// check process definition online
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId);
} else {
result.put(Constants.STATUS, Status.SUCCESS);
}
return result;
}
Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId);
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop
@ -218,194 +80,7 @@ public class ExecutorService extends BaseService {
* @param executeType execute type
* @return execute result code
*/
public Map<String, Object> execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = checkResultAndAuth(loginUser, projectName, project);
if (checkResult != null) {
return checkResult;
}
// check master exists
if (!checkMasterExists(result)) {
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
}
checkResult = checkExecuteType(processInstance, executeType);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
switch (executeType) {
case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING);
break;
case RECOVER_SUSPENDED_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS);
break;
case START_FAILURE_TASK_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS);
break;
case STOP:
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
}
break;
case PAUSE:
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
}
break;
default:
logger.error("unknown execute type : {}", executeType);
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
break;
}
return result;
}
/**
* check tenant suitable
*
* @param processDefinition process definition
* @return true if tenant suitable, otherwise return false
*/
private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
// checkTenantExists();
Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(),
processDefinition.getUserId());
return tenant != null;
}
/**
* Check the state of process instance and the type of operation match
*
* @param processInstance process instance
* @param executeType execute type
* @return check result code
*/
private Map<String, Object> checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) {
Map<String, Object> result = new HashMap<>();
ExecutionStatus executionStatus = processInstance.getState();
boolean checkResult = false;
switch (executeType) {
case PAUSE:
case STOP:
if (executionStatus.typeIsRunning()) {
checkResult = true;
}
break;
case REPEAT_RUNNING:
if (executionStatus.typeIsFinished()) {
checkResult = true;
}
break;
case START_FAILURE_TASK_PROCESS:
if (executionStatus.typeIsFailure()) {
checkResult = true;
}
break;
case RECOVER_SUSPENDED_PROCESS:
if (executionStatus.typeIsPause() || executionStatus.typeIsCancel()) {
checkResult = true;
}
break;
default:
break;
}
if (!checkResult) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString());
} else {
putMsg(result, Status.SUCCESS);
}
return result;
}
/**
* prepare to update process instance command type and status
*
* @param processInstance process instance
* @param commandType command type
* @param executionStatus execute status
* @return update result
*/
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
Map<String, Object> result = new HashMap<>();
processInstance.setCommandType(commandType);
processInstance.addHistoryCmd(commandType);
processInstance.setState(executionStatus);
int update = processService.updateProcessInstance(processInstance);
// determine whether the process is normal
if (update > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
}
/**
* insert command, used in the implementation of the page, re run, recovery (pause / failure) execution
*
* @param loginUser login user
* @param instanceId instance id
* @param processDefinitionId process definition id
* @param commandType command type
* @return insert result code
*/
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType) {
Map<String, Object> result = new HashMap<>();
Command command = new Command();
command.setCommandType(commandType);
command.setProcessDefinitionId(processDefinitionId);
command.setCommandParam(String.format("{\"%s\":%d}",
CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId));
command.setExecutorId(loginUser.getId());
if (!processService.verifyIsNeedCreateCommand(command)) {
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId);
return result;
}
int create = processService.createCommand(command);
if (create > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
}
Map<String, Object> execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType);
/**
* check if sub processes are offline before starting process definition
@ -413,167 +88,5 @@ public class ExecutorService extends BaseService {
* @param processDefineId process definition id
* @return check result code
*/
public Map<String, Object> startCheckByProcessDefinedId(int processDefineId) {
Map<String, Object> result = new HashMap<>();
if (processDefineId == 0) {
logger.error("process definition id is null");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process definition id");
}
List<Integer> ids = new ArrayList<>();
processService.recurseFindSubProcessId(processDefineId, ids);
Integer[] idArray = ids.toArray(new Integer[ids.size()]);
if (!ids.isEmpty()) {
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray);
if (processDefinitionList != null) {
for (ProcessDefinition processDefinition : processDefinitionList) {
/**
* if there is no online process, exit directly
*/
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
logger.info("not release process definition id: {} , name : {}",
processDefinition.getId(), processDefinition.getName());
return result;
}
}
}
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* create command
*
* @param commandType commandType
* @param processDefineId processDefineId
* @param nodeDep nodeDep
* @param failureStrategy failureStrategy
* @param startNodeList startNodeList
* @param schedule schedule
* @param warningType warningType
* @param executorId executorId
* @param warningGroupId warningGroupId
* @param runMode runMode
* @param processInstancePriority processInstancePriority
* @param workerGroup workerGroup
* @return command id
*/
private int createCommand(CommandType commandType, int processDefineId,
TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType,
int executorId, int warningGroupId,
RunMode runMode, Priority processInstancePriority, String workerGroup,
Map<String, String> startParams) throws ParseException {
/**
* instantiate command schedule instance
*/
Command command = new Command();
Map<String, String> cmdParam = new HashMap<>();
if (commandType == null) {
command.setCommandType(CommandType.START_PROCESS);
} else {
command.setCommandType(commandType);
}
command.setProcessDefinitionId(processDefineId);
if (nodeDep != null) {
command.setTaskDependType(nodeDep);
}
if (failureStrategy != null) {
command.setFailureStrategy(failureStrategy);
}
if (StringUtils.isNotEmpty(startNodeList)) {
cmdParam.put(CMD_PARAM_START_NODE_NAMES, startNodeList);
}
if (warningType != null) {
command.setWarningType(warningType);
}
if (startParams != null && startParams.size() > 0) {
cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams));
}
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setExecutorId(executorId);
command.setWarningGroupId(warningGroupId);
command.setProcessInstancePriority(processInstancePriority);
command.setWorkerGroup(workerGroup);
Date start = null;
Date end = null;
if (StringUtils.isNotEmpty(schedule)) {
String[] interval = schedule.split(",");
if (interval.length == 2) {
start = DateUtils.getScheduleDate(interval[0]);
end = DateUtils.getScheduleDate(interval[1]);
}
}
// determine whether to complement
if (commandType == CommandType.COMPLEMENT_DATA) {
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
if (null != start && null != end && !start.after(end)) {
if (runMode == RunMode.RUN_MODE_SERIAL) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
} else if (runMode == RunMode.RUN_MODE_PARALLEL) {
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
List<Date> listDate = new LinkedList<>();
if (!CollectionUtils.isEmpty(schedules)) {
for (Schedule item : schedules) {
listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
}
}
if (!CollectionUtils.isEmpty(listDate)) {
// loop by schedule date
for (Date date : listDate) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
}
return listDate.size();
} else {
// loop by day
int runCunt = 0;
while (!start.after(end)) {
runCunt += 1;
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
start = DateUtils.getSomeDay(start, 1);
}
return runCunt;
}
}
} else {
logger.error("there is not valid schedule date for the process definition: id:{},date:{}",
processDefineId, schedule);
}
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
}
return 0;
}
/**
* check result and auth
*/
private Map<String, Object> checkResultAndAuth(User loginUser, String projectName, Project project) {
// check project auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
return null;
}
Map<String, Object> startCheckByProcessDefinedId(int processDefineId);
}

660
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java

@ -0,0 +1,660 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.DataSourceService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbConnectType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.datasource.OracleDataSource;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* datasource service impl
*/
@Service
public class DataSourceServiceImpl extends BaseService implements DataSourceService {
private static final Logger logger = LoggerFactory.getLogger(DataSourceServiceImpl.class);
public static final String NAME = "name";
public static final String NOTE = "note";
public static final String TYPE = "type";
public static final String HOST = "host";
public static final String PORT = "port";
public static final String PRINCIPAL = "principal";
public static final String DATABASE = "database";
public static final String USER_NAME = "userName";
public static final String OTHER = "other";
@Autowired
private DataSourceMapper dataSourceMapper;
@Autowired
private DataSourceUserMapper datasourceUserMapper;
/**
* create data source
*
* @param loginUser login user
* @param name data source name
* @param desc data source description
* @param type data source type
* @param parameter datasource parameters
* @return create result code
*/
public Result<Object> createDataSource(User loginUser, String name, String desc, DbType type, String parameter) {
Result<Object> result = new Result<>();
// check name can use or not
if (checkName(name)) {
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
Result<Object> isConnection = checkConnection(type, parameter);
if (Status.SUCCESS.getCode() != isConnection.getCode()) {
return result;
}
// build datasource
DataSource dataSource = new DataSource();
Date now = new Date();
dataSource.setName(name.trim());
dataSource.setNote(desc);
dataSource.setUserId(loginUser.getId());
dataSource.setUserName(loginUser.getUserName());
dataSource.setType(type);
dataSource.setConnectionParams(parameter);
dataSource.setCreateTime(now);
dataSource.setUpdateTime(now);
dataSourceMapper.insert(dataSource);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* updateProcessInstance datasource
*
* @param loginUser login user
* @param name data source name
* @param desc data source description
* @param type data source type
* @param parameter datasource parameters
* @param id data source id
* @return update result code
*/
public Result<Object> updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter) {
Result<Object> result = new Result<>();
// determine whether the data source exists
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
if (!hasPerm(loginUser, dataSource.getUserId())) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
//check name can use or not
if (!name.trim().equals(dataSource.getName()) && checkName(name)) {
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
//check password,if the password is not updated, set to the old password.
ObjectNode paramObject = JSONUtils.parseObject(parameter);
String password = paramObject.path(Constants.PASSWORD).asText();
if (StringUtils.isBlank(password)) {
String oldConnectionParams = dataSource.getConnectionParams();
ObjectNode oldParams = JSONUtils.parseObject(oldConnectionParams);
paramObject.put(Constants.PASSWORD, oldParams.path(Constants.PASSWORD).asText());
}
// connectionParams json
String connectionParams = paramObject.toString();
Result<Object> isConnection = checkConnection(type, parameter);
if (Status.SUCCESS.getCode() != isConnection.getCode()) {
return result;
}
Date now = new Date();
dataSource.setName(name.trim());
dataSource.setNote(desc);
dataSource.setUserName(loginUser.getUserName());
dataSource.setType(type);
dataSource.setConnectionParams(connectionParams);
dataSource.setUpdateTime(now);
dataSourceMapper.updateById(dataSource);
putMsg(result, Status.SUCCESS);
return result;
}
private boolean checkName(String name) {
List<DataSource> queryDataSource = dataSourceMapper.queryDataSourceByName(name.trim());
return queryDataSource != null && !queryDataSource.isEmpty();
}
/**
* updateProcessInstance datasource
*
* @param id datasource id
* @return data source detail
*/
public Map<String, Object> queryDataSource(int id) {
Map<String, Object> result = new HashMap<>();
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
// type
String dataSourceType = dataSource.getType().toString();
// name
String dataSourceName = dataSource.getName();
// desc
String desc = dataSource.getNote();
// parameter
String parameter = dataSource.getConnectionParams();
BaseDataSource datasourceForm = DataSourceFactory.getDatasource(dataSource.getType(), parameter);
DbConnectType connectType = null;
String hostSeperator = Constants.DOUBLE_SLASH;
if (DbType.ORACLE.equals(dataSource.getType())) {
connectType = ((OracleDataSource) datasourceForm).getConnectType();
if (DbConnectType.ORACLE_SID.equals(connectType)) {
hostSeperator = Constants.AT_SIGN;
}
}
String database = datasourceForm.getDatabase();
// jdbc connection params
String other = datasourceForm.getOther();
String address = datasourceForm.getAddress();
String[] hostsPorts = getHostsAndPort(address, hostSeperator);
// ip host
String host = hostsPorts[0];
// prot
String port = hostsPorts[1];
String separator = "";
switch (dataSource.getType()) {
case HIVE:
case SQLSERVER:
separator = ";";
break;
case MYSQL:
case POSTGRESQL:
case CLICKHOUSE:
case ORACLE:
case PRESTO:
separator = "&";
break;
default:
separator = "&";
break;
}
Map<String, String> otherMap = new LinkedHashMap<>();
if (other != null) {
String[] configs = other.split(separator);
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
}
Map<String, Object> map = new HashMap<>();
map.put(NAME, dataSourceName);
map.put(NOTE, desc);
map.put(TYPE, dataSourceType);
if (connectType != null) {
map.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
}
map.put(HOST, host);
map.put(PORT, port);
map.put(PRINCIPAL, datasourceForm.getPrincipal());
map.put(Constants.KERBEROS_KRB5_CONF_PATH, datasourceForm.getJavaSecurityKrb5Conf());
map.put(Constants.KERBEROS_KEY_TAB_USERNAME, datasourceForm.getLoginUserKeytabUsername());
map.put(Constants.KERBEROS_KEY_TAB_PATH, datasourceForm.getLoginUserKeytabPath());
map.put(DATABASE, database);
map.put(USER_NAME, datasourceForm.getUser());
map.put(OTHER, otherMap);
result.put(Constants.DATA_LIST, map);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query datasource list by keyword
*
* @param loginUser login user
* @param searchVal search value
* @param pageNo page number
* @param pageSize page size
* @return data source list page
*/
public Map<String, Object> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
IPage<DataSource> dataSourceList;
Page<DataSource> dataSourcePage = new Page<>(pageNo, pageSize);
if (isAdmin(loginUser)) {
dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 0, searchVal);
} else {
dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, loginUser.getId(), searchVal);
}
List<DataSource> dataSources = dataSourceList != null ? dataSourceList.getRecords() : new ArrayList<>();
handlePasswd(dataSources);
PageInfo<DataSource> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount((int) (dataSourceList != null ? dataSourceList.getTotal() : 0L));
pageInfo.setLists(dataSources);
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* handle datasource connection password for safety
*
* @param dataSourceList
*/
private void handlePasswd(List<DataSource> dataSourceList) {
for (DataSource dataSource : dataSourceList) {
String connectionParams = dataSource.getConnectionParams();
ObjectNode object = JSONUtils.parseObject(connectionParams);
object.put(Constants.PASSWORD, getHiddenPassword());
dataSource.setConnectionParams(object.toString());
}
}
/**
* get hidden password (resolve the security hotspot)
*
* @return hidden password
*/
private String getHiddenPassword() {
return Constants.XXXXXX;
}
/**
* query data resource list
*
* @param loginUser login user
* @param type data source type
* @return data source list page
*/
public Map<String, Object> queryDataSourceList(User loginUser, Integer type) {
Map<String, Object> result = new HashMap<>();
List<DataSource> datasourceList;
if (isAdmin(loginUser)) {
datasourceList = dataSourceMapper.listAllDataSourceByType(type);
} else {
datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type);
}
result.put(Constants.DATA_LIST, datasourceList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* verify datasource exists
*
* @param name datasource name
* @return true if data datasource not exists, otherwise return false
*/
public Result<Object> verifyDataSourceName(String name) {
Result<Object> result = new Result<>();
List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name);
if (dataSourceList != null && !dataSourceList.isEmpty()) {
putMsg(result, Status.DATASOURCE_EXIST);
} else {
putMsg(result, Status.SUCCESS);
}
return result;
}
/**
* check connection
*
* @param type data source type
* @param parameter data source parameters
* @return true if connect successfully, otherwise false
*/
public Result<Object> checkConnection(DbType type, String parameter) {
Result<Object> result = new Result<>();
BaseDataSource datasource = DataSourceFactory.getDatasource(type, parameter);
if (datasource == null) {
putMsg(result, Status.DATASOURCE_TYPE_NOT_EXIST, type);
return result;
}
try (Connection connection = datasource.getConnection()) {
if (connection == null) {
putMsg(result, Status.CONNECTION_TEST_FAILURE);
return result;
}
putMsg(result, Status.SUCCESS);
return result;
} catch (Exception e) {
logger.error("datasource test connection error, dbType:{}, jdbcUrl:{}, message:{}.", type, datasource.getJdbcUrl(), e.getMessage());
return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(),e.getMessage());
}
}
/**
* test connection
*
* @param id datasource id
* @return connect result code
*/
public Result<Object> connectionTest(int id) {
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
Result<Object> result = new Result<>();
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
return checkConnection(dataSource.getType(), dataSource.getConnectionParams());
}
/**
* build paramters
*
* @param type data source type
* @param host data source host
* @param port data source port
* @param database data source database name
* @param userName user name
* @param password password
* @param other other parameters
* @param principal principal
* @return datasource parameter
*/
public String buildParameter(DbType type, String host,
String port, String database, String principal, String userName,
String password, DbConnectType connectType, String other,
String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) {
String address = buildAddress(type, host, port, connectType);
Map<String, Object> parameterMap = new LinkedHashMap<>();
String jdbcUrl;
if (DbType.SQLSERVER == type) {
jdbcUrl = address + ";databaseName=" + database;
} else {
jdbcUrl = address + "/" + database;
}
if (Constants.ORACLE.equals(type.name())) {
parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
}
if (CommonUtils.getKerberosStartupState()
&& (type == DbType.HIVE || type == DbType.SPARK)) {
jdbcUrl += ";principal=" + principal;
}
String separator = "";
if (Constants.MYSQL.equals(type.name())
|| Constants.POSTGRESQL.equals(type.name())
|| Constants.CLICKHOUSE.equals(type.name())
|| Constants.ORACLE.equals(type.name())
|| Constants.PRESTO.equals(type.name())) {
separator = "&";
} else if (Constants.HIVE.equals(type.name())
|| Constants.SPARK.equals(type.name())
|| Constants.DB2.equals(type.name())
|| Constants.SQLSERVER.equals(type.name())) {
separator = ";";
}
parameterMap.put(TYPE, connectType);
parameterMap.put(Constants.ADDRESS, address);
parameterMap.put(Constants.DATABASE, database);
parameterMap.put(Constants.JDBC_URL, jdbcUrl);
parameterMap.put(Constants.USER, userName);
parameterMap.put(Constants.PASSWORD, CommonUtils.encodePassword(password));
if (CommonUtils.getKerberosStartupState()
&& (type == DbType.HIVE || type == DbType.SPARK)) {
parameterMap.put(Constants.PRINCIPAL, principal);
parameterMap.put(Constants.KERBEROS_KRB5_CONF_PATH, javaSecurityKrb5Conf);
parameterMap.put(Constants.KERBEROS_KEY_TAB_USERNAME, loginUserKeytabUsername);
parameterMap.put(Constants.KERBEROS_KEY_TAB_PATH, loginUserKeytabPath);
}
Map<String, String> map = JSONUtils.toMap(other);
if (map != null) {
StringBuilder otherSb = new StringBuilder();
for (Map.Entry<String, String> entry: map.entrySet()) {
otherSb.append(String.format("%s=%s%s", entry.getKey(), entry.getValue(), separator));
}
if (!Constants.DB2.equals(type.name())) {
otherSb.deleteCharAt(otherSb.length() - 1);
}
parameterMap.put(Constants.OTHER, otherSb);
}
if (logger.isDebugEnabled()) {
logger.info("parameters map:{}", JSONUtils.toJsonString(parameterMap));
}
return JSONUtils.toJsonString(parameterMap);
}
private String buildAddress(DbType type, String host, String port, DbConnectType connectType) {
StringBuilder sb = new StringBuilder();
if (Constants.MYSQL.equals(type.name())) {
sb.append(Constants.JDBC_MYSQL);
sb.append(host).append(":").append(port);
} else if (Constants.POSTGRESQL.equals(type.name())) {
sb.append(Constants.JDBC_POSTGRESQL);
sb.append(host).append(":").append(port);
} else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) {
sb.append(Constants.JDBC_HIVE_2);
String[] hostArray = host.split(",");
if (hostArray.length > 0) {
for (String zkHost : hostArray) {
sb.append(String.format("%s:%s,", zkHost, port));
}
sb.deleteCharAt(sb.length() - 1);
}
} else if (Constants.CLICKHOUSE.equals(type.name())) {
sb.append(Constants.JDBC_CLICKHOUSE);
sb.append(host).append(":").append(port);
} else if (Constants.ORACLE.equals(type.name())) {
if (connectType == DbConnectType.ORACLE_SID) {
sb.append(Constants.JDBC_ORACLE_SID);
} else {
sb.append(Constants.JDBC_ORACLE_SERVICE_NAME);
}
sb.append(host).append(":").append(port);
} else if (Constants.SQLSERVER.equals(type.name())) {
sb.append(Constants.JDBC_SQLSERVER);
sb.append(host).append(":").append(port);
} else if (Constants.DB2.equals(type.name())) {
sb.append(Constants.JDBC_DB2);
sb.append(host).append(":").append(port);
} else if (Constants.PRESTO.equals(type.name())) {
sb.append(Constants.JDBC_PRESTO);
sb.append(host).append(":").append(port);
}
return sb.toString();
}
/**
* delete datasource
*
* @param loginUser login user
* @param datasourceId data source id
* @return delete result code
*/
@Transactional(rollbackFor = RuntimeException.class)
public Result<Object> delete(User loginUser, int datasourceId) {
Result<Object> result = new Result<>();
try {
//query datasource by id
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
if (dataSource == null) {
logger.error("resource id {} not exist", datasourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
if (!hasPerm(loginUser, dataSource.getUserId())) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
dataSourceMapper.deleteById(datasourceId);
datasourceUserMapper.deleteByDatasourceId(datasourceId);
putMsg(result, Status.SUCCESS);
} catch (Exception e) {
logger.error("delete datasource error", e);
throw new RuntimeException("delete datasource error");
}
return result;
}
/**
* unauthorized datasource
*
* @param loginUser login user
* @param userId user id
* @return unauthed data source result code
*/
public Map<String, Object> unauthDatasource(User loginUser, Integer userId) {
Map<String, Object> result = new HashMap<>();
//only admin operate
if (!isAdmin(loginUser)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
/**
* query all data sources except userId
*/
List<DataSource> resultList = new ArrayList<>();
List<DataSource> datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId);
Set<DataSource> datasourceSet = null;
if (datasourceList != null && !datasourceList.isEmpty()) {
datasourceSet = new HashSet<>(datasourceList);
List<DataSource> authedDataSourceList = dataSourceMapper.queryAuthedDatasource(userId);
Set<DataSource> authedDataSourceSet = null;
if (authedDataSourceList != null && !authedDataSourceList.isEmpty()) {
authedDataSourceSet = new HashSet<>(authedDataSourceList);
datasourceSet.removeAll(authedDataSourceSet);
}
resultList = new ArrayList<>(datasourceSet);
}
result.put(Constants.DATA_LIST, resultList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* authorized datasource
*
* @param loginUser login user
* @param userId user id
* @return authorized result code
*/
public Map<String, Object> authedDatasource(User loginUser, Integer userId) {
Map<String, Object> result = new HashMap<>();
if (!isAdmin(loginUser)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
List<DataSource> authedDatasourceList = dataSourceMapper.queryAuthedDatasource(userId);
result.put(Constants.DATA_LIST, authedDatasourceList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* get host and port by address
*
* @param address address
* @param separator separator
* @return sting array: [host,port]
*/
private String[] getHostsAndPort(String address, String separator) {
String[] result = new String[2];
String[] tmpArray = address.split(separator);
String hostsAndPorts = tmpArray[tmpArray.length - 1];
StringBuilder hosts = new StringBuilder();
String[] hostPortArray = hostsAndPorts.split(Constants.COMMA);
String port = hostPortArray[0].split(Constants.COLON)[1];
for (String hostPort : hostPortArray) {
hosts.append(hostPort.split(Constants.COLON)[0]).append(Constants.COMMA);
}
hosts.deleteCharAt(hosts.length() - 1);
result[0] = hosts.toString();
result[1] = port;
return result;
}
}

579
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -0,0 +1,579 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* executor service impl
*/
@Service
public class ExecutorServiceImpl extends BaseService implements ExecutorService {
private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceImpl.class);
@Autowired
private ProjectMapper projectMapper;
@Autowired
private ProjectService projectService;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private MonitorService monitorService;
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Autowired
private ProcessService processService;
/**
* execute process instance
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionId process Definition Id
* @param cronTime cron time
* @param commandType command type
* @param failureStrategy failuer strategy
* @param startNodeList start nodelist
* @param taskDependType node dependency type
* @param warningType warning type
* @param warningGroupId notify group id
* @param processInstancePriority process instance priority
* @param workerGroup worker group name
* @param runMode run mode
* @param timeout timeout
* @param startParams the global param values which pass to new process instance
* @return execute process instance code
*/
public Map<String, Object> execProcessInstance(User loginUser, String projectName,
int processDefinitionId, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Integer timeout,
Map<String, String> startParams) {
Map<String, Object> result = new HashMap<>();
// timeout is invalid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
return result;
}
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResultAndAuth = checkResultAndAuth(loginUser, projectName, project);
if (checkResultAndAuth != null) {
return checkResultAndAuth;
}
// check process define release state
ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId);
result = checkProcessDefinitionValid(processDefinition, processDefinitionId);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
}
// check master exists
if (!checkMasterExists(result)) {
return result;
}
/**
* create command
*/
int create = this.createCommand(commandType, processDefinitionId,
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
}
return result;
}
/**
* check whether master exists
*
* @param result result
* @return master exists return true , otherwise return false
*/
private boolean checkMasterExists(Map<String, Object> result) {
// check master server exists
List<Server> masterServers = monitorService.getServerListFromZK(true);
// no master
if (masterServers.isEmpty()) {
putMsg(result, Status.MASTER_NOT_EXISTS);
return false;
}
return true;
}
/**
* check whether the process definition can be executed
*
* @param processDefinition process definition
* @param processDefineId process definition id
* @return check result code
*/
public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId) {
Map<String, Object> result = new HashMap<>();
if (processDefinition == null) {
// check process definition exists
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
// check process definition online
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId);
} else {
result.put(Constants.STATUS, Status.SUCCESS);
}
return result;
}
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop
*
* @param loginUser login user
* @param projectName project name
* @param processInstanceId process instance id
* @param executeType execute type
* @return execute result code
*/
public Map<String, Object> execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = checkResultAndAuth(loginUser, projectName, project);
if (checkResult != null) {
return checkResult;
}
// check master exists
if (!checkMasterExists(result)) {
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
}
checkResult = checkExecuteType(processInstance, executeType);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
switch (executeType) {
case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING);
break;
case RECOVER_SUSPENDED_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS);
break;
case START_FAILURE_TASK_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS);
break;
case STOP:
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
}
break;
case PAUSE:
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
}
break;
default:
logger.error("unknown execute type : {}", executeType);
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
break;
}
return result;
}
/**
* check tenant suitable
*
* @param processDefinition process definition
* @return true if tenant suitable, otherwise return false
*/
private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(),
processDefinition.getUserId());
return tenant != null;
}
/**
* Check the state of process instance and the type of operation match
*
* @param processInstance process instance
* @param executeType execute type
* @return check result code
*/
private Map<String, Object> checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) {
Map<String, Object> result = new HashMap<>();
ExecutionStatus executionStatus = processInstance.getState();
boolean checkResult = false;
switch (executeType) {
case PAUSE:
case STOP:
if (executionStatus.typeIsRunning()) {
checkResult = true;
}
break;
case REPEAT_RUNNING:
if (executionStatus.typeIsFinished()) {
checkResult = true;
}
break;
case START_FAILURE_TASK_PROCESS:
if (executionStatus.typeIsFailure()) {
checkResult = true;
}
break;
case RECOVER_SUSPENDED_PROCESS:
if (executionStatus.typeIsPause() || executionStatus.typeIsCancel()) {
checkResult = true;
}
break;
default:
break;
}
if (!checkResult) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString());
} else {
putMsg(result, Status.SUCCESS);
}
return result;
}
/**
* prepare to update process instance command type and status
*
* @param processInstance process instance
* @param commandType command type
* @param executionStatus execute status
* @return update result
*/
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
Map<String, Object> result = new HashMap<>();
processInstance.setCommandType(commandType);
processInstance.addHistoryCmd(commandType);
processInstance.setState(executionStatus);
int update = processService.updateProcessInstance(processInstance);
// determine whether the process is normal
if (update > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
}
/**
* insert command, used in the implementation of the page, re run, recovery (pause / failure) execution
*
* @param loginUser login user
* @param instanceId instance id
* @param processDefinitionId process definition id
* @param commandType command type
* @return insert result code
*/
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType) {
Map<String, Object> result = new HashMap<>();
Command command = new Command();
command.setCommandType(commandType);
command.setProcessDefinitionId(processDefinitionId);
command.setCommandParam(String.format("{\"%s\":%d}",
CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId));
command.setExecutorId(loginUser.getId());
if (!processService.verifyIsNeedCreateCommand(command)) {
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId);
return result;
}
int create = processService.createCommand(command);
if (create > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
}
/**
* check if sub processes are offline before starting process definition
*
* @param processDefineId process definition id
* @return check result code
*/
public Map<String, Object> startCheckByProcessDefinedId(int processDefineId) {
Map<String, Object> result = new HashMap<>();
if (processDefineId == 0) {
logger.error("process definition id is null");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process definition id");
}
List<Integer> ids = new ArrayList<>();
processService.recurseFindSubProcessId(processDefineId, ids);
Integer[] idArray = ids.toArray(new Integer[ids.size()]);
if (!ids.isEmpty()) {
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray);
if (processDefinitionList != null) {
for (ProcessDefinition processDefinition : processDefinitionList) {
/**
* if there is no online process, exit directly
*/
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
logger.info("not release process definition id: {} , name : {}",
processDefinition.getId(), processDefinition.getName());
return result;
}
}
}
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* create command
*
* @param commandType commandType
* @param processDefineId processDefineId
* @param nodeDep nodeDep
* @param failureStrategy failureStrategy
* @param startNodeList startNodeList
* @param schedule schedule
* @param warningType warningType
* @param executorId executorId
* @param warningGroupId warningGroupId
* @param runMode runMode
* @param processInstancePriority processInstancePriority
* @param workerGroup workerGroup
* @return command id
*/
private int createCommand(CommandType commandType, int processDefineId,
TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType,
int executorId, int warningGroupId,
RunMode runMode, Priority processInstancePriority, String workerGroup,
Map<String, String> startParams) {
/**
* instantiate command schedule instance
*/
Command command = new Command();
Map<String, String> cmdParam = new HashMap<>();
if (commandType == null) {
command.setCommandType(CommandType.START_PROCESS);
} else {
command.setCommandType(commandType);
}
command.setProcessDefinitionId(processDefineId);
if (nodeDep != null) {
command.setTaskDependType(nodeDep);
}
if (failureStrategy != null) {
command.setFailureStrategy(failureStrategy);
}
if (StringUtils.isNotEmpty(startNodeList)) {
cmdParam.put(CMD_PARAM_START_NODE_NAMES, startNodeList);
}
if (warningType != null) {
command.setWarningType(warningType);
}
if (startParams != null && startParams.size() > 0) {
cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams));
}
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setExecutorId(executorId);
command.setWarningGroupId(warningGroupId);
command.setProcessInstancePriority(processInstancePriority);
command.setWorkerGroup(workerGroup);
Date start = null;
Date end = null;
if (StringUtils.isNotEmpty(schedule)) {
String[] interval = schedule.split(",");
if (interval.length == 2) {
start = DateUtils.getScheduleDate(interval[0]);
end = DateUtils.getScheduleDate(interval[1]);
}
}
// determine whether to complement
if (commandType == CommandType.COMPLEMENT_DATA) {
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
if (null != start && null != end && !start.after(end)) {
if (runMode == RunMode.RUN_MODE_SERIAL) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
} else if (runMode == RunMode.RUN_MODE_PARALLEL) {
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
List<Date> listDate = new LinkedList<>();
if (!CollectionUtils.isEmpty(schedules)) {
for (Schedule item : schedules) {
listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
}
}
if (!CollectionUtils.isEmpty(listDate)) {
// loop by schedule date
for (Date date : listDate) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
}
return listDate.size();
} else {
// loop by day
int runCunt = 0;
while (!start.after(end)) {
runCunt += 1;
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
start = DateUtils.getSomeDay(start, 1);
}
return runCunt;
}
}
} else {
logger.error("there is not valid schedule date for the process definition: id:{}", processDefineId);
}
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
}
return 0;
}
/**
* check result and auth
*/
private Map<String, Object> checkResultAndAuth(User loginUser, String projectName, Project project) {
// check project auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
return null;
}
}

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.DataSourceServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbConnectType;
@ -57,7 +58,7 @@ public class DataSourceServiceTest {
@InjectMocks
private DataSourceService dataSourceService;
private DataSourceServiceImpl dataSourceService;
@Mock
private DataSourceMapper dataSourceMapper;
@Mock

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@ -22,6 +22,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -65,7 +66,7 @@ import org.mockito.junit.MockitoJUnitRunner;
public class ExecutorService2Test {
@InjectMocks
private ExecutorService executorService;
private ExecutorServiceImpl executorService;
@Mock
private ProcessService processService;

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.junit.Assert;
import org.junit.Ignore;
@ -39,7 +40,7 @@ public class ExecutorServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class);
@Autowired
private ExecutorService executorService;
private ExecutorServiceImpl executorService;
@Ignore
@Test

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -260,8 +260,8 @@ public class ProcessService {
* @param command command
* @return create command result
*/
public Boolean verifyIsNeedCreateCommand(Command command) {
Boolean isNeedCreate = true;
public boolean verifyIsNeedCreateCommand(Command command) {
boolean isNeedCreate = true;
EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);

Loading…
Cancel
Save