diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index c1e75a73f5..abca855ee1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.User; -import java.text.ParseException; import java.util.Map; import org.slf4j.Logger; @@ -121,7 +120,7 @@ public class ExecutorController extends BaseController { @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, @RequestParam(value = "timeout", required = false) Integer timeout, - @RequestParam(value = "startParams", required = false) String startParams) throws ParseException { + @RequestParam(value = "startParams", required = false) String startParams) { logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " + "failure policy: {}, node name: {}, node dep: {}, notify type: {}, " + "notify group id: {}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}, startParams: {} ", diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java index 52fd023c35..c631c8e3e4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java @@ -322,9 +322,7 @@ public class ResourcesController extends BaseController { @RequestParam(value = "programType",required = false) ProgramType programType ) { String programTypeName = programType == null ? 
"" : programType.name(); - String userName = loginUser.getUserName(); - userName = userName.replaceAll("[\n|\r|\t]", "_"); - logger.info("query resource list, login user:{}, resource type:{}, program type:{}", userName,programTypeName); + logger.info("query resource list, resource type:{}, program type:{}", type, programTypeName); Map result = resourceService.queryResourceByProgramType(loginUser, type,programType); return returnDataList(result); } @@ -641,9 +639,7 @@ public class ResourcesController extends BaseController { @ApiException(QUERY_DATASOURCE_BY_TYPE_ERROR) public Result queryUdfFuncList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("type") UdfType type) { - String userName = loginUser.getUserName(); - userName = userName.replaceAll("[\n|\r|\t]", "_"); - logger.info("query udf func list, user:{}, type:{}", userName, type); + logger.info("query udf func list, type:{}", type); Map result = udfFuncService.queryUdfFuncList(loginUser, type.ordinal()); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java index 2ca9cbea6a..8d2a023064 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java @@ -17,67 +17,17 @@ package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbConnectType; import org.apache.dolphinscheduler.common.enums.DbType; -import org.apache.dolphinscheduler.common.utils.CommonUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; -import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; -import org.apache.dolphinscheduler.dao.datasource.OracleDataSource; -import org.apache.dolphinscheduler.dao.entity.DataSource; -import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; -import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; -import java.sql.Connection; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.fasterxml.jackson.databind.node.ObjectNode; /** * datasource service */ -@Service -public class DataSourceService extends BaseService { - - private static final Logger logger = LoggerFactory.getLogger(DataSourceService.class); - - public static final String NAME = "name"; - public static final String NOTE = "note"; - public static final 
String TYPE = "type"; - public static final String HOST = "host"; - public static final String PORT = "port"; - public static final String PRINCIPAL = "principal"; - public static final String DATABASE = "database"; - public static final String USER_NAME = "userName"; - public static final String OTHER = "other"; - - @Autowired - private DataSourceMapper dataSourceMapper; - - @Autowired - private DataSourceUserMapper datasourceUserMapper; +public interface DataSourceService { /** * create data source @@ -89,37 +39,7 @@ public class DataSourceService extends BaseService { * @param parameter datasource parameters * @return create result code */ - public Result createDataSource(User loginUser, String name, String desc, DbType type, String parameter) { - - Result result = new Result<>(); - // check name can use or not - if (checkName(name)) { - putMsg(result, Status.DATASOURCE_EXIST); - return result; - } - Result isConnection = checkConnection(type, parameter); - if (Status.SUCCESS.getCode() != isConnection.getCode()) { - return result; - } - - // build datasource - DataSource dataSource = new DataSource(); - Date now = new Date(); - - dataSource.setName(name.trim()); - dataSource.setNote(desc); - dataSource.setUserId(loginUser.getId()); - dataSource.setUserName(loginUser.getUserName()); - dataSource.setType(type); - dataSource.setConnectionParams(parameter); - dataSource.setCreateTime(now); - dataSource.setUpdateTime(now); - dataSourceMapper.insert(dataSource); - - putMsg(result, Status.SUCCESS); - - return result; - } + Result createDataSource(User loginUser, String name, String desc, DbType type, String parameter); /** * updateProcessInstance datasource @@ -132,59 +52,7 @@ public class DataSourceService extends BaseService { * @param id data source id * @return update result code */ - public Result updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter) { - - Result result = new Result<>(); - // determine whether the data source exists - DataSource dataSource = dataSourceMapper.selectById(id); - if (dataSource == null) { - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - - if (!hasPerm(loginUser, dataSource.getUserId())) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - - //check name can use or not - if (!name.trim().equals(dataSource.getName()) && checkName(name)) { - putMsg(result, Status.DATASOURCE_EXIST); - return result; - } - //check password,if the password is not updated, set to the old password. 
- ObjectNode paramObject = JSONUtils.parseObject(parameter); - String password = paramObject.path(Constants.PASSWORD).asText(); - if (StringUtils.isBlank(password)) { - String oldConnectionParams = dataSource.getConnectionParams(); - ObjectNode oldParams = JSONUtils.parseObject(oldConnectionParams); - paramObject.put(Constants.PASSWORD, oldParams.path(Constants.PASSWORD).asText()); - } - // connectionParams json - String connectionParams = paramObject.toString(); - - Result isConnection = checkConnection(type, parameter); - if (Status.SUCCESS.getCode() != isConnection.getCode()) { - return result; - } - - Date now = new Date(); - - dataSource.setName(name.trim()); - dataSource.setNote(desc); - dataSource.setUserName(loginUser.getUserName()); - dataSource.setType(type); - dataSource.setConnectionParams(connectionParams); - dataSource.setUpdateTime(now); - dataSourceMapper.updateById(dataSource); - putMsg(result, Status.SUCCESS); - return result; - } - - private boolean checkName(String name) { - List queryDataSource = dataSourceMapper.queryDataSourceByName(name.trim()); - return queryDataSource != null && queryDataSource.size() > 0; - } + Result updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter); /** * updateProcessInstance datasource @@ -192,91 +60,7 @@ public class DataSourceService extends BaseService { * @param id datasource id * @return data source detail */ - public Map queryDataSource(int id) { - - Map result = new HashMap(5); - DataSource dataSource = dataSourceMapper.selectById(id); - if (dataSource == null) { - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - // type - String dataSourceType = dataSource.getType().toString(); - // name - String dataSourceName = dataSource.getName(); - // desc - String desc = dataSource.getNote(); - // parameter - String parameter = dataSource.getConnectionParams(); - - BaseDataSource datasourceForm = DataSourceFactory.getDatasource(dataSource.getType(), parameter); - DbConnectType connectType = null; - String hostSeperator = Constants.DOUBLE_SLASH; - if (DbType.ORACLE.equals(dataSource.getType())) { - connectType = ((OracleDataSource) datasourceForm).getConnectType(); - if (DbConnectType.ORACLE_SID.equals(connectType)) { - hostSeperator = Constants.AT_SIGN; - } - } - String database = datasourceForm.getDatabase(); - // jdbc connection params - String other = datasourceForm.getOther(); - String address = datasourceForm.getAddress(); - - String[] hostsPorts = getHostsAndPort(address, hostSeperator); - // ip host - String host = hostsPorts[0]; - // prot - String port = hostsPorts[1]; - String separator = ""; - - switch (dataSource.getType()) { - case HIVE: - case SQLSERVER: - separator = ";"; - break; - case MYSQL: - case POSTGRESQL: - case CLICKHOUSE: - case ORACLE: - case PRESTO: - separator = "&"; - break; - default: - separator = "&"; - break; - } - - Map otherMap = new LinkedHashMap(); - if (other != null) { - String[] configs = other.split(separator); - for (String config : configs) { - otherMap.put(config.split("=")[0], config.split("=")[1]); - } - - } - - Map map = new HashMap<>(10); - map.put(NAME, dataSourceName); - map.put(NOTE, desc); - map.put(TYPE, dataSourceType); - if (connectType != null) { - map.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType); - } - - map.put(HOST, host); - map.put(PORT, port); - map.put(PRINCIPAL, datasourceForm.getPrincipal()); - map.put(Constants.KERBEROS_KRB5_CONF_PATH, datasourceForm.getJavaSecurityKrb5Conf()); - 
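queryDataSource also has to turn the stored "other" string (extra JDBC properties) back into a map, splitting on ";" for HIVE and SQLSERVER and on "&" for the remaining types. A small sketch of that split, assuming well-formed key=value pairs (parseOther is an illustrative name, not a method in this patch):

import java.util.LinkedHashMap;
import java.util.Map;

public class OtherParamsSketch {

    /** Split "k1=v1&k2=v2" (or a ";"-separated string for HIVE/SQLSERVER) into an ordered map. */
    static Map<String, String> parseOther(String other, String separator) {
        Map<String, String> result = new LinkedHashMap<>();
        if (other == null || other.isEmpty()) {
            return result;
        }
        for (String pair : other.split(separator)) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                result.put(kv[0], kv[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {useSSL=false, characterEncoding=utf8}
        System.out.println(parseOther("useSSL=false&characterEncoding=utf8", "&"));
    }
}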
map.put(Constants.KERBEROS_KEY_TAB_USERNAME, datasourceForm.getLoginUserKeytabUsername()); - map.put(Constants.KERBEROS_KEY_TAB_PATH, datasourceForm.getLoginUserKeytabPath()); - map.put(DATABASE, database); - map.put(USER_NAME, datasourceForm.getUser()); - map.put(OTHER, otherMap); - result.put(Constants.DATA_LIST, map); - putMsg(result, Status.SUCCESS); - return result; - } + Map queryDataSource(int id); /** * query datasource list by keyword @@ -287,44 +71,7 @@ public class DataSourceService extends BaseService { * @param pageSize page size * @return data source list page */ - public Map queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { - Map result = new HashMap<>(); - IPage dataSourceList = null; - Page dataSourcePage = new Page(pageNo, pageSize); - - if (isAdmin(loginUser)) { - dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 0, searchVal); - } else { - dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, loginUser.getId(), searchVal); - } - - List dataSources = dataSourceList != null ? dataSourceList.getRecords() : new ArrayList<>(); - handlePasswd(dataSources); - PageInfo pageInfo = new PageInfo(pageNo, pageSize); - pageInfo.setTotalCount((int) (dataSourceList != null ? dataSourceList.getTotal() : 0L)); - pageInfo.setLists(dataSources); - result.put(Constants.DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - - return result; - } - - /** - * handle datasource connection password for safety - * - * @param dataSourceList - */ - private void handlePasswd(List dataSourceList) { - - for (DataSource dataSource : dataSourceList) { - - String connectionParams = dataSource.getConnectionParams(); - ObjectNode object = JSONUtils.parseObject(connectionParams); - object.put(Constants.PASSWORD, Constants.XXXXXX); - dataSource.setConnectionParams(object.toString()); - - } - } + Map queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize); /** * query data resource list @@ -333,22 +80,7 @@ public class DataSourceService extends BaseService { * @param type data source type * @return data source list page */ - public Map queryDataSourceList(User loginUser, Integer type) { - Map result = new HashMap<>(); - - List datasourceList; - - if (isAdmin(loginUser)) { - datasourceList = dataSourceMapper.listAllDataSourceByType(type); - } else { - datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type); - } - - result.put(Constants.DATA_LIST, datasourceList); - putMsg(result, Status.SUCCESS); - - return result; - } + Map queryDataSourceList(User loginUser, Integer type); /** * verify datasource exists @@ -356,18 +88,7 @@ public class DataSourceService extends BaseService { * @param name datasource name * @return true if data datasource not exists, otherwise return false */ - public Result verifyDataSourceName(String name) { - Result result = new Result<>(); - List dataSourceList = dataSourceMapper.queryDataSourceByName(name); - if (dataSourceList != null && dataSourceList.size() > 0) { - logger.error("datasource name:{} has exist, can't create again.", name); - putMsg(result, Status.DATASOURCE_EXIST); - } else { - putMsg(result, Status.SUCCESS); - } - - return result; - } + Result verifyDataSourceName(String name); /** * check connection @@ -376,25 +97,7 @@ public class DataSourceService extends BaseService { * @param parameter data source parameters * @return true if connect successfully, otherwise false */ - public Result checkConnection(DbType type, String parameter) { - 
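checkConnection, reproduced unchanged in the new implementation class, boils down to resolving a datasource and opening a JDBC connection inside try-with-resources. Stripped of DataSourceFactory and BaseDataSource, the underlying pattern is roughly this generic sketch (DriverManager and the sample URL are stand-ins, not what the service actually calls):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ConnectionTestSketch {

    /** Return true when a connection can be opened; never propagates the exception. */
    static boolean canConnect(String jdbcUrl, String user, String password) {
        try (Connection connection = DriverManager.getConnection(jdbcUrl, user, password)) {
            // DriverManager either returns a live connection or throws
            return connection != null;
        } catch (SQLException e) {
            // mirror the service: log the failure and report it as a result, not an exception
            System.err.println("connection test failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canConnect("jdbc:postgresql://localhost:5432/dolphinscheduler", "test", "test"));
    }
}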
Result result = new Result<>(); - BaseDataSource datasource = DataSourceFactory.getDatasource(type, parameter); - if (datasource == null) { - putMsg(result, Status.DATASOURCE_TYPE_NOT_EXIST, type); - return result; - } - try (Connection connection = datasource.getConnection()) { - if (connection == null) { - putMsg(result, Status.CONNECTION_TEST_FAILURE); - return result; - } - putMsg(result, Status.SUCCESS); - return result; - } catch (Exception e) { - logger.error("datasource test connection error, dbType:{}, jdbcUrl:{}, message:{}.", type, datasource.getJdbcUrl(), e.getMessage()); - return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(),e.getMessage()); - } - } + Result checkConnection(DbType type, String parameter); /** * test connection @@ -402,15 +105,7 @@ public class DataSourceService extends BaseService { * @param id datasource id * @return connect result code */ - public Result connectionTest(int id) { - DataSource dataSource = dataSourceMapper.selectById(id); - if (dataSource == null) { - Result result = new Result<>(); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - return checkConnection(dataSource.getType(), dataSource.getConnectionParams()); - } + Result connectionTest(int id); /** * build paramters @@ -425,116 +120,10 @@ public class DataSourceService extends BaseService { * @param principal principal * @return datasource parameter */ - public String buildParameter(DbType type, String host, - String port, String database, String principal, String userName, - String password, DbConnectType connectType, String other, - String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) { - - String address = buildAddress(type, host, port, connectType); - Map parameterMap = new LinkedHashMap(6); - String jdbcUrl; - if (DbType.SQLSERVER == type) { - jdbcUrl = address + ";databaseName=" + database; - } else { - jdbcUrl = address + "/" + database; - } - - if (Constants.ORACLE.equals(type.name())) { - parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType); - } - - if (CommonUtils.getKerberosStartupState() - && (type == DbType.HIVE || type == DbType.SPARK)) { - jdbcUrl += ";principal=" + principal; - } - - String separator = ""; - if (Constants.MYSQL.equals(type.name()) - || Constants.POSTGRESQL.equals(type.name()) - || Constants.CLICKHOUSE.equals(type.name()) - || Constants.ORACLE.equals(type.name()) - || Constants.PRESTO.equals(type.name())) { - separator = "&"; - } else if (Constants.HIVE.equals(type.name()) - || Constants.SPARK.equals(type.name()) - || Constants.DB2.equals(type.name()) - || Constants.SQLSERVER.equals(type.name())) { - separator = ";"; - } - - parameterMap.put(TYPE, connectType); - parameterMap.put(Constants.ADDRESS, address); - parameterMap.put(Constants.DATABASE, database); - parameterMap.put(Constants.JDBC_URL, jdbcUrl); - parameterMap.put(Constants.USER, userName); - parameterMap.put(Constants.PASSWORD, CommonUtils.encodePassword(password)); - if (CommonUtils.getKerberosStartupState() - && (type == DbType.HIVE || type == DbType.SPARK)) { - parameterMap.put(Constants.PRINCIPAL, principal); - parameterMap.put(Constants.KERBEROS_KRB5_CONF_PATH, javaSecurityKrb5Conf); - parameterMap.put(Constants.KERBEROS_KEY_TAB_USERNAME, loginUserKeytabUsername); - parameterMap.put(Constants.KERBEROS_KEY_TAB_PATH, loginUserKeytabPath); - } - - Map map = JSONUtils.toMap(other); - if (map != null) { - StringBuilder otherSb = new StringBuilder(); - for (Map.Entry entry: map.entrySet()) { - 
otherSb.append(String.format("%s=%s%s", entry.getKey(), entry.getValue(), separator)); - } - if (!Constants.DB2.equals(type.name())) { - otherSb.deleteCharAt(otherSb.length() - 1); - } - parameterMap.put(Constants.OTHER, otherSb); - } - - if (logger.isDebugEnabled()) { - logger.info("parameters map:{}", JSONUtils.toJsonString(parameterMap)); - } - return JSONUtils.toJsonString(parameterMap); - - } - - private String buildAddress(DbType type, String host, String port, DbConnectType connectType) { - StringBuilder sb = new StringBuilder(); - if (Constants.MYSQL.equals(type.name())) { - sb.append(Constants.JDBC_MYSQL); - sb.append(host).append(":").append(port); - } else if (Constants.POSTGRESQL.equals(type.name())) { - sb.append(Constants.JDBC_POSTGRESQL); - sb.append(host).append(":").append(port); - } else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) { - sb.append(Constants.JDBC_HIVE_2); - String[] hostArray = host.split(","); - if (hostArray.length > 0) { - for (String zkHost : hostArray) { - sb.append(String.format("%s:%s,", zkHost, port)); - } - sb.deleteCharAt(sb.length() - 1); - } - } else if (Constants.CLICKHOUSE.equals(type.name())) { - sb.append(Constants.JDBC_CLICKHOUSE); - sb.append(host).append(":").append(port); - } else if (Constants.ORACLE.equals(type.name())) { - if (connectType == DbConnectType.ORACLE_SID) { - sb.append(Constants.JDBC_ORACLE_SID); - } else { - sb.append(Constants.JDBC_ORACLE_SERVICE_NAME); - } - sb.append(host).append(":").append(port); - } else if (Constants.SQLSERVER.equals(type.name())) { - sb.append(Constants.JDBC_SQLSERVER); - sb.append(host).append(":").append(port); - } else if (Constants.DB2.equals(type.name())) { - sb.append(Constants.JDBC_DB2); - sb.append(host).append(":").append(port); - } else if (Constants.PRESTO.equals(type.name())) { - sb.append(Constants.JDBC_PRESTO); - sb.append(host).append(":").append(port); - } - - return sb.toString(); - } + String buildParameter(DbType type, String host, + String port, String database, String principal, String userName, + String password, DbConnectType connectType, String other, + String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath); /** * delete datasource @@ -543,30 +132,7 @@ public class DataSourceService extends BaseService { * @param datasourceId data source id * @return delete result code */ - @Transactional(rollbackFor = RuntimeException.class) - public Result delete(User loginUser, int datasourceId) { - Result result = new Result<>(); - try { - //query datasource by id - DataSource dataSource = dataSourceMapper.selectById(datasourceId); - if (dataSource == null) { - logger.error("resource id {} not exist", datasourceId); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - if (!hasPerm(loginUser, dataSource.getUserId())) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - dataSourceMapper.deleteById(datasourceId); - datasourceUserMapper.deleteByDatasourceId(datasourceId); - putMsg(result, Status.SUCCESS); - } catch (Exception e) { - logger.error("delete datasource error", e); - throw new RuntimeException("delete datasource error"); - } - return result; - } + Result delete(User loginUser, int datasourceId); /** * unauthorized datasource @@ -575,38 +141,7 @@ public class DataSourceService extends BaseService { * @param userId user id * @return unauthed data source result code */ - public Map unauthDatasource(User loginUser, Integer userId) { - - Map result = new HashMap<>(); - //only admin 
operate - if (!isAdmin(loginUser)) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - - /** - * query all data sources except userId - */ - List resultList = new ArrayList<>(); - List datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId); - Set datasourceSet = null; - if (datasourceList != null && datasourceList.size() > 0) { - datasourceSet = new HashSet<>(datasourceList); - - List authedDataSourceList = dataSourceMapper.queryAuthedDatasource(userId); - - Set authedDataSourceSet = null; - if (authedDataSourceList != null && authedDataSourceList.size() > 0) { - authedDataSourceSet = new HashSet<>(authedDataSourceList); - datasourceSet.removeAll(authedDataSourceSet); - - } - resultList = new ArrayList<>(datasourceSet); - } - result.put(Constants.DATA_LIST, resultList); - putMsg(result, Status.SUCCESS); - return result; - } + Map unauthDatasource(User loginUser, Integer userId); /** * authorized datasource @@ -615,50 +150,5 @@ public class DataSourceService extends BaseService { * @param userId user id * @return authorized result code */ - public Map authedDatasource(User loginUser, Integer userId) { - Map result = new HashMap<>(); - - if (!isAdmin(loginUser)) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - - List authedDatasourceList = dataSourceMapper.queryAuthedDatasource(userId); - result.put(Constants.DATA_LIST, authedDatasourceList); - putMsg(result, Status.SUCCESS); - return result; - } - - /** - * get host and port by address - * - * @param address address - * @return sting array: [host,port] - */ - private String[] getHostsAndPort(String address) { - return getHostsAndPort(address, Constants.DOUBLE_SLASH); - } - - /** - * get host and port by address - * - * @param address address - * @param separator separator - * @return sting array: [host,port] - */ - private String[] getHostsAndPort(String address, String separator) { - String[] result = new String[2]; - String[] tmpArray = address.split(separator); - String hostsAndPorts = tmpArray[tmpArray.length - 1]; - StringBuilder hosts = new StringBuilder(); - String[] hostPortArray = hostsAndPorts.split(Constants.COMMA); - String port = hostPortArray[0].split(Constants.COLON)[1]; - for (String hostPort : hostPortArray) { - hosts.append(hostPort.split(Constants.COLON)[0]).append(Constants.COMMA); - } - hosts.deleteCharAt(hosts.length() - 1); - result[0] = hosts.toString(); - result[1] = port; - return result; - } + Map authedDatasource(User loginUser, Integer userId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 77be0a048c..6bed9790b3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -17,82 +17,22 @@ package org.apache.dolphinscheduler.api.service; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; -import static 
org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; - import org.apache.dolphinscheduler.api.enums.ExecuteType; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; -import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - /** * executor service */ -@Service -public class ExecutorService extends BaseService { - - private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class); - - @Autowired - private ProjectMapper projectMapper; - - @Autowired - private ProjectService projectService; - - @Autowired - private ProcessDefinitionMapper processDefinitionMapper; - - @Autowired - private MonitorService monitorService; - - - @Autowired - private ProcessInstanceMapper processInstanceMapper; - - - @Autowired - private ProcessService processService; +public interface ExecutorService { /** * execute process instance @@ -113,80 +53,14 @@ public class ExecutorService extends BaseService { * @param timeout timeout * @param startParams the global param values which pass to new process instance * @return execute process instance code - * @throws ParseException Parse Exception - */ - public Map execProcessInstance(User loginUser, String projectName, - int processDefinitionId, String cronTime, CommandType commandType, - FailureStrategy failureStrategy, String startNodeList, - TaskDependType taskDependType, WarningType warningType, int warningGroupId, - RunMode runMode, - Priority processInstancePriority, String workerGroup, Integer timeout, - Map startParams) throws ParseException { - Map result = new HashMap<>(); - // timeout is invalid - if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) { - putMsg(result, 
Status.TASK_TIMEOUT_PARAMS_ERROR); - return result; - } - Project project = projectMapper.queryByName(projectName); - Map checkResultAndAuth = checkResultAndAuth(loginUser, projectName, project); - if (checkResultAndAuth != null) { - return checkResultAndAuth; - } - - // check process define release state - ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId); - result = checkProcessDefinitionValid(processDefinition, processDefinitionId); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - - if (!checkTenantSuitable(processDefinition)) { - logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ", - processDefinition.getId(), processDefinition.getName()); - putMsg(result, Status.TENANT_NOT_SUITABLE); - return result; - } - - // check master exists - if (!checkMasterExists(result)) { - return result; - } - - /** - * create command - */ - int create = this.createCommand(commandType, processDefinitionId, - taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), - warningGroupId, runMode, processInstancePriority, workerGroup, startParams); - - if (create > 0) { - processDefinition.setWarningGroupId(warningGroupId); - processDefinitionMapper.updateById(processDefinition); - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.START_PROCESS_INSTANCE_ERROR); - } - return result; - } - - /** - * check whether master exists - * - * @param result result - * @return master exists return true , otherwise return false */ - private boolean checkMasterExists(Map result) { - // check master server exists - List masterServers = monitorService.getServerListFromZK(true); - - // no master - if (masterServers.size() == 0) { - putMsg(result, Status.MASTER_NOT_EXISTS); - return false; - } - return true; - } + Map execProcessInstance(User loginUser, String projectName, + int processDefinitionId, String cronTime, CommandType commandType, + FailureStrategy failureStrategy, String startNodeList, + TaskDependType taskDependType, WarningType warningType, int warningGroupId, + RunMode runMode, + Priority processInstancePriority, String workerGroup, Integer timeout, + Map startParams); /** * check whether the process definition can be executed @@ -195,19 +69,7 @@ public class ExecutorService extends BaseService { * @param processDefineId process definition id * @return check result code */ - public Map checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId) { - Map result = new HashMap<>(); - if (processDefinition == null) { - // check process definition exists - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId); - } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { - // check process definition online - putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId); - } else { - result.put(Constants.STATUS, Status.SUCCESS); - } - return result; - } + Map checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId); /** * do action to process instance:pause, stop, repeat, recover from pause, recover from stop @@ -218,194 +80,7 @@ public class ExecutorService extends BaseService { * @param executeType execute type * @return execute result code */ - public Map execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType) { - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = 
checkResultAndAuth(loginUser, projectName, project); - if (checkResult != null) { - return checkResult; - } - - // check master exists - if (!checkMasterExists(result)) { - return result; - } - - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); - if (processInstance == null) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); - return result; - } - - ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); - if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) { - result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId()); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - } - - checkResult = checkExecuteType(processInstance, executeType); - Status status = (Status) checkResult.get(Constants.STATUS); - if (status != Status.SUCCESS) { - return checkResult; - } - if (!checkTenantSuitable(processDefinition)) { - logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ", - processDefinition.getId(), processDefinition.getName()); - putMsg(result, Status.TENANT_NOT_SUITABLE); - } - - switch (executeType) { - case REPEAT_RUNNING: - result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING); - break; - case RECOVER_SUSPENDED_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS); - break; - case START_FAILURE_TASK_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS); - break; - case STOP: - if (processInstance.getState() == ExecutionStatus.READY_STOP) { - putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); - } else { - result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP); - } - break; - case PAUSE: - if (processInstance.getState() == ExecutionStatus.READY_PAUSE) { - putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); - } else { - result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE); - } - break; - default: - logger.error("unknown execute type : {}", executeType); - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type"); - - break; - } - return result; - } - - /** - * check tenant suitable - * - * @param processDefinition process definition - * @return true if tenant suitable, otherwise return false - */ - private boolean checkTenantSuitable(ProcessDefinition processDefinition) { - // checkTenantExists(); - Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(), - processDefinition.getUserId()); - return tenant != null; - } - - /** - * Check the state of process instance and the type of operation match - * - * @param processInstance process instance - * @param executeType execute type - * @return check result code - */ - private Map checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) { - - Map result = new HashMap<>(); - ExecutionStatus executionStatus = processInstance.getState(); - boolean checkResult = false; - switch (executeType) { - case PAUSE: - case STOP: - if (executionStatus.typeIsRunning()) { - checkResult = true; - } - break; - case REPEAT_RUNNING: - if 
(executionStatus.typeIsFinished()) { - checkResult = true; - } - break; - case START_FAILURE_TASK_PROCESS: - if (executionStatus.typeIsFailure()) { - checkResult = true; - } - break; - case RECOVER_SUSPENDED_PROCESS: - if (executionStatus.typeIsPause() || executionStatus.typeIsCancel()) { - checkResult = true; - } - break; - default: - break; - } - if (!checkResult) { - putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString()); - } else { - putMsg(result, Status.SUCCESS); - } - return result; - } - - /** - * prepare to update process instance command type and status - * - * @param processInstance process instance - * @param commandType command type - * @param executionStatus execute status - * @return update result - */ - private Map updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) { - Map result = new HashMap<>(); - - processInstance.setCommandType(commandType); - processInstance.addHistoryCmd(commandType); - processInstance.setState(executionStatus); - int update = processService.updateProcessInstance(processInstance); - - // determine whether the process is normal - if (update > 0) { - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); - } - return result; - } - - /** - * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution - * - * @param loginUser login user - * @param instanceId instance id - * @param processDefinitionId process definition id - * @param commandType command type - * @return insert result code - */ - private Map insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType) { - Map result = new HashMap<>(); - Command command = new Command(); - command.setCommandType(commandType); - command.setProcessDefinitionId(processDefinitionId); - command.setCommandParam(String.format("{\"%s\":%d}", - CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId)); - command.setExecutorId(loginUser.getId()); - - if (!processService.verifyIsNeedCreateCommand(command)) { - putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId); - return result; - } - - int create = processService.createCommand(command); - - if (create > 0) { - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); - } - - return result; - } + Map execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType); /** * check if sub processes are offline before starting process definition @@ -413,167 +88,5 @@ public class ExecutorService extends BaseService { * @param processDefineId process definition id * @return check result code */ - public Map startCheckByProcessDefinedId(int processDefineId) { - Map result = new HashMap<>(); - - if (processDefineId == 0) { - logger.error("process definition id is null"); - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process definition id"); - } - List ids = new ArrayList<>(); - processService.recurseFindSubProcessId(processDefineId, ids); - Integer[] idArray = ids.toArray(new Integer[ids.size()]); - if (!ids.isEmpty()) { - List processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray); - if (processDefinitionList != null) { - for (ProcessDefinition processDefinition : processDefinitionList) { - /** - * if there is no online process, exit directly - */ - if 
(processDefinition.getReleaseState() != ReleaseState.ONLINE) { - putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); - logger.info("not release process definition id: {} , name : {}", - processDefinition.getId(), processDefinition.getName()); - return result; - } - } - } - } - putMsg(result, Status.SUCCESS); - return result; - } - - /** - * create command - * - * @param commandType commandType - * @param processDefineId processDefineId - * @param nodeDep nodeDep - * @param failureStrategy failureStrategy - * @param startNodeList startNodeList - * @param schedule schedule - * @param warningType warningType - * @param executorId executorId - * @param warningGroupId warningGroupId - * @param runMode runMode - * @param processInstancePriority processInstancePriority - * @param workerGroup workerGroup - * @return command id - */ - private int createCommand(CommandType commandType, int processDefineId, - TaskDependType nodeDep, FailureStrategy failureStrategy, - String startNodeList, String schedule, WarningType warningType, - int executorId, int warningGroupId, - RunMode runMode, Priority processInstancePriority, String workerGroup, - Map startParams) throws ParseException { - - /** - * instantiate command schedule instance - */ - Command command = new Command(); - - Map cmdParam = new HashMap<>(); - if (commandType == null) { - command.setCommandType(CommandType.START_PROCESS); - } else { - command.setCommandType(commandType); - } - command.setProcessDefinitionId(processDefineId); - if (nodeDep != null) { - command.setTaskDependType(nodeDep); - } - if (failureStrategy != null) { - command.setFailureStrategy(failureStrategy); - } - - if (StringUtils.isNotEmpty(startNodeList)) { - cmdParam.put(CMD_PARAM_START_NODE_NAMES, startNodeList); - } - if (warningType != null) { - command.setWarningType(warningType); - } - if (startParams != null && startParams.size() > 0) { - cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams)); - } - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - command.setExecutorId(executorId); - command.setWarningGroupId(warningGroupId); - command.setProcessInstancePriority(processInstancePriority); - command.setWorkerGroup(workerGroup); - - Date start = null; - Date end = null; - if (StringUtils.isNotEmpty(schedule)) { - String[] interval = schedule.split(","); - if (interval.length == 2) { - start = DateUtils.getScheduleDate(interval[0]); - end = DateUtils.getScheduleDate(interval[1]); - } - } - - // determine whether to complement - if (commandType == CommandType.COMPLEMENT_DATA) { - runMode = (runMode == null) ? 
RunMode.RUN_MODE_SERIAL : runMode; - if (null != start && null != end && !start.after(end)) { - if (runMode == RunMode.RUN_MODE_SERIAL) { - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - return processService.createCommand(command); - } else if (runMode == RunMode.RUN_MODE_PARALLEL) { - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId); - List listDate = new LinkedList<>(); - if (!CollectionUtils.isEmpty(schedules)) { - for (Schedule item : schedules) { - listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab())); - } - } - if (!CollectionUtils.isEmpty(listDate)) { - // loop by schedule date - for (Date date : listDate) { - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date)); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - processService.createCommand(command); - } - return listDate.size(); - } else { - // loop by day - int runCunt = 0; - while (!start.after(end)) { - runCunt += 1; - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start)); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - processService.createCommand(command); - start = DateUtils.getSomeDay(start, 1); - } - return runCunt; - } - } - } else { - logger.error("there is not valid schedule date for the process definition: id:{},date:{}", - processDefineId, schedule); - } - } else { - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - return processService.createCommand(command); - } - - return 0; - } - - /** - * check result and auth - */ - private Map checkResultAndAuth(User loginUser, String projectName, Project project) { - // check project auth - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status status = (Status) checkResult.get(Constants.STATUS); - if (status != Status.SUCCESS) { - return checkResult; - } - return null; - } - + Map startCheckByProcessDefinedId(int processDefineId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java new file mode 100644 index 0000000000..cdb5197716 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java @@ -0,0 +1,660 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.DataSourceService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DbConnectType; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; +import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; +import org.apache.dolphinscheduler.dao.datasource.OracleDataSource; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; +import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * datasource service impl + */ +@Service +public class DataSourceServiceImpl extends BaseService implements DataSourceService { + + private static final Logger logger = LoggerFactory.getLogger(DataSourceServiceImpl.class); + + public static final String NAME = "name"; + public static final String NOTE = "note"; + public static final String TYPE = "type"; + public static final String HOST = "host"; + public static final String PORT = "port"; + public static final String PRINCIPAL = "principal"; + public static final String DATABASE = "database"; + public static final String USER_NAME = "userName"; + public static final String OTHER = "other"; + + @Autowired + private DataSourceMapper dataSourceMapper; + + @Autowired + private DataSourceUserMapper datasourceUserMapper; + + /** + * create data source + * + * @param loginUser login user + * @param name data source name + * @param desc data source description + * @param type data source type + * @param parameter datasource parameters + * @return create result code + */ + public Result createDataSource(User loginUser, String name, String desc, DbType type, String parameter) { + + Result result = new Result<>(); + // check name can use or not + if (checkName(name)) { + putMsg(result, Status.DATASOURCE_EXIST); + return result; + } + Result isConnection = checkConnection(type, parameter); + if (Status.SUCCESS.getCode() != isConnection.getCode()) { + return result; + } + + // build datasource + DataSource dataSource = new DataSource(); + Date now = new Date(); + + dataSource.setName(name.trim()); + dataSource.setNote(desc); + dataSource.setUserId(loginUser.getId()); + dataSource.setUserName(loginUser.getUserName()); + dataSource.setType(type); + 
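A consequence of the new @Service implementation declared above: existing callers keep depending on the DataSourceService interface while Spring injects DataSourceServiceImpl behind it, so no controller signatures need to change. An illustrative consumer (not a class added by this patch) might look like:

import org.apache.dolphinscheduler.api.service.DataSourceService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/** Illustrative consumer: depends only on the DataSourceService interface. */
@RestController
public class DataSourceNameCheckController {

    @Autowired
    private DataSourceService dataSourceService;   // Spring wires DataSourceServiceImpl here

    @GetMapping("/datasources/verify-name")
    public Result verifyName(@RequestParam("name") String name) {
        // delegates to whichever implementation is registered in the application context
        return dataSourceService.verifyDataSourceName(name);
    }
}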
dataSource.setConnectionParams(parameter); + dataSource.setCreateTime(now); + dataSource.setUpdateTime(now); + dataSourceMapper.insert(dataSource); + + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * updateProcessInstance datasource + * + * @param loginUser login user + * @param name data source name + * @param desc data source description + * @param type data source type + * @param parameter datasource parameters + * @param id data source id + * @return update result code + */ + public Result updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter) { + + Result result = new Result<>(); + // determine whether the data source exists + DataSource dataSource = dataSourceMapper.selectById(id); + if (dataSource == null) { + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + + if (!hasPerm(loginUser, dataSource.getUserId())) { + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } + + //check name can use or not + if (!name.trim().equals(dataSource.getName()) && checkName(name)) { + putMsg(result, Status.DATASOURCE_EXIST); + return result; + } + //check password,if the password is not updated, set to the old password. + ObjectNode paramObject = JSONUtils.parseObject(parameter); + String password = paramObject.path(Constants.PASSWORD).asText(); + if (StringUtils.isBlank(password)) { + String oldConnectionParams = dataSource.getConnectionParams(); + ObjectNode oldParams = JSONUtils.parseObject(oldConnectionParams); + paramObject.put(Constants.PASSWORD, oldParams.path(Constants.PASSWORD).asText()); + } + // connectionParams json + String connectionParams = paramObject.toString(); + + Result isConnection = checkConnection(type, parameter); + if (Status.SUCCESS.getCode() != isConnection.getCode()) { + return result; + } + + Date now = new Date(); + + dataSource.setName(name.trim()); + dataSource.setNote(desc); + dataSource.setUserName(loginUser.getUserName()); + dataSource.setType(type); + dataSource.setConnectionParams(connectionParams); + dataSource.setUpdateTime(now); + dataSourceMapper.updateById(dataSource); + putMsg(result, Status.SUCCESS); + return result; + } + + private boolean checkName(String name) { + List queryDataSource = dataSourceMapper.queryDataSourceByName(name.trim()); + return queryDataSource != null && !queryDataSource.isEmpty(); + } + + /** + * updateProcessInstance datasource + * + * @param id datasource id + * @return data source detail + */ + public Map queryDataSource(int id) { + + Map result = new HashMap<>(); + DataSource dataSource = dataSourceMapper.selectById(id); + if (dataSource == null) { + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + // type + String dataSourceType = dataSource.getType().toString(); + // name + String dataSourceName = dataSource.getName(); + // desc + String desc = dataSource.getNote(); + // parameter + String parameter = dataSource.getConnectionParams(); + + BaseDataSource datasourceForm = DataSourceFactory.getDatasource(dataSource.getType(), parameter); + DbConnectType connectType = null; + String hostSeperator = Constants.DOUBLE_SLASH; + if (DbType.ORACLE.equals(dataSource.getType())) { + connectType = ((OracleDataSource) datasourceForm).getConnectType(); + if (DbConnectType.ORACLE_SID.equals(connectType)) { + hostSeperator = Constants.AT_SIGN; + } + } + String database = datasourceForm.getDatabase(); + // jdbc connection params + String other = datasourceForm.getOther(); + String address = datasourceForm.getAddress(); + + String[] 
hostsPorts = getHostsAndPort(address, hostSeperator); + // ip host + String host = hostsPorts[0]; + // prot + String port = hostsPorts[1]; + String separator = ""; + + switch (dataSource.getType()) { + case HIVE: + case SQLSERVER: + separator = ";"; + break; + case MYSQL: + case POSTGRESQL: + case CLICKHOUSE: + case ORACLE: + case PRESTO: + separator = "&"; + break; + default: + separator = "&"; + break; + } + + Map otherMap = new LinkedHashMap<>(); + if (other != null) { + String[] configs = other.split(separator); + for (String config : configs) { + otherMap.put(config.split("=")[0], config.split("=")[1]); + } + + } + + Map map = new HashMap<>(); + map.put(NAME, dataSourceName); + map.put(NOTE, desc); + map.put(TYPE, dataSourceType); + if (connectType != null) { + map.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType); + } + + map.put(HOST, host); + map.put(PORT, port); + map.put(PRINCIPAL, datasourceForm.getPrincipal()); + map.put(Constants.KERBEROS_KRB5_CONF_PATH, datasourceForm.getJavaSecurityKrb5Conf()); + map.put(Constants.KERBEROS_KEY_TAB_USERNAME, datasourceForm.getLoginUserKeytabUsername()); + map.put(Constants.KERBEROS_KEY_TAB_PATH, datasourceForm.getLoginUserKeytabPath()); + map.put(DATABASE, database); + map.put(USER_NAME, datasourceForm.getUser()); + map.put(OTHER, otherMap); + result.put(Constants.DATA_LIST, map); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query datasource list by keyword + * + * @param loginUser login user + * @param searchVal search value + * @param pageNo page number + * @param pageSize page size + * @return data source list page + */ + public Map queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { + Map result = new HashMap<>(); + IPage dataSourceList; + Page dataSourcePage = new Page<>(pageNo, pageSize); + + if (isAdmin(loginUser)) { + dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 0, searchVal); + } else { + dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, loginUser.getId(), searchVal); + } + + List dataSources = dataSourceList != null ? dataSourceList.getRecords() : new ArrayList<>(); + handlePasswd(dataSources); + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + pageInfo.setTotalCount((int) (dataSourceList != null ? 
dataSourceList.getTotal() : 0L)); + pageInfo.setLists(dataSources); + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * handle datasource connection password for safety + * + * @param dataSourceList + */ + private void handlePasswd(List dataSourceList) { + for (DataSource dataSource : dataSourceList) { + String connectionParams = dataSource.getConnectionParams(); + ObjectNode object = JSONUtils.parseObject(connectionParams); + object.put(Constants.PASSWORD, getHiddenPassword()); + dataSource.setConnectionParams(object.toString()); + } + } + + /** + * get hidden password (resolve the security hotspot) + * + * @return hidden password + */ + private String getHiddenPassword() { + return Constants.XXXXXX; + } + + /** + * query data resource list + * + * @param loginUser login user + * @param type data source type + * @return data source list page + */ + public Map queryDataSourceList(User loginUser, Integer type) { + Map result = new HashMap<>(); + + List datasourceList; + + if (isAdmin(loginUser)) { + datasourceList = dataSourceMapper.listAllDataSourceByType(type); + } else { + datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type); + } + + result.put(Constants.DATA_LIST, datasourceList); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * verify datasource exists + * + * @param name datasource name + * @return true if data datasource not exists, otherwise return false + */ + public Result verifyDataSourceName(String name) { + Result result = new Result<>(); + List dataSourceList = dataSourceMapper.queryDataSourceByName(name); + if (dataSourceList != null && !dataSourceList.isEmpty()) { + putMsg(result, Status.DATASOURCE_EXIST); + } else { + putMsg(result, Status.SUCCESS); + } + + return result; + } + + /** + * check connection + * + * @param type data source type + * @param parameter data source parameters + * @return true if connect successfully, otherwise false + */ + public Result checkConnection(DbType type, String parameter) { + Result result = new Result<>(); + BaseDataSource datasource = DataSourceFactory.getDatasource(type, parameter); + if (datasource == null) { + putMsg(result, Status.DATASOURCE_TYPE_NOT_EXIST, type); + return result; + } + try (Connection connection = datasource.getConnection()) { + if (connection == null) { + putMsg(result, Status.CONNECTION_TEST_FAILURE); + return result; + } + putMsg(result, Status.SUCCESS); + return result; + } catch (Exception e) { + logger.error("datasource test connection error, dbType:{}, jdbcUrl:{}, message:{}.", type, datasource.getJdbcUrl(), e.getMessage()); + return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(),e.getMessage()); + } + } + + /** + * test connection + * + * @param id datasource id + * @return connect result code + */ + public Result connectionTest(int id) { + DataSource dataSource = dataSourceMapper.selectById(id); + if (dataSource == null) { + Result result = new Result<>(); + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + return checkConnection(dataSource.getType(), dataSource.getConnectionParams()); + } + + /** + * build paramters + * + * @param type data source type + * @param host data source host + * @param port data source port + * @param database data source database name + * @param userName user name + * @param password password + * @param other other parameters + * @param principal principal + * @return datasource parameter + */ + public String buildParameter(DbType type, String 
host, + String port, String database, String principal, String userName, + String password, DbConnectType connectType, String other, + String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) { + + String address = buildAddress(type, host, port, connectType); + Map parameterMap = new LinkedHashMap<>(); + String jdbcUrl; + if (DbType.SQLSERVER == type) { + jdbcUrl = address + ";databaseName=" + database; + } else { + jdbcUrl = address + "/" + database; + } + + if (Constants.ORACLE.equals(type.name())) { + parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType); + } + + if (CommonUtils.getKerberosStartupState() + && (type == DbType.HIVE || type == DbType.SPARK)) { + jdbcUrl += ";principal=" + principal; + } + + String separator = ""; + if (Constants.MYSQL.equals(type.name()) + || Constants.POSTGRESQL.equals(type.name()) + || Constants.CLICKHOUSE.equals(type.name()) + || Constants.ORACLE.equals(type.name()) + || Constants.PRESTO.equals(type.name())) { + separator = "&"; + } else if (Constants.HIVE.equals(type.name()) + || Constants.SPARK.equals(type.name()) + || Constants.DB2.equals(type.name()) + || Constants.SQLSERVER.equals(type.name())) { + separator = ";"; + } + + parameterMap.put(TYPE, connectType); + parameterMap.put(Constants.ADDRESS, address); + parameterMap.put(Constants.DATABASE, database); + parameterMap.put(Constants.JDBC_URL, jdbcUrl); + parameterMap.put(Constants.USER, userName); + parameterMap.put(Constants.PASSWORD, CommonUtils.encodePassword(password)); + if (CommonUtils.getKerberosStartupState() + && (type == DbType.HIVE || type == DbType.SPARK)) { + parameterMap.put(Constants.PRINCIPAL, principal); + parameterMap.put(Constants.KERBEROS_KRB5_CONF_PATH, javaSecurityKrb5Conf); + parameterMap.put(Constants.KERBEROS_KEY_TAB_USERNAME, loginUserKeytabUsername); + parameterMap.put(Constants.KERBEROS_KEY_TAB_PATH, loginUserKeytabPath); + } + + Map map = JSONUtils.toMap(other); + if (map != null) { + StringBuilder otherSb = new StringBuilder(); + for (Map.Entry entry: map.entrySet()) { + otherSb.append(String.format("%s=%s%s", entry.getKey(), entry.getValue(), separator)); + } + if (!Constants.DB2.equals(type.name())) { + otherSb.deleteCharAt(otherSb.length() - 1); + } + parameterMap.put(Constants.OTHER, otherSb); + } + + if (logger.isDebugEnabled()) { + logger.info("parameters map:{}", JSONUtils.toJsonString(parameterMap)); + } + return JSONUtils.toJsonString(parameterMap); + + } + + private String buildAddress(DbType type, String host, String port, DbConnectType connectType) { + StringBuilder sb = new StringBuilder(); + if (Constants.MYSQL.equals(type.name())) { + sb.append(Constants.JDBC_MYSQL); + sb.append(host).append(":").append(port); + } else if (Constants.POSTGRESQL.equals(type.name())) { + sb.append(Constants.JDBC_POSTGRESQL); + sb.append(host).append(":").append(port); + } else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) { + sb.append(Constants.JDBC_HIVE_2); + String[] hostArray = host.split(","); + if (hostArray.length > 0) { + for (String zkHost : hostArray) { + sb.append(String.format("%s:%s,", zkHost, port)); + } + sb.deleteCharAt(sb.length() - 1); + } + } else if (Constants.CLICKHOUSE.equals(type.name())) { + sb.append(Constants.JDBC_CLICKHOUSE); + sb.append(host).append(":").append(port); + } else if (Constants.ORACLE.equals(type.name())) { + if (connectType == DbConnectType.ORACLE_SID) { + sb.append(Constants.JDBC_ORACLE_SID); + } else { + sb.append(Constants.JDBC_ORACLE_SERVICE_NAME); + } + 
sb.append(host).append(":").append(port); + } else if (Constants.SQLSERVER.equals(type.name())) { + sb.append(Constants.JDBC_SQLSERVER); + sb.append(host).append(":").append(port); + } else if (Constants.DB2.equals(type.name())) { + sb.append(Constants.JDBC_DB2); + sb.append(host).append(":").append(port); + } else if (Constants.PRESTO.equals(type.name())) { + sb.append(Constants.JDBC_PRESTO); + sb.append(host).append(":").append(port); + } + + return sb.toString(); + } + + /** + * delete datasource + * + * @param loginUser login user + * @param datasourceId data source id + * @return delete result code + */ + @Transactional(rollbackFor = RuntimeException.class) + public Result delete(User loginUser, int datasourceId) { + Result result = new Result<>(); + try { + //query datasource by id + DataSource dataSource = dataSourceMapper.selectById(datasourceId); + if (dataSource == null) { + logger.error("resource id {} not exist", datasourceId); + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + if (!hasPerm(loginUser, dataSource.getUserId())) { + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } + dataSourceMapper.deleteById(datasourceId); + datasourceUserMapper.deleteByDatasourceId(datasourceId); + putMsg(result, Status.SUCCESS); + } catch (Exception e) { + logger.error("delete datasource error", e); + throw new RuntimeException("delete datasource error"); + } + return result; + } + + /** + * unauthorized datasource + * + * @param loginUser login user + * @param userId user id + * @return unauthed data source result code + */ + public Map unauthDatasource(User loginUser, Integer userId) { + + Map result = new HashMap<>(); + //only admin operate + if (!isAdmin(loginUser)) { + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } + + /** + * query all data sources except userId + */ + List resultList = new ArrayList<>(); + List datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId); + Set datasourceSet = null; + if (datasourceList != null && !datasourceList.isEmpty()) { + datasourceSet = new HashSet<>(datasourceList); + + List authedDataSourceList = dataSourceMapper.queryAuthedDatasource(userId); + + Set authedDataSourceSet = null; + if (authedDataSourceList != null && !authedDataSourceList.isEmpty()) { + authedDataSourceSet = new HashSet<>(authedDataSourceList); + datasourceSet.removeAll(authedDataSourceSet); + + } + resultList = new ArrayList<>(datasourceSet); + } + result.put(Constants.DATA_LIST, resultList); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * authorized datasource + * + * @param loginUser login user + * @param userId user id + * @return authorized result code + */ + public Map authedDatasource(User loginUser, Integer userId) { + Map result = new HashMap<>(); + + if (!isAdmin(loginUser)) { + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } + + List authedDatasourceList = dataSourceMapper.queryAuthedDatasource(userId); + result.put(Constants.DATA_LIST, authedDatasourceList); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * get host and port by address + * + * @param address address + * @param separator separator + * @return sting array: [host,port] + */ + private String[] getHostsAndPort(String address, String separator) { + String[] result = new String[2]; + String[] tmpArray = address.split(separator); + String hostsAndPorts = tmpArray[tmpArray.length - 1]; + StringBuilder hosts = new StringBuilder(); + String[] hostPortArray = 
hostsAndPorts.split(Constants.COMMA); + String port = hostPortArray[0].split(Constants.COLON)[1]; + for (String hostPort : hostPortArray) { + hosts.append(hostPort.split(Constants.COLON)[0]).append(Constants.COMMA); + } + hosts.deleteCharAt(hosts.length() - 1); + result[0] = hosts.toString(); + result[1] = port; + return result; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java new file mode 100644 index 0000000000..0dff163271 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.ExecutorService; +import org.apache.dolphinscheduler.api.service.MonitorService; +import org.apache.dolphinscheduler.api.service.ProjectService; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import 
org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * executor service impl + */ +@Service +public class ExecutorServiceImpl extends BaseService implements ExecutorService { + + private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceImpl.class); + + @Autowired + private ProjectMapper projectMapper; + + @Autowired + private ProjectService projectService; + + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + + @Autowired + private MonitorService monitorService; + + + @Autowired + private ProcessInstanceMapper processInstanceMapper; + + + @Autowired + private ProcessService processService; + + /** + * execute process instance + * + * @param loginUser login user + * @param projectName project name + * @param processDefinitionId process Definition Id + * @param cronTime cron time + * @param commandType command type + * @param failureStrategy failuer strategy + * @param startNodeList start nodelist + * @param taskDependType node dependency type + * @param warningType warning type + * @param warningGroupId notify group id + * @param processInstancePriority process instance priority + * @param workerGroup worker group name + * @param runMode run mode + * @param timeout timeout + * @param startParams the global param values which pass to new process instance + * @return execute process instance code + */ + public Map execProcessInstance(User loginUser, String projectName, + int processDefinitionId, String cronTime, CommandType commandType, + FailureStrategy failureStrategy, String startNodeList, + TaskDependType taskDependType, WarningType warningType, int warningGroupId, + RunMode runMode, + Priority processInstancePriority, String workerGroup, Integer timeout, + Map startParams) { + Map result = new HashMap<>(); + // timeout is invalid + if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) { + putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR); + return result; + } + Project project = projectMapper.queryByName(projectName); + Map checkResultAndAuth = checkResultAndAuth(loginUser, projectName, project); + if (checkResultAndAuth != null) { + return checkResultAndAuth; + } + + // check process define release state + ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId); + result = checkProcessDefinitionValid(processDefinition, processDefinitionId); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + + if (!checkTenantSuitable(processDefinition)) { + logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ", + processDefinition.getId(), processDefinition.getName()); + 
putMsg(result, Status.TENANT_NOT_SUITABLE); + return result; + } + + // check master exists + if (!checkMasterExists(result)) { + return result; + } + + /** + * create command + */ + int create = this.createCommand(commandType, processDefinitionId, + taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), + warningGroupId, runMode, processInstancePriority, workerGroup, startParams); + + if (create > 0) { + processDefinition.setWarningGroupId(warningGroupId); + processDefinitionMapper.updateById(processDefinition); + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.START_PROCESS_INSTANCE_ERROR); + } + return result; + } + + /** + * check whether master exists + * + * @param result result + * @return master exists return true , otherwise return false + */ + private boolean checkMasterExists(Map result) { + // check master server exists + List masterServers = monitorService.getServerListFromZK(true); + + // no master + if (masterServers.isEmpty()) { + putMsg(result, Status.MASTER_NOT_EXISTS); + return false; + } + return true; + } + + /** + * check whether the process definition can be executed + * + * @param processDefinition process definition + * @param processDefineId process definition id + * @return check result code + */ + public Map checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId) { + Map result = new HashMap<>(); + if (processDefinition == null) { + // check process definition exists + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId); + } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { + // check process definition online + putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId); + } else { + result.put(Constants.STATUS, Status.SUCCESS); + } + return result; + } + + /** + * do action to process instance:pause, stop, repeat, recover from pause, recover from stop + * + * @param loginUser login user + * @param projectName project name + * @param processInstanceId process instance id + * @param executeType execute type + * @return execute result code + */ + public Map execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType) { + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = checkResultAndAuth(loginUser, projectName, project); + if (checkResult != null) { + return checkResult; + } + + // check master exists + if (!checkMasterExists(result)) { + return result; + } + + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); + if (processInstance == null) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + return result; + } + + ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); + if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) { + result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId()); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + } + + checkResult = checkExecuteType(processInstance, executeType); + Status status = (Status) checkResult.get(Constants.STATUS); + if (status != Status.SUCCESS) { + return checkResult; + } + if (!checkTenantSuitable(processDefinition)) { + logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ", + processDefinition.getId(), processDefinition.getName()); + 
putMsg(result, Status.TENANT_NOT_SUITABLE); + } + + switch (executeType) { + case REPEAT_RUNNING: + result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING); + break; + case RECOVER_SUSPENDED_PROCESS: + result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS); + break; + case START_FAILURE_TASK_PROCESS: + result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS); + break; + case STOP: + if (processInstance.getState() == ExecutionStatus.READY_STOP) { + putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); + } else { + result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP); + } + break; + case PAUSE: + if (processInstance.getState() == ExecutionStatus.READY_PAUSE) { + putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); + } else { + result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE); + } + break; + default: + logger.error("unknown execute type : {}", executeType); + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type"); + + break; + } + return result; + } + + /** + * check tenant suitable + * + * @param processDefinition process definition + * @return true if tenant suitable, otherwise return false + */ + private boolean checkTenantSuitable(ProcessDefinition processDefinition) { + Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(), + processDefinition.getUserId()); + return tenant != null; + } + + /** + * Check the state of process instance and the type of operation match + * + * @param processInstance process instance + * @param executeType execute type + * @return check result code + */ + private Map checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) { + + Map result = new HashMap<>(); + ExecutionStatus executionStatus = processInstance.getState(); + boolean checkResult = false; + switch (executeType) { + case PAUSE: + case STOP: + if (executionStatus.typeIsRunning()) { + checkResult = true; + } + break; + case REPEAT_RUNNING: + if (executionStatus.typeIsFinished()) { + checkResult = true; + } + break; + case START_FAILURE_TASK_PROCESS: + if (executionStatus.typeIsFailure()) { + checkResult = true; + } + break; + case RECOVER_SUSPENDED_PROCESS: + if (executionStatus.typeIsPause() || executionStatus.typeIsCancel()) { + checkResult = true; + } + break; + default: + break; + } + if (!checkResult) { + putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString()); + } else { + putMsg(result, Status.SUCCESS); + } + return result; + } + + /** + * prepare to update process instance command type and status + * + * @param processInstance process instance + * @param commandType command type + * @param executionStatus execute status + * @return update result + */ + private Map updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) { + Map result = new HashMap<>(); + + processInstance.setCommandType(commandType); + processInstance.addHistoryCmd(commandType); + processInstance.setState(executionStatus); + int update = processService.updateProcessInstance(processInstance); + + // determine whether the 
process is normal + if (update > 0) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); + } + return result; + } + + /** + * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution + * + * @param loginUser login user + * @param instanceId instance id + * @param processDefinitionId process definition id + * @param commandType command type + * @return insert result code + */ + private Map insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType) { + Map result = new HashMap<>(); + Command command = new Command(); + command.setCommandType(commandType); + command.setProcessDefinitionId(processDefinitionId); + command.setCommandParam(String.format("{\"%s\":%d}", + CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId)); + command.setExecutorId(loginUser.getId()); + + if (!processService.verifyIsNeedCreateCommand(command)) { + putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId); + return result; + } + + int create = processService.createCommand(command); + + if (create > 0) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); + } + + return result; + } + + /** + * check if sub processes are offline before starting process definition + * + * @param processDefineId process definition id + * @return check result code + */ + public Map startCheckByProcessDefinedId(int processDefineId) { + Map result = new HashMap<>(); + + if (processDefineId == 0) { + logger.error("process definition id is null"); + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process definition id"); + } + List ids = new ArrayList<>(); + processService.recurseFindSubProcessId(processDefineId, ids); + Integer[] idArray = ids.toArray(new Integer[ids.size()]); + if (!ids.isEmpty()) { + List processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray); + if (processDefinitionList != null) { + for (ProcessDefinition processDefinition : processDefinitionList) { + /** + * if there is no online process, exit directly + */ + if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { + putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); + logger.info("not release process definition id: {} , name : {}", + processDefinition.getId(), processDefinition.getName()); + return result; + } + } + } + } + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * create command + * + * @param commandType commandType + * @param processDefineId processDefineId + * @param nodeDep nodeDep + * @param failureStrategy failureStrategy + * @param startNodeList startNodeList + * @param schedule schedule + * @param warningType warningType + * @param executorId executorId + * @param warningGroupId warningGroupId + * @param runMode runMode + * @param processInstancePriority processInstancePriority + * @param workerGroup workerGroup + * @return command id + */ + private int createCommand(CommandType commandType, int processDefineId, + TaskDependType nodeDep, FailureStrategy failureStrategy, + String startNodeList, String schedule, WarningType warningType, + int executorId, int warningGroupId, + RunMode runMode, Priority processInstancePriority, String workerGroup, + Map startParams) { + + /** + * instantiate command schedule instance + */ + Command command = new Command(); + + Map cmdParam = new HashMap<>(); + if (commandType == null) { + 
command.setCommandType(CommandType.START_PROCESS); + } else { + command.setCommandType(commandType); + } + command.setProcessDefinitionId(processDefineId); + if (nodeDep != null) { + command.setTaskDependType(nodeDep); + } + if (failureStrategy != null) { + command.setFailureStrategy(failureStrategy); + } + + if (StringUtils.isNotEmpty(startNodeList)) { + cmdParam.put(CMD_PARAM_START_NODE_NAMES, startNodeList); + } + if (warningType != null) { + command.setWarningType(warningType); + } + if (startParams != null && startParams.size() > 0) { + cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams)); + } + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + command.setExecutorId(executorId); + command.setWarningGroupId(warningGroupId); + command.setProcessInstancePriority(processInstancePriority); + command.setWorkerGroup(workerGroup); + + Date start = null; + Date end = null; + if (StringUtils.isNotEmpty(schedule)) { + String[] interval = schedule.split(","); + if (interval.length == 2) { + start = DateUtils.getScheduleDate(interval[0]); + end = DateUtils.getScheduleDate(interval[1]); + } + } + + // determine whether to complement + if (commandType == CommandType.COMPLEMENT_DATA) { + runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; + if (null != start && null != end && !start.after(end)) { + if (runMode == RunMode.RUN_MODE_SERIAL) { + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + return processService.createCommand(command); + } else if (runMode == RunMode.RUN_MODE_PARALLEL) { + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId); + List listDate = new LinkedList<>(); + if (!CollectionUtils.isEmpty(schedules)) { + for (Schedule item : schedules) { + listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab())); + } + } + if (!CollectionUtils.isEmpty(listDate)) { + // loop by schedule date + for (Date date : listDate) { + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date)); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + processService.createCommand(command); + } + return listDate.size(); + } else { + // loop by day + int runCunt = 0; + while (!start.after(end)) { + runCunt += 1; + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start)); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + processService.createCommand(command); + start = DateUtils.getSomeDay(start, 1); + } + return runCunt; + } + } + } else { + logger.error("there is not valid schedule date for the process definition: id:{}", processDefineId); + } + } else { + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + return processService.createCommand(command); + } + + return 0; + } + + /** + * check result and auth + */ + private Map checkResultAndAuth(User loginUser, String projectName, Project project) { + // check project auth + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status status = (Status) checkResult.get(Constants.STATUS); + if (status != Status.SUCCESS) { + return checkResult; + } + return null; + } + +} diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java index 13eb1b9c2e..8b342d1669 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.DataSourceServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbConnectType; @@ -57,7 +58,7 @@ public class DataSourceServiceTest { @InjectMocks - private DataSourceService dataSourceService; + private DataSourceServiceImpl dataSourceService; @Mock private DataSourceMapper dataSourceMapper; @Mock diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index 3f25fb84d4..c9d2893e8e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -65,7 +66,7 @@ import org.mockito.junit.MockitoJUnitRunner; public class ExecutorService2Test { @InjectMocks - private ExecutorService executorService; + private ExecutorServiceImpl executorService; @Mock private ProcessService processService; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 57cd207c4d..fa60c78300 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl; import org.apache.dolphinscheduler.common.Constants; import org.junit.Assert; import org.junit.Ignore; @@ -39,7 +40,7 @@ public class ExecutorServiceTest { private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class); @Autowired - private ExecutorService executorService; + private ExecutorServiceImpl executorService; @Ignore @Test diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index b9065eca3e..fe6c7fdbde 100644 --- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -260,8 +260,8 @@ public class ProcessService { * @param command command * @return create command result */ - public Boolean verifyIsNeedCreateCommand(Command command) { - Boolean isNeedCreate = true; + public boolean verifyIsNeedCreateCommand(Command command) { + boolean isNeedCreate = true; EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class); cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1); cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
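
For readers skimming the DataSourceService hunk above, here is a minimal standalone sketch of what getHostsAndPort(address, separator) computes for a ZooKeeper-style Hive/Spark address. The class name, the literal "//" separator, and the sample address are illustrative assumptions, not part of the patch; the real method receives its separator and constants from the surrounding class.

import java.util.Arrays;

/**
 * Sketch (not the patched class): mirrors the host/port parsing in getHostsAndPort.
 */
public class HostsAndPortSketch {

    /** Splits "jdbc:hive2://zk1:2181,zk2:2181,zk3:2181" into {"zk1,zk2,zk3", "2181"}. */
    static String[] getHostsAndPort(String address, String separator) {
        String[] result = new String[2];
        // keep only the part after the JDBC scheme prefix, e.g. after "//"
        String[] tmpArray = address.split(separator);
        String hostsAndPorts = tmpArray[tmpArray.length - 1];
        String[] hostPortArray = hostsAndPorts.split(",");
        // the port of the first host:port pair is used for every host
        String port = hostPortArray[0].split(":")[1];
        StringBuilder hosts = new StringBuilder();
        for (String hostPort : hostPortArray) {
            hosts.append(hostPort.split(":")[0]).append(",");
        }
        hosts.deleteCharAt(hosts.length() - 1);
        result[0] = hosts.toString();
        result[1] = port;
        return result;
    }

    public static void main(String[] args) {
        String[] parsed = getHostsAndPort("jdbc:hive2://zk1:2181,zk2:2181,zk3:2181", "//");
        System.out.println(Arrays.toString(parsed)); // [zk1,zk2,zk3, 2181]
    }
}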
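
Similarly, a small sketch of how buildParameter(...) above joins the optional "other" connection properties with a type-dependent separator: "&" for MySQL/PostgreSQL/ClickHouse/Oracle/Presto, ";" for Hive/Spark/SQLServer/DB2, with DB2 keeping the trailing separator. The helper below is illustrative only and is not the patched API.

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of the "other" property joining done inside buildParameter.
 */
public class OtherParamsSketch {

    static String join(Map<String, String> other, String separator, boolean keepTrailing) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : other.entrySet()) {
            sb.append(String.format("%s=%s%s", entry.getKey(), entry.getValue(), separator));
        }
        if (!keepTrailing && sb.length() > 0) {
            // every type except DB2 drops the trailing separator
            sb.deleteCharAt(sb.length() - 1);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> other = new LinkedHashMap<>();
        other.put("useUnicode", "true");
        other.put("characterEncoding", "UTF-8");
        System.out.println(join(other, "&", false)); // useUnicode=true&characterEncoding=UTF-8
        System.out.println(join(other, ";", true));  // useUnicode=true;characterEncoding=UTF-8;
    }
}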
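
Finally, a sketch of the complement-data expansion in ExecutorServiceImpl#createCommand above: RUN_MODE_SERIAL produces a single command covering [start, end], while RUN_MODE_PARALLEL without an online schedule falls back to one command per day (when a schedule is online, the real code expands the cron fire dates via CronUtils instead). This sketch uses java.time rather than the project's DateUtils and only approximates the control flow.

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the serial vs. per-day complement expansion; names are illustrative.
 */
public class ComplementExpansionSketch {

    enum RunMode { RUN_MODE_SERIAL, RUN_MODE_PARALLEL }

    /** Returns the [start, end] windows that would each become one command. */
    static List<String[]> expand(LocalDate start, LocalDate end, RunMode runMode) {
        List<String[]> windows = new ArrayList<>();
        if (start.isAfter(end)) {
            return windows; // mirrors the "no valid schedule date" branch
        }
        if (runMode == RunMode.RUN_MODE_SERIAL) {
            windows.add(new String[]{start.toString(), end.toString()}); // one command for the whole range
        } else {
            // per-day fallback used when no online schedule supplies fire dates
            for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
                windows.add(new String[]{d.toString(), d.toString()});
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        LocalDate start = LocalDate.of(2021, 1, 1);
        LocalDate end = LocalDate.of(2021, 1, 3);
        System.out.println(expand(start, end, RunMode.RUN_MODE_SERIAL).size());   // 1
        System.out.println(expand(start, end, RunMode.RUN_MODE_PARALLEL).size()); // 3
    }
}

Running the sketch prints 1 and 3, matching the one-command-versus-three-commands behavior a user would see when complementing 2021-01-01 through 2021-01-03 in the two run modes.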