diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/AccessTokenController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/AccessTokenController.java new file mode 100644 index 0000000000..27ac1772a0 --- /dev/null +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/AccessTokenController.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.api.controller; + + +import cn.escheduler.api.enums.Status; +import cn.escheduler.api.service.AccessTokenService; +import cn.escheduler.api.service.UsersService; +import cn.escheduler.api.utils.Constants; +import cn.escheduler.api.utils.Result; +import cn.escheduler.dao.model.User; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.*; + +import java.util.Map; + +import static cn.escheduler.api.enums.Status.*; + + +/** + * user controller + */ +@RestController +@RequestMapping("/access-token") +public class AccessTokenController extends BaseController{ + + + private static final Logger logger = LoggerFactory.getLogger(AccessTokenController.class); + + + @Autowired + private AccessTokenService accessTokenService; + + /** + * create token + * @param loginUser + * @return + */ + @PostMapping(value = "/create") + @ResponseStatus(HttpStatus.CREATED) + public Result createToken(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "userId") int userId, + @RequestParam(value = "expireTime") String expireTime, + @RequestParam(value = "token") String token){ + logger.info("login user {}, create token , userId : {} , token expire time : {} , token : {}", loginUser.getUserName(), + userId,expireTime,token); + + try { + Map result = accessTokenService.createToken(userId, expireTime, token); + return returnDataList(result); + }catch (Exception e){ + logger.error(CREATE_ACCESS_TOKEN_ERROR.getMsg(),e); + return error(CREATE_ACCESS_TOKEN_ERROR.getCode(), CREATE_ACCESS_TOKEN_ERROR.getMsg()); + } + } + + /** + * create token + * @param loginUser + * @return + */ + @PostMapping(value = "/generate") + @ResponseStatus(HttpStatus.CREATED) + public Result generateToken(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "userId") int userId, + @RequestParam(value = "expireTime") String expireTime){ + logger.info("login user {}, generate token , userId : {} , token expire time : {}",loginUser,userId,expireTime); + try { + Map result = accessTokenService.generateToken(userId, expireTime); + return returnDataList(result); + }catch (Exception e){ + logger.error(GENERATE_TOKEN_ERROR.getMsg(),e); + return error(GENERATE_TOKEN_ERROR.getCode(), GENERATE_TOKEN_ERROR.getMsg()); + } + } + + /** + * query access token list paging + * + * @param loginUser + * @param pageNo + * @param searchVal + * @param pageSize + * @return + */ + @GetMapping(value="/list-paging") + @ResponseStatus(HttpStatus.OK) + public Result queryAccessTokenList(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("pageNo") Integer pageNo, + @RequestParam(value = "searchVal", required = false) String searchVal, + @RequestParam("pageSize") Integer pageSize){ + logger.info("login user {}, list access token paging, pageNo: {}, searchVal: {}, pageSize: {}", + loginUser.getUserName(),pageNo,searchVal,pageSize); + try{ + Map result = checkPageParams(pageNo, pageSize); + if(result.get(Constants.STATUS) != Status.SUCCESS){ + return returnDataListPaging(result); + } + result = accessTokenService.queryAccessTokenList(loginUser, searchVal, pageNo, pageSize); + return returnDataListPaging(result); + }catch (Exception e){ + logger.error(QUERY_ACCESSTOKEN_LIST_PAGING_ERROR.getMsg(),e); + return error(QUERY_ACCESSTOKEN_LIST_PAGING_ERROR.getCode(),QUERY_ACCESSTOKEN_LIST_PAGING_ERROR.getMsg()); + } + } + + /** + * delete access token by id + * @param loginUser + * @param id + * @return + */ + @PostMapping(value = "/delete") + @ResponseStatus(HttpStatus.OK) + public Result delAccessTokenById(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "id") int id) { + logger.info("login user {}, delete access token, id: {},", loginUser.getUserName(), id); + try { + Map result = accessTokenService.delAccessTokenById(loginUser, id); + return returnDataList(result); + }catch (Exception e){ + logger.error(DELETE_USER_BY_ID_ERROR.getMsg(),e); + return error(Status.DELETE_USER_BY_ID_ERROR.getCode(), Status.DELETE_USER_BY_ID_ERROR.getMsg()); + } + } + + + /** + * update token + * @param loginUser + * @return + */ + @PostMapping(value = "/update") + @ResponseStatus(HttpStatus.CREATED) + public Result updateToken(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "id") int id, + @RequestParam(value = "userId") int userId, + @RequestParam(value = "expireTime") String expireTime, + @RequestParam(value = "token") String token){ + logger.info("login user {}, update token , userId : {} , token expire time : {} , token : {}", loginUser.getUserName(), + userId,expireTime,token); + + try { + Map result = accessTokenService.updateToken(id,userId, expireTime, token); + return returnDataList(result); + }catch (Exception e){ + logger.error(CREATE_ACCESS_TOKEN_ERROR.getMsg(),e); + return error(CREATE_ACCESS_TOKEN_ERROR.getCode(), CREATE_ACCESS_TOKEN_ERROR.getMsg()); + } + } + +} diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java index 4b1a6a88f1..3ef011d323 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java @@ -46,7 +46,6 @@ public class SchedulerController extends BaseController{ private static final Logger logger = LoggerFactory.getLogger(SchedulerController.class); public static final String DEFAULT_WARNING_TYPE = "NONE"; public static final String DEFAULT_NOTIFY_GROUP_ID = "1"; - public static final String DEFAULT_MAX_TRY_TIMES = "0"; public static final String DEFAULT_FAILURE_POLICY = "CONTINUE"; diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java index 7c81341b95..72d7c8dbfd 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java @@ -68,7 +68,7 @@ public class TaskRecordController extends BaseController{ try{ logger.info("query task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}", taskName, state, taskDate, startTime, endTime); - Map result = taskRecordService.queryTaskRecordListPaging(taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); + Map result = taskRecordService.queryTaskRecordListPaging(false, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); return returnDataListPaging(result); }catch (Exception e){ logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e); @@ -77,4 +77,36 @@ public class TaskRecordController extends BaseController{ } + /** + * query history task record list paging + * + * @param loginUser + * @return + */ + @GetMapping("/history-list-paging") + @ResponseStatus(HttpStatus.OK) + public Result queryHistoryTaskRecordListPaging(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "taskName", required = false) String taskName, + @RequestParam(value = "state", required = false) String state, + @RequestParam(value = "sourceTable", required = false) String sourceTable, + @RequestParam(value = "destTable", required = false) String destTable, + @RequestParam(value = "taskDate", required = false) String taskDate, + @RequestParam(value = "startDate", required = false) String startTime, + @RequestParam(value = "endDate", required = false) String endTime, + @RequestParam("pageNo") Integer pageNo, + @RequestParam("pageSize") Integer pageSize + ){ + + try{ + logger.info("query hisotry task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}", + taskName, state, taskDate, startTime, endTime); + Map result = taskRecordService.queryTaskRecordListPaging(true, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); + return returnDataListPaging(result); + }catch (Exception e){ + logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e); + return error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getCode(), QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg()); + } + + } + } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java index d217314625..d72e192f51 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/UsersController.java @@ -64,13 +64,14 @@ public class UsersController extends BaseController{ @RequestParam(value = "userName") String userName, @RequestParam(value = "userPassword") String userPassword, @RequestParam(value = "tenantId") int tenantId, + @RequestParam(value = "queue") String queue, @RequestParam(value = "email") String email, @RequestParam(value = "phone", required = false) String phone) { - logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, proxyUsers: {}", - loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone); + logger.info("login user {}, create user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}", + loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone,queue); try { - Map result = usersService.createUser(loginUser, userName, userPassword, email, tenantId, phone); + Map result = usersService.createUser(loginUser, userName, userPassword,email,tenantId, phone,queue); return returnDataList(result); }catch (Exception e){ logger.error(CREATE_USER_ERROR.getMsg(),e); @@ -127,13 +128,14 @@ public class UsersController extends BaseController{ @RequestParam(value = "id") int id, @RequestParam(value = "userName") String userName, @RequestParam(value = "userPassword") String userPassword, + @RequestParam(value = "queue") String queue, @RequestParam(value = "email") String email, @RequestParam(value = "tenantId") int tenantId, @RequestParam(value = "phone", required = false) String phone) { - logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, proxyUsers: {}", - loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone); + logger.info("login user {}, updateProcessInstance user, userName: {}, email: {}, tenantId: {}, userPassword: {}, phone: {}, user queue: {}", + loginUser.getUserName(), userName, email, tenantId, Constants.PASSWORD_DEFAULT, phone,queue); try { - Map result = usersService.updateUser(id,userName,userPassword,email,tenantId,phone); + Map result = usersService.updateUser(id,userName,userPassword,email,tenantId,phone,queue); return returnDataList(result); }catch (Exception e){ logger.error(UPDATE_USER_ERROR.getMsg(),e); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java b/escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java index 253e8edb52..9f8b7efc14 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java @@ -199,6 +199,12 @@ public enum Status { HDFS_NOT_STARTUP(60001,"hdfs not startup"), + + + CREATE_ACCESS_TOKEN_ERROR(70001,"create access token error"), + GENERATE_TOKEN_ERROR(70002,"generate token error"), + QUERY_ACCESSTOKEN_LIST_PAGING_ERROR(70003,"query access token list paging error"), + ; private int code; diff --git a/escheduler-api/src/main/java/cn/escheduler/api/interceptor/LoginHandlerInterceptor.java b/escheduler-api/src/main/java/cn/escheduler/api/interceptor/LoginHandlerInterceptor.java index 7f287bf725..f3836dc467 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/interceptor/LoginHandlerInterceptor.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/interceptor/LoginHandlerInterceptor.java @@ -22,6 +22,7 @@ import cn.escheduler.dao.mapper.UserMapper; import cn.escheduler.dao.model.Session; import cn.escheduler.dao.model.User; import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -64,37 +65,31 @@ public class LoginHandlerInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { - Session session = sessionService.getSession(request); - - if(logger.isDebugEnabled()){ - logger.debug("session info : " + session); - } - - if (session == null) { - response.setStatus(HttpStatus.SC_UNAUTHORIZED); - logger.info("session info is null "); - return false; - } - - if(logger.isDebugEnabled()){ - logger.debug("session id: {}", session.getId()); + // get token + String token = request.getHeader("token"); + User user = null; + if (StringUtils.isEmpty(token)){ + Session session = sessionService.getSession(request); + + if (session == null) { + response.setStatus(HttpStatus.SC_UNAUTHORIZED); + logger.info("session info is null "); + return false; + } + + //get user object from session + user = userMapper.queryById(session.getUserId()); + }else { + user = userMapper.queryUserByToken(token); } - //get user object from session - User user = userMapper.queryById(session.getUserId()); - - if(logger.isDebugEnabled()){ - logger.info("user info : " + user); - } - - + // if user is null if (user == null) { response.setStatus(HttpStatus.SC_UNAUTHORIZED); + logger.info("user does not exist"); return false; } - request.setAttribute(Constants.SESSION_USER, user); - return true; } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/AccessTokenService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/AccessTokenService.java new file mode 100644 index 0000000000..4adde8ad60 --- /dev/null +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/AccessTokenService.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.api.service; + +import cn.escheduler.api.enums.Status; +import cn.escheduler.api.utils.CheckUtils; +import cn.escheduler.api.utils.Constants; +import cn.escheduler.api.utils.PageInfo; +import cn.escheduler.api.utils.Result; +import cn.escheduler.common.enums.UserType; +import cn.escheduler.common.utils.*; +import cn.escheduler.dao.mapper.*; +import cn.escheduler.dao.model.*; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.*; + +/** + * user service + */ +@Service +public class AccessTokenService extends BaseService { + + private static final Logger logger = LoggerFactory.getLogger(AccessTokenService.class); + + @Autowired + private AccessTokenMapper accessTokenMapper; + + + /** + * query access token list + * + * @param loginUser + * @param searchVal + * @param pageNo + * @param pageSize + * @return + */ + public Map queryAccessTokenList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { + Map result = new HashMap<>(5); + + if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM, Constants.STATUS)) { + return result; + } + + Integer count = accessTokenMapper.countAccessTokenPaging(searchVal); + + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + + List accessTokenList = accessTokenMapper.queryAccessTokenPaging(searchVal, pageInfo.getStart(), pageSize); + + pageInfo.setTotalCount(count); + pageInfo.setLists(accessTokenList); + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * check + * + * @param result + * @param bool + * @param userNoOperationPerm + * @param status + * @return + */ + private boolean check(Map result, boolean bool, Status userNoOperationPerm, String status) { + //only admin can operate + if (bool) { + result.put(Constants.STATUS, userNoOperationPerm); + result.put(status, userNoOperationPerm.getMsg()); + return true; + } + return false; + } + + + /** + * create token + * + * @param userId + * @param expireTime + * @param token + * @return + */ + public Map createToken(int userId, String expireTime, String token) { + Map result = new HashMap<>(5); + + AccessToken accessToken = new AccessToken(); + accessToken.setUserId(userId); + accessToken.setExpireTime(DateUtils.stringToDate(expireTime)); + accessToken.setToken(token); + accessToken.setCreateTime(new Date()); + accessToken.setUpdateTime(new Date()); + + // insert + int insert = accessTokenMapper.insert(accessToken); + + if (insert > 0) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.CREATE_ALERT_GROUP_ERROR); + } + + return result; + } + + /** + * generate token + * @param userId + * @param expireTime + * @return + */ + public Map generateToken(int userId, String expireTime) { + Map result = new HashMap<>(5); + String token = EncryptionUtils.getMd5(userId + expireTime + String.valueOf(System.currentTimeMillis())); + result.put(Constants.DATA_LIST, token); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * delete access token + * @param loginUser + * @param id + * @return + */ + public Map delAccessTokenById(User loginUser, int id) { + Map result = new HashMap<>(5); + //only admin can operate + if (!isAdmin(loginUser)) { + putMsg(result, Status.USER_NOT_EXIST, id); + return result; + } + + accessTokenMapper.delete(id); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * update token by id + * @param id + * @param userId + * @param expireTime + * @param token + * @return + */ + public Map updateToken(int id,int userId, String expireTime, String token) { + Map result = new HashMap<>(5); + AccessToken accessToken = new AccessToken(); + accessToken.setId(id); + accessToken.setUserId(userId); + accessToken.setExpireTime(DateUtils.stringToDate(expireTime)); + accessToken.setToken(token); + accessToken.setUpdateTime(new Date()); + + accessTokenMapper.update(accessToken); + + putMsg(result, Status.SUCCESS); + return result; + } +} diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java index d5371e5f0a..a5c72758fb 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java @@ -38,6 +38,7 @@ import org.springframework.transaction.annotation.Transactional; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.SQLException; import java.util.*; /** @@ -209,12 +210,13 @@ public class DataSourceService extends BaseService{ switch (dataSource.getType()) { case HIVE: + case SQLSERVER: separator = ";"; break; case MYSQL: - separator = "&"; - break; case POSTGRESQL: + case CLICKHOUSE: + case ORACLE: separator = "&"; break; default: @@ -367,6 +369,18 @@ public class DataSourceService extends BaseService{ datasource = JSONObject.parseObject(parameter, SparkDataSource.class); Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER); break; + case CLICKHOUSE: + datasource = JSONObject.parseObject(parameter, ClickHouseDataSource.class); + Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER); + break; + case ORACLE: + datasource = JSONObject.parseObject(parameter, OracleDataSource.class); + Class.forName(Constants.COM_ORACLE_JDBC_DRIVER); + break; + case SQLSERVER: + datasource = JSONObject.parseObject(parameter, SQLServerDataSource.class); + Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER); + break; default: break; } @@ -392,6 +406,11 @@ public class DataSourceService extends BaseService{ Connection con = getConnection(type, parameter); if (con != null) { isConnection = true; + try { + con.close(); + } catch (SQLException e) { + logger.error("close connection fail at DataSourceService::checkConnection()", e); + } } return isConnection; } @@ -428,9 +447,14 @@ public class DataSourceService extends BaseService{ String address = buildAddress(type, host, port); String jdbcUrl = address + "/" + database; String separator = ""; - if (Constants.MYSQL.equals(type.name()) || Constants.POSTGRESQL.equals(type.name())) { + if (Constants.MYSQL.equals(type.name()) + || Constants.POSTGRESQL.equals(type.name()) + || Constants.CLICKHOUSE.equals(type.name()) + || Constants.ORACLE.equals(type.name())) { separator = "&"; - } else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) { + } else if (Constants.HIVE.equals(type.name()) + || Constants.SPARK.equals(type.name()) + || Constants.SQLSERVER.equals(type.name())) { separator = ";"; } @@ -479,6 +503,15 @@ public class DataSourceService extends BaseService{ } sb.deleteCharAt(sb.length() - 1); } + } else if (Constants.CLICKHOUSE.equals(type.name())) { + sb.append(Constants.JDBC_CLICKHOUSE); + sb.append(host).append(":").append(port); + } else if (Constants.ORACLE.equals(type.name())) { + sb.append(Constants.JDBC_ORACLE); + sb.append(host).append(":").append(port); + } else if (Constants.SQLSERVER.equals(type.name())) { + sb.append(Constants.JDBC_SQLSERVER); + sb.append(host).append(":").append(port); } return sb.toString(); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java index b5a93a390e..52306acd6c 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java @@ -29,6 +29,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static cn.escheduler.common.Constants.*; + /** * task record service */ @@ -51,7 +53,7 @@ public class TaskRecordService extends BaseService{ * @param pageSize * @return */ - public Map queryTaskRecordListPaging(String taskName, String startDate, + public Map queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate, String taskDate, String sourceTable, String destTable, String endDate, String state, Integer pageNo, Integer pageSize) { @@ -69,8 +71,9 @@ public class TaskRecordService extends BaseService{ map.put("offset", pageInfo.getStart().toString()); map.put("pageSize", pageInfo.getPageSize().toString()); - int count = TaskRecordDao.countTaskRecord(map); - List recordList = TaskRecordDao.queryAllTaskRecord(map); + String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG; + int count = TaskRecordDao.countTaskRecord(map, table); + List recordList = TaskRecordDao.queryAllTaskRecord(map, table); pageInfo.setTotalCount(count); pageInfo.setLists(recordList); result.put(Constants.DATA_LIST, pageInfo); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java index 8bf815ca87..82e1a850f5 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java @@ -87,7 +87,8 @@ public class UsersService extends BaseService { String userPassword, String email, int tenantId, - String phone) throws Exception { + String phone, + String queue) throws Exception { Map result = new HashMap<>(5); result = CheckUtils.checkUserParams(userName, userPassword, email, phone); @@ -114,6 +115,7 @@ public class UsersService extends BaseService { user.setUserType(UserType.GENERAL_USER); user.setCreateTime(now); user.setUpdateTime(now); + user.setQueue(queue); // save user userMapper.insert(user); @@ -194,7 +196,13 @@ public class UsersService extends BaseService { * @param phone * @return */ - public Map updateUser(int userId, String userName, String userPassword, String email, int tenantId, String phone) throws Exception { + public Map updateUser(int userId, + String userName, + String userPassword, + String email, + int tenantId, + String phone, + String queue) throws Exception { Map result = new HashMap<>(5); result.put(Constants.STATUS, false); @@ -218,6 +226,7 @@ public class UsersService extends BaseService { if (StringUtils.isNotEmpty(email)) { user.setEmail(email); } + user.setQueue(queue); user.setPhone(phone); user.setUpdateTime(now); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/utils/Constants.java b/escheduler-api/src/main/java/cn/escheduler/api/utils/Constants.java index bdeb6d689e..79cf3e5b3f 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/utils/Constants.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/utils/Constants.java @@ -82,6 +82,9 @@ public class Constants { public static final String ORG_POSTGRESQL_DRIVER = "org.postgresql.Driver"; public static final String COM_MYSQL_JDBC_DRIVER = "com.mysql.jdbc.Driver"; public static final String ORG_APACHE_HIVE_JDBC_HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver"; + public static final String COM_CLICKHOUSE_JDBC_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver"; + public static final String COM_ORACLE_JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver"; + public static final String COM_SQLSERVER_JDBC_DRIVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; /** * database type @@ -90,6 +93,9 @@ public class Constants { public static final String POSTGRESQL = "POSTGRESQL"; public static final String HIVE = "HIVE"; public static final String SPARK = "SPARK"; + public static final String CLICKHOUSE = "CLICKHOUSE"; + public static final String ORACLE = "ORACLE"; + public static final String SQLSERVER = "SQLSERVER"; /** * jdbc url @@ -97,6 +103,9 @@ public class Constants { public static final String JDBC_MYSQL = "jdbc:mysql://"; public static final String JDBC_POSTGRESQL = "jdbc:postgresql://"; public static final String JDBC_HIVE_2 = "jdbc:hive2://"; + public static final String JDBC_CLICKHOUSE = "jdbc:clickhouse://"; + public static final String JDBC_ORACLE = "jdbc:oracle:thin:@//"; + public static final String JDBC_SQLSERVER = "jdbc:sqlserver://"; public static final String ADDRESS = "address"; diff --git a/escheduler-api/src/test/java/cn/escheduler/api/HttpClientTest.java b/escheduler-api/src/test/java/cn/escheduler/api/HttpClientTest.java new file mode 100644 index 0000000000..81a94bc3f0 --- /dev/null +++ b/escheduler-api/src/test/java/cn/escheduler/api/HttpClientTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.api; + +import java.io.File; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import cn.escheduler.common.utils.EncryptionUtils; +import org.apache.commons.io.FileUtils; +import org.apache.http.NameValuePair; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +public class HttpClientTest { + + private static final Logger logger = LoggerFactory.getLogger(HttpClientTest.class); + + public static void main(String[] args) throws Exception { +// doGETParamPathVariableAndChinese(); +// doGETParam(); +// doPOSTParam(); + + String md5 = EncryptionUtils.getMd5(String.valueOf(System.currentTimeMillis()) + "张三"); + System.out.println(md5); + System.out.println(md5.length()); + } + + public static void doPOSTParam()throws Exception{ + // create Httpclient + CloseableHttpClient httpclient = HttpClients.createDefault(); + // 创建http POST请求 + HttpPost httpPost = new HttpPost("http://127.0.0.1:12345/escheduler/projects/create"); + httpPost.setHeader("token", "123"); + // set parameters + List parameters = new ArrayList(); + parameters.add(new BasicNameValuePair("projectName", "qzw")); + parameters.add(new BasicNameValuePair("desc", "qzw")); + + UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters); + httpPost.setEntity(formEntity); + + + CloseableHttpResponse response = null; + try { + // execute + response = httpclient.execute(httpPost); + // eponse status code 200 + if (response.getStatusLine().getStatusCode() == 200) { + String content = EntityUtils.toString(response.getEntity(), "UTF-8"); + System.out.println(content); + } + } finally { + if (response != null) { + response.close(); + } + httpclient.close(); + } + } + + /** + * + * @throws Exception + */ + public static void doGETParamPathVariableAndChinese()throws Exception{ + // create HttpClient + CloseableHttpClient httpclient = HttpClients.createDefault(); + + List parameters = new ArrayList(); +// parameters.add(new BasicNameValuePair("pageSize", "10")); + + // define the parameters of the request + URI uri = new URIBuilder("http://127.0.0.1:12345/escheduler/projects/%E5%85%A8%E9%83%A8%E6%B5%81%E7%A8%8B%E6%B5%8B%E8%AF%95/process/list") + .build(); + + // create http GET request + HttpGet httpGet = new HttpGet(uri); + httpGet.setHeader("token","123"); + //response object + CloseableHttpResponse response = null; + try { + // execute http get request + response = httpclient.execute(httpGet); + // reponse status code 200 + if (response.getStatusLine().getStatusCode() == 200) { + String content = EntityUtils.toString(response.getEntity(), "UTF-8"); + logger.info("start--------------->"); + logger.info(content); + logger.info("end----------------->"); + } + } finally { + if (response != null) { + response.close(); + } + httpclient.close(); + } + } + + /** + * + * @throws Exception + */ + public static void doGETParam()throws Exception{ + // create HttpClient + CloseableHttpClient httpclient = HttpClients.createDefault(); + + List parameters = new ArrayList(); + parameters.add(new BasicNameValuePair("processInstanceId", "41415")); + + // define the parameters of the request + URI uri = new URIBuilder("http://127.0.0.1:12345/escheduler/projects/%E5%85%A8%E9%83%A8%E6%B5%81%E7%A8%8B%E6%B5%8B%E8%AF%95/instance/view-variables") + .setParameters(parameters) + .build(); + + // create http GET request + HttpGet httpGet = new HttpGet(uri); + httpGet.setHeader("token","123"); + //response object + CloseableHttpResponse response = null; + try { + // execute http get request + response = httpclient.execute(httpGet); + // reponse status code 200 + if (response.getStatusLine().getStatusCode() == 200) { + String content = EntityUtils.toString(response.getEntity(), "UTF-8"); + logger.info("start--------------->"); + logger.info(content); + logger.info("end----------------->"); + } + } finally { + if (response != null) { + response.close(); + } + httpclient.close(); + } + } + +} diff --git a/escheduler-common/pom.xml b/escheduler-common/pom.xml index e06b344c4f..c0daa0bce7 100644 --- a/escheduler-common/pom.xml +++ b/escheduler-common/pom.xml @@ -371,6 +371,32 @@ com.github.oshi oshi-core + + + ru.yandex.clickhouse + clickhouse-jdbc + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + + + + com.microsoft.sqlserver + mssql-jdbc + + + com.microsoft.azure + azure-keyvault + + + diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index e0e0c399e9..4e9a11c841 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -463,6 +463,10 @@ public final class Constants { public static final String TASK_RECORD_PWD = "task.record.datasource.password"; + public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; + + public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; + public static final String STATUS = "status"; @@ -602,15 +606,29 @@ public final class Constants { public static final String JDBC_POSTGRESQL_CLASS_NAME = "org.postgresql.Driver"; /** - * postgresql + * hive */ public static final String JDBC_HIVE_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver"; /** - * postgresql + * spark */ public static final String JDBC_SPARK_CLASS_NAME = "org.apache.hive.jdbc.HiveDriver"; + /** + * ClickHouse + */ + public static final String JDBC_CLICKHOUSE_CLASS_NAME = "ru.yandex.clickhouse.ClickHouseDriver"; + + /** + * Oracle + */ + public static final String JDBC_ORACLE_CLASS_NAME = "oracle.jdbc.driver.OracleDriver"; + + /** + * Oracle + */ + public static final String JDBC_SQLSERVER_CLASS_NAME = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; /** * spark params constant @@ -812,4 +830,9 @@ public final class Constants { public static final String CONTENT = "content"; public static final String DEPENDENT_SPLIT = ":||"; public static final String DEPENDENT_ALL = "ALL"; + + + /** + * + */ } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/DbType.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/DbType.java index 70f767444f..bf0ebba60c 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/enums/DbType.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/DbType.java @@ -25,6 +25,9 @@ public enum DbType { * 1 postgresql * 2 hive * 3 spark + * 4 clickhouse + * 5 oracle + * 6 sqlserver */ - MYSQL, POSTGRESQL, HIVE, SPARK + MYSQL, POSTGRESQL, HIVE, SPARK, CLICKHOUSE, ORACLE, SQLSERVER } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/job/db/ClickHouseDataSource.java b/escheduler-common/src/main/java/cn/escheduler/common/job/db/ClickHouseDataSource.java new file mode 100644 index 0000000000..b4df4d8f5a --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/job/db/ClickHouseDataSource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.job.db; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * data source of ClickHouse + */ +public class ClickHouseDataSource extends BaseDataSource { + private static final Logger logger = LoggerFactory.getLogger(ClickHouseDataSource.class); + + /** + * gets the JDBC url for the data source connection + * @return + */ + @Override + public String getJdbcUrl() { + String jdbcUrl = getAddress(); + if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) { + jdbcUrl += "/"; + } + + jdbcUrl += getDatabase(); + + if (StringUtils.isNotEmpty(getOther())) { + jdbcUrl += "?" + getOther(); + } + + return jdbcUrl; + } + + /** + * test whether the data source can be connected successfully + * @throws Exception + */ + @Override + public void isConnectable() throws Exception { + Connection con = null; + try { + Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); + con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); + } finally { + if (con != null) { + try { + con.close(); + } catch (SQLException e) { + logger.error("ClickHouse datasource try conn close conn error", e); + throw e; + } + } + } + + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java b/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java index 06858fade3..316be26d89 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java @@ -39,6 +39,12 @@ public class DataSourceFactory { return JSONUtils.parseObject(parameter, HiveDataSource.class); case SPARK: return JSONUtils.parseObject(parameter, SparkDataSource.class); + case CLICKHOUSE: + return JSONUtils.parseObject(parameter, ClickHouseDataSource.class); + case ORACLE: + return JSONUtils.parseObject(parameter, OracleDataSource.class); + case SQLSERVER: + return JSONUtils.parseObject(parameter, SQLServerDataSource.class); default: return null; } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/job/db/OracleDataSource.java b/escheduler-common/src/main/java/cn/escheduler/common/job/db/OracleDataSource.java new file mode 100644 index 0000000000..5e245a590c --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/job/db/OracleDataSource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.job.db; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * data source of Oracle + */ +public class OracleDataSource extends BaseDataSource { + private static final Logger logger = LoggerFactory.getLogger(OracleDataSource.class); + + /** + * gets the JDBC url for the data source connection + * @return + */ + @Override + public String getJdbcUrl() { + String jdbcUrl = getAddress(); + if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) { + jdbcUrl += "/"; + } + + jdbcUrl += getDatabase(); + + if (StringUtils.isNotEmpty(getOther())) { + jdbcUrl += "?" + getOther(); + } + + return jdbcUrl; + } + + /** + * test whether the data source can be connected successfully + * @throws Exception + */ + @Override + public void isConnectable() throws Exception { + Connection con = null; + try { + Class.forName("oracle.jdbc.driver.OracleDriver"); + con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); + } finally { + if (con != null) { + try { + con.close(); + } catch (SQLException e) { + logger.error("Oracle datasource try conn close conn error", e); + throw e; + } + } + } + + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/job/db/SQLServerDataSource.java b/escheduler-common/src/main/java/cn/escheduler/common/job/db/SQLServerDataSource.java new file mode 100644 index 0000000000..f4d202a76e --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/job/db/SQLServerDataSource.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.job.db; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +/** + * data source of SQL Server + */ +public class SQLServerDataSource extends BaseDataSource { + private static final Logger logger = LoggerFactory.getLogger(SQLServerDataSource.class); + + /** + * gets the JDBC url for the data source connection + * @return + */ + @Override + public String getJdbcUrl() { + String jdbcUrl = getAddress(); + jdbcUrl += ";databaseName=" + getDatabase(); + + if (StringUtils.isNotEmpty(getOther())) { + jdbcUrl += ";" + getOther(); + } + + return jdbcUrl; + } + + /** + * test whether the data source can be connected successfully + * @throws Exception + */ + @Override + public void isConnectable() throws Exception { + Connection con = null; + try { + Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); + } finally { + if (con != null) { + try { + con.close(); + } catch (SQLException e) { + logger.error("SQL Server datasource try conn close conn error", e); + throw e; + } + } + } + + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/DateUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/DateUtils.java index a40725fe16..11dc6bfbf6 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/utils/DateUtils.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/DateUtils.java @@ -72,7 +72,7 @@ public class DateUtils { public static Date parse(String date,String format){ try { return new SimpleDateFormat(format).parse(date); - } catch (ParseException e) { + } catch (Exception e) { logger.error("error while parse date:" + date, e); } return null; diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/FileUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/FileUtils.java index a07b689871..205f894e04 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/utils/FileUtils.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/FileUtils.java @@ -368,5 +368,61 @@ public class FileUtils { org.apache.commons.io.FileUtils.forceDelete(new File(filename)); } + /** + * Gets all the parent subdirectories of the parentDir directory + * @param parentDir + * @return + */ + public static File[] getAllDir(String parentDir){ + if(parentDir == null || "".equals(parentDir)) { + throw new RuntimeException("parentDir can not be empty"); + } + + File file = new File(parentDir); + if(!file.exists() || !file.isDirectory()) { + throw new RuntimeException("parentDir not exist, or is not a directory:"+parentDir); + } + + File[] schemaDirs = file.listFiles(new FileFilter() { + + @Override + public boolean accept(File pathname) { + if (pathname.isDirectory()) { + return true; + } + else { + return false; + } + } + }); + + return schemaDirs; + } + + /** + * Get Content + * @param inputStream + * @return + * @throws IOException + */ + public static String readFile2Str(InputStream inputStream) throws IOException{ + String all_content=null; + try { + all_content = new String(); + InputStream ins = inputStream; + ByteArrayOutputStream outputstream = new ByteArrayOutputStream(); + byte[] str_b = new byte[1024]; + int i = -1; + while ((i=ins.read(str_b)) > 0) { + outputstream.write(str_b,0,i); + } + all_content = outputstream.toString(); + return all_content; + } catch (Exception e) { + logger.error(e.getMessage(),e); + throw new RuntimeException(e); + } + } + } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/MysqlUtil.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/MysqlUtil.java new file mode 100644 index 0000000000..d2d1ef203d --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/MysqlUtil.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; + +public class MysqlUtil { + + public static final Logger logger = LoggerFactory.getLogger(MysqlUtil.class); + + private static MysqlUtil instance; + + MysqlUtil() { + } + + public static MysqlUtil getInstance() { + if (null == instance) { + syncInit(); + } + return instance; + } + + private static synchronized void syncInit() { + if (instance == null) { + instance = new MysqlUtil(); + } + } + + public void release(ResultSet rs, Statement stmt, Connection conn) { + try { + if (rs != null) { + rs.close(); + rs = null; + } + } catch (SQLException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException(e); + } finally { + try { + if (stmt != null) { + stmt.close(); + stmt = null; + } + } catch (SQLException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException(e); + } finally { + try { + if (conn != null) { + conn.close(); + conn = null; + } + } catch (SQLException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException(e); + } + } + } + } + + public static void realeaseResource(ResultSet rs, PreparedStatement ps, Connection conn) { + MysqlUtil.getInstance().release(rs,ps,conn); + if (null != rs) { + try { + rs.close(); + } catch (SQLException e) { + logger.error(e.getMessage(),e); + } + } + + if (null != ps) { + try { + ps.close(); + } catch (SQLException e) { + logger.error(e.getMessage(),e); + } + } + + if (null != conn) { + try { + conn.close(); + } catch (SQLException e) { + logger.error(e.getMessage(),e); + } + } + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/SchemaUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/SchemaUtils.java new file mode 100644 index 0000000000..18f10b1bcd --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/SchemaUtils.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.utils; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Metadata related common classes + * + */ +public class SchemaUtils { + + private static final Logger logger = LoggerFactory.getLogger(SchemaUtils.class); + private static Pattern p = Pattern.compile("\\s*|\t|\r|\n"); + + /** + * 获取所有upgrade目录下的可升级的schema + * Gets upgradable schemas for all upgrade directories + * @return + */ + @SuppressWarnings("unchecked") + public static List getAllSchemaList() { + List schemaDirList = new ArrayList<>(); + File[] schemaDirArr = FileUtils.getAllDir("sql/upgrade"); + if(schemaDirArr == null || schemaDirArr.length == 0) { + return null; + } + + for(File file : schemaDirArr) { + schemaDirList.add(file.getName()); + } + + Collections.sort(schemaDirList , new Comparator() { + @Override + public int compare(Object o1 , Object o2){ + try { + String dir1 = String.valueOf(o1); + String dir2 = String.valueOf(o2); + String version1 = dir1.split("_")[0]; + String version2 = dir2.split("_")[0]; + if(version1.equals(version2)) { + return 0; + } + + if(SchemaUtils.isAGreatVersion(version1, version2)) { + return 1; + } + + return -1; + + } catch (Exception e) { + logger.error(e.getMessage(),e); + throw new RuntimeException(e); + } + } + }); + + return schemaDirList; + } + + /** + * 判断schemaVersion是否比version版本高 + * Determine whether schemaVersion is higher than version + * @param schemaVersion + * @param version + * @return + */ + public static boolean isAGreatVersion(String schemaVersion, String version) { + if(StringUtils.isEmpty(schemaVersion) || StringUtils.isEmpty(version)) { + throw new RuntimeException("schemaVersion or version is empty"); + } + + String[] schemaVersionArr = schemaVersion.split("\\."); + String[] versionArr = version.split("\\."); + int arrLength = schemaVersionArr.length < versionArr.length ? schemaVersionArr.length : versionArr.length; + for(int i = 0 ; i < arrLength ; i++) { + if(Integer.valueOf(schemaVersionArr[i]) > Integer.valueOf(versionArr[i])) { + return true; + }else if(Integer.valueOf(schemaVersionArr[i]) < Integer.valueOf(versionArr[i])) { + return false; + } + } + + // 说明直到第arrLength-1个元素,两个版本号都一样,此时谁的arrLength大,谁的版本号就大 + // If the version and schema version is the same from 0 up to the arrlength-1 element,whoever has a larger arrLength has a larger version number + return schemaVersionArr.length > versionArr.length; + } + + /** + * Gets the current software version number of the system + * @return + */ + public static String getSoftVersion() { + String soft_version; + try { + soft_version = FileUtils.readFile2Str(new FileInputStream(new File("sql/soft_version"))); + soft_version = replaceBlank(soft_version); + } catch (FileNotFoundException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException("Failed to get the product version description file. The file could not be found", e); + } catch (IOException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException("Failed to get product version number description file, failed to read the file", e); + } + return soft_version; + } + + /** + * 去掉字符串中的空格回车换行和制表符 + * Strips the string of space carriage returns and tabs + * @param str + * @return + */ + public static String replaceBlank(String str) { + String dest = ""; + if (str!=null) { + + Matcher m = p.matcher(str); + dest = m.replaceAll(""); + } + return dest; + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/ScriptRunner.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/ScriptRunner.java new file mode 100644 index 0000000000..a74d7e268c --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/ScriptRunner.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.LineNumberReader; +import java.io.Reader; +import java.sql.*; + +/* + * Slightly modified version of the com.ibatis.common.jdbc.ScriptRunner class + * from the iBATIS Apache project. Only removed dependency on Resource class + * and a constructor + */ +/* + * Copyright 2004 Clinton Begin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Tool to run database scripts + */ +public class ScriptRunner { + + public static final Logger logger = LoggerFactory.getLogger(ScriptRunner.class); + + private static final String DEFAULT_DELIMITER = ";"; + + private Connection connection; + + private boolean stopOnError; + private boolean autoCommit; + + private String delimiter = DEFAULT_DELIMITER; + private boolean fullLineDelimiter = false; + + /** + * Default constructor + */ + public ScriptRunner(Connection connection, boolean autoCommit, boolean stopOnError) { + this.connection = connection; + this.autoCommit = autoCommit; + this.stopOnError = stopOnError; + } + + public static void main(String[] args) { + String dbName = "db_mmu"; + String appKey = dbName.substring(dbName.lastIndexOf("_")+1, dbName.length()); + System.out.println(appKey); + } + + public void setDelimiter(String delimiter, boolean fullLineDelimiter) { + this.delimiter = delimiter; + this.fullLineDelimiter = fullLineDelimiter; + } + + /** + * Runs an SQL script (read in using the Reader parameter) + * + * @param reader + * - the source of the script + */ + public void runScript(Reader reader) throws IOException, SQLException { + try { + boolean originalAutoCommit = connection.getAutoCommit(); + try { + if (originalAutoCommit != this.autoCommit) { + connection.setAutoCommit(this.autoCommit); + } + runScript(connection, reader); + } finally { + connection.setAutoCommit(originalAutoCommit); + } + } catch (IOException e) { + throw e; + } catch (SQLException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Error running script. Cause: " + e, e); + } + } + + public void runScript(Reader reader, String dbName) throws IOException, SQLException { + try { + boolean originalAutoCommit = connection.getAutoCommit(); + try { + if (originalAutoCommit != this.autoCommit) { + connection.setAutoCommit(this.autoCommit); + } + runScript(connection, reader, dbName); + } finally { + connection.setAutoCommit(originalAutoCommit); + } + } catch (IOException e) { + throw e; + } catch (SQLException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Error running script. Cause: " + e, e); + } + } + + /** + * Runs an SQL script (read in using the Reader parameter) using the connection + * passed in + * + * @param conn + * - the connection to use for the script + * @param reader + * - the source of the script + * @throws SQLException + * if any SQL errors occur + * @throws IOException + * if there is an error reading from the Reader + */ + private void runScript(Connection conn, Reader reader) throws IOException, SQLException { + StringBuffer command = null; + try { + LineNumberReader lineReader = new LineNumberReader(reader); + String line = null; + while ((line = lineReader.readLine()) != null) { + if (command == null) { + command = new StringBuffer(); + } + String trimmedLine = line.trim(); + if (trimmedLine.startsWith("--")) { + logger.info(trimmedLine); + } else if (trimmedLine.length() < 1 || trimmedLine.startsWith("//")) { + // Do nothing + } else if (trimmedLine.length() < 1 || trimmedLine.startsWith("--")) { + // Do nothing + + } else if (trimmedLine.startsWith("delimiter")) { + String newDelimiter = trimmedLine.split(" ")[1]; + this.setDelimiter(newDelimiter, fullLineDelimiter); + + } else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter()) + || fullLineDelimiter && trimmedLine.equals(getDelimiter())) { + command.append(line.substring(0, line.lastIndexOf(getDelimiter()))); + command.append(" "); + Statement statement = conn.createStatement(); + + // logger.info(command.toString()); + + boolean hasResults = false; + logger.info("sql:"+command.toString()); + if (stopOnError) { + hasResults = statement.execute(command.toString()); + } else { + try { + statement.execute(command.toString()); + } catch (SQLException e) { + logger.error(e.getMessage(),e); + throw e; + } + } + + ResultSet rs = statement.getResultSet(); + if (hasResults && rs != null) { + ResultSetMetaData md = rs.getMetaData(); + int cols = md.getColumnCount(); + for (int i = 0; i < cols; i++) { + String name = md.getColumnLabel(i); + logger.info(name + "\t"); + } + logger.info(""); + while (rs.next()) { + for (int i = 0; i < cols; i++) { + String value = rs.getString(i); + logger.info(value + "\t"); + } + logger.info(""); + } + } + + command = null; + try { + statement.close(); + } catch (Exception e) { + // Ignore to workaround a bug in Jakarta DBCP + } + Thread.yield(); + } else { + command.append(line); + command.append(" "); + } + } + + } catch (SQLException e) { + logger.error("Error executing: " + command.toString()); + throw e; + } catch (IOException e) { + e.fillInStackTrace(); + logger.error("Error executing: " + command.toString()); + throw e; + } + } + + private void runScript(Connection conn, Reader reader , String dbName) throws IOException, SQLException { + StringBuffer command = null; + String sql = ""; + String appKey = dbName.substring(dbName.lastIndexOf("_")+1, dbName.length()); + try { + LineNumberReader lineReader = new LineNumberReader(reader); + String line = null; + while ((line = lineReader.readLine()) != null) { + if (command == null) { + command = new StringBuffer(); + } + String trimmedLine = line.trim(); + if (trimmedLine.startsWith("--")) { + logger.info(trimmedLine); + } else if (trimmedLine.length() < 1 || trimmedLine.startsWith("//")) { + // Do nothing + } else if (trimmedLine.length() < 1 || trimmedLine.startsWith("--")) { + // Do nothing + + } else if (trimmedLine.startsWith("delimiter")) { + String newDelimiter = trimmedLine.split(" ")[1]; + this.setDelimiter(newDelimiter, fullLineDelimiter); + + } else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter()) + || fullLineDelimiter && trimmedLine.equals(getDelimiter())) { + command.append(line.substring(0, line.lastIndexOf(getDelimiter()))); + command.append(" "); + Statement statement = conn.createStatement(); + + // logger.info(command.toString()); + + sql = command.toString().replaceAll("\\{\\{APPDB\\}\\}", dbName); + boolean hasResults = false; + logger.info("sql:"+sql); + if (stopOnError) { + hasResults = statement.execute(sql); + } else { + try { + statement.execute(sql); + } catch (SQLException e) { + logger.error(e.getMessage(),e); + throw e; + } + } + + ResultSet rs = statement.getResultSet(); + if (hasResults && rs != null) { + ResultSetMetaData md = rs.getMetaData(); + int cols = md.getColumnCount(); + for (int i = 0; i < cols; i++) { + String name = md.getColumnLabel(i); + logger.info(name + "\t"); + } + logger.info(""); + while (rs.next()) { + for (int i = 0; i < cols; i++) { + String value = rs.getString(i); + logger.info(value + "\t"); + } + logger.info(""); + } + } + + command = null; + try { + statement.close(); + } catch (Exception e) { + // Ignore to workaround a bug in Jakarta DBCP + } + Thread.yield(); + } else { + command.append(line); + command.append(" "); + } + } + + } catch (SQLException e) { + logger.error("Error executing: " + sql); + throw e; + } catch (IOException e) { + e.fillInStackTrace(); + logger.error("Error executing: " + sql); + throw e; + } + } + + private String getDelimiter() { + return delimiter; + } + +} \ No newline at end of file diff --git a/escheduler-dao/readme.txt b/escheduler-dao/readme.txt new file mode 100644 index 0000000000..c47308a1a1 --- /dev/null +++ b/escheduler-dao/readme.txt @@ -0,0 +1,13 @@ +-- 用户指定队列 +alter table t_escheduler_user add queue varchar(64); + +-- 访问token +CREATE TABLE `t_escheduler_access_token` ( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', + `user_id` int(11) DEFAULT NULL COMMENT '用户id', + `token` varchar(64) DEFAULT NULL COMMENT 'token令牌', + `expire_time` datetime DEFAULT NULL COMMENT 'token有效结束时间', + `create_time` datetime DEFAULT NULL COMMENT '创建时间', + `update_time` datetime DEFAULT NULL COMMENT '更新时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8; \ No newline at end of file diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 38dce40f81..e6156a0f4c 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -59,7 +59,7 @@ public class ProcessDao extends AbstractBaseDao { ExecutionStatus.READY_STOP.ordinal()}; @Autowired - private ProjectMapper projectMapper; + private UserMapper userMapper; @Autowired private ProcessDefinitionMapper processDefineMapper; @@ -102,7 +102,7 @@ public class ProcessDao extends AbstractBaseDao { */ @Override protected void init() { - projectMapper = getMapper(ProjectMapper.class); + userMapper=getMapper(UserMapper.class); processDefineMapper = getMapper(ProcessDefinitionMapper.class); processInstanceMapper = getMapper(ProcessInstanceMapper.class); dataSourceMapper = getMapper(DataSourceMapper.class); @@ -120,48 +120,64 @@ public class ProcessDao extends AbstractBaseDao { * find one command from command queue, construct process instance * @param logger * @param host - * @param vaildThreadNum + * @param validThreadNum * @return */ @Transactional(value = "TransactionManager",rollbackFor = Exception.class) - public ProcessInstance scanCommand(Logger logger, String host, int vaildThreadNum){ + public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){ ProcessInstance processInstance = null; Command command = findOneCommand(); - if (command == null) { return null; } logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); - processInstance = constructProcessInstance(command, host); - - //cannot construct process instance, return null; - if(processInstance == null){ - logger.error("scan command, command parameter is error: %s", command.toString()); - }else{ - // check thread number enough for this command, if not, change state to waiting thread. - int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); - if(vaildThreadNum < commandThreadCount){ - logger.info("there is not enough thread for this command: {}",command.toString() ); - processInstance.setState(ExecutionStatus.WAITTING_THREAD); - if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ - processInstance.addHistoryCmd(command.getCommandType()); - } - saveProcessInstance(processInstance); - this.setSubProcessParam(processInstance); - createRecoveryWaitingThreadCommand(command, processInstance); + try{ + processInstance = constructProcessInstance(command, host); + //cannot construct process instance, return null; + if(processInstance == null){ + logger.error("scan command, command parameter is error: %s", command.toString()); + delCommandByid(command.getId()); return null; + }else if(!checkThreadNum(command, validThreadNum)){ + logger.info("there is not enough thread for this command: {}",command.toString() ); + return setWaitingThreadProcess(command, processInstance); }else{ - processInstance.setCommandType(command.getCommandType()); - processInstance.addHistoryCmd(command.getCommandType()); - saveProcessInstance(processInstance); - this.setSubProcessParam(processInstance); + processInstance.setCommandType(command.getCommandType()); + processInstance.addHistoryCmd(command.getCommandType()); + saveProcessInstance(processInstance); + this.setSubProcessParam(processInstance); + delCommandByid(command.getId()); + return processInstance; } + }catch (Exception e){ + logger.error("scan command error ", e); + delCommandByid(command.getId()); } - // delete command - delCommandByid(command.getId()); - return processInstance; + return null; + } + + /** + * set process waiting thread + * @param command + * @param processInstance + * @return + */ + private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) { + processInstance.setState(ExecutionStatus.WAITTING_THREAD); + if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ + processInstance.addHistoryCmd(command.getCommandType()); + } + saveProcessInstance(processInstance); + this.setSubProcessParam(processInstance); + createRecoveryWaitingThreadCommand(command, processInstance); + return null; + } + + private boolean checkThreadNum(Command command, int validThreadNum) { + int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); + return validThreadNum >= commandThreadCount; } /** @@ -245,7 +261,7 @@ public class ProcessDao extends AbstractBaseDao { public ProcessInstance findProcessInstanceByScheduleTime(int defineId, Date scheduleTime){ return processInstanceMapper.queryByScheduleTime(defineId, - DateUtils.dateToString(scheduleTime), 0,null, null); + DateUtils.dateToString(scheduleTime), 0, null, null); } /** @@ -669,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao { paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId())); processInstance.setCommandParam(JSONUtils.toJson(paramMap)); processInstance.setIsSubProcess(Flag.YES); - this.updateProcessInstance(processInstance); + this.saveProcessInstance(processInstance); } // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID); @@ -677,7 +693,7 @@ public class ProcessDao extends AbstractBaseDao { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if(parentInstance != null){ processInstance.setGlobalParams(parentInstance.getGlobalParams()); - this.updateProcessInstance(processInstance); + this.saveProcessInstance(processInstance); }else{ logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); } @@ -1194,7 +1210,7 @@ public class ProcessDao extends AbstractBaseDao { public int updateProcessInstance(Integer processInstanceId, String processJson, String globalParams, Date scheduleTime, Flag flag, String locations, String connects){ - return processInstanceMapper.updateProcessInstance( processInstanceId, processJson, + return processInstanceMapper.updateProcessInstance(processInstanceId, processJson, globalParams, scheduleTime, locations, connects, flag); } @@ -1538,4 +1554,16 @@ public class ProcessDao extends AbstractBaseDao { DateUtils.dateToString(dateInterval.getEndTime()), stateArray); } + + /** + * query user queue by process instance id + * @param processInstanceId + * @return + */ + public String queryQueueByProcessInstanceId(int processInstanceId){ + return userMapper.queryQueueByProcessInstanceId(processInstanceId); + } + + + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java index ebc206f9ac..555f3a6d18 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java @@ -17,6 +17,7 @@ package cn.escheduler.dao; import cn.escheduler.common.Constants; +import cn.escheduler.common.utils.DateUtils; import cn.escheduler.dao.model.TaskRecord; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; @@ -39,6 +40,8 @@ public class TaskRecordDao { private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName()); + + /** * 加载配置文件 */ @@ -133,7 +136,7 @@ public class TaskRecordDao { * @param filterMap * @return */ - public static int countTaskRecord(Map filterMap){ + public static int countTaskRecord(Map filterMap, String table){ int count = 0; Connection conn = null; @@ -142,7 +145,7 @@ public class TaskRecordDao { if(conn == null){ return count; } - String sql = "select count(1) as count from eamp_hive_log_hd"; + String sql = String.format("select count(1) as count from %s", table); sql += getWhereString(filterMap); PreparedStatement pstmt; pstmt = conn.prepareStatement(sql); @@ -170,9 +173,9 @@ public class TaskRecordDao { * @param filterMap * @return */ - public static List queryAllTaskRecord(Map filterMap ) { + public static List queryAllTaskRecord(Map filterMap , String table) { - String sql = "select * from eamp_hive_log_hd "; + String sql = String.format("select * from %s", table); sql += getWhereString(filterMap); int offset = Integer.parseInt(filterMap.get("offset")); @@ -201,8 +204,8 @@ public class TaskRecordDao { taskRecord.setProcId(resultSet.getInt("PROC_ID")); taskRecord.setProcName(resultSet.getString("PROC_NAME")); taskRecord.setProcDate(resultSet.getString("PROC_DATE")); - taskRecord.setStartDate(resultSet.getDate("STARTDATE")); - taskRecord.setEndDate(resultSet.getDate("ENDDATE")); + taskRecord.setStartTime(DateUtils.stringToDate(resultSet.getString("STARTDATE"))); + taskRecord.setEndTime(DateUtils.stringToDate(resultSet.getString("ENDDATE"))); taskRecord.setResult(resultSet.getString("RESULT")); taskRecord.setDuration(resultSet.getInt("DURATION")); taskRecord.setNote(resultSet.getString("NOTE")); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/AccessTokenMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/AccessTokenMapper.java new file mode 100644 index 0000000000..aa196561b5 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/AccessTokenMapper.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.mapper; + +import cn.escheduler.common.enums.UserType; +import cn.escheduler.dao.model.AccessToken; +import cn.escheduler.dao.model.User; +import org.apache.ibatis.annotations.*; +import org.apache.ibatis.type.EnumOrdinalTypeHandler; +import org.apache.ibatis.type.JdbcType; + +import java.sql.Timestamp; +import java.util.List; + +public interface AccessTokenMapper { + + /** + * insert accessToken + * @param accessToken + * @return + */ + @InsertProvider(type = AccessTokenMapperProvider.class, method = "insert") + @Options(useGeneratedKeys = true,keyProperty = "accessToken.id") + @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "accessToken.id", before = false, resultType = int.class) + int insert(@Param("accessToken") AccessToken accessToken); + + + /** + * delete accessToken + * @param accessTokenId + * @return + */ + @DeleteProvider(type = AccessTokenMapperProvider.class, method = "delete") + int delete(@Param("accessTokenId") int accessTokenId); + + + /** + * update accessToken + * + * @param accessToken + * @return + */ + @UpdateProvider(type = AccessTokenMapperProvider.class, method = "update") + int update(@Param("accessToken") AccessToken accessToken); + + + /** + * query access token list paging + * @param searchVal + * @param offset + * @param pageSize + * @return + */ + @Results(value = {@Result(property = "id", column = "id", id = true, javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "userId", column = "user_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "token", column = "token", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "expireTime", column = "expire_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), + @Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), + @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE) + }) + @SelectProvider(type = AccessTokenMapperProvider.class, method = "queryAccessTokenPaging") + List queryAccessTokenPaging(@Param("searchVal") String searchVal, + @Param("offset") Integer offset, + @Param("pageSize") Integer pageSize); + + /** + * count access token by search value + * @param searchVal + * @return + */ + @SelectProvider(type = AccessTokenMapperProvider.class, method = "countAccessTokenPaging") + Integer countAccessTokenPaging(@Param("searchVal") String searchVal); +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/AccessTokenMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/AccessTokenMapperProvider.java new file mode 100644 index 0000000000..a2b69d5cc1 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/AccessTokenMapperProvider.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.mapper; + +import org.apache.commons.lang3.StringUtils; +import org.apache.ibatis.jdbc.SQL; + +import java.util.Map; + +/** + * access token mapper provider + * + */ +public class AccessTokenMapperProvider { + + private static final String TABLE_NAME = "t_escheduler_access_token"; + + /** + * insert accessToken + * + * @param parameter + * @return + */ + public String insert(Map parameter) { + return new SQL() { + { + INSERT_INTO(TABLE_NAME); + VALUES("`user_id`", "#{accessToken.userId}"); + VALUES("`token`", "#{accessToken.token}"); + VALUES("`expire_time`", "#{accessToken.expireTime}");; + VALUES("`create_time`", "#{accessToken.createTime}"); + VALUES("`update_time`", "#{accessToken.updateTime}"); + } + }.toString(); + } + + /** + * delete accessToken + * + * @param parameter + * @return + */ + public String delete(Map parameter) { + return new SQL() { + { + DELETE_FROM(TABLE_NAME); + + WHERE("`id`=#{accessTokenId}"); + } + }.toString(); + } + + /** + * update accessToken + * + * @param parameter + * @return + */ + public String update(Map parameter) { + return new SQL() { + { + UPDATE(TABLE_NAME); + + SET("`user_id`=#{accessToken.userId}"); + SET("`token`=#{accessToken.token}"); + SET("`expire_time`=#{accessToken.expireTime}"); + SET("`update_time`=#{accessToken.updateTime}"); + + WHERE("`id`=#{user.id}"); + } + }.toString(); + } + + + /** + * count user number by search value + * @param parameter + * @return + */ + public String countAccessTokenPaging(Map parameter) { + return new SQL() {{ + SELECT("count(0)"); + FROM(TABLE_NAME + " t,t_escheduler_user u"); + Object searchVal = parameter.get("searchVal"); + WHERE("u.id = t.user_id"); + if(searchVal != null && StringUtils.isNotEmpty(searchVal.toString())){ + WHERE(" u.user_name like concat('%', #{searchVal}, '%')"); + } + }}.toString(); + } + + /** + * query user list paging + * @param parameter + * @return + */ + public String queryAccessTokenPaging(Map parameter) { + return new SQL() { + { + SELECT("t.*,u.user_name"); + FROM(TABLE_NAME + " t,t_escheduler_user u"); + Object searchVal = parameter.get("searchVal"); + WHERE("u.id = t.user_id"); + if(searchVal != null && StringUtils.isNotEmpty(searchVal.toString())){ + WHERE(" u.user_name like concat('%', #{searchVal}, '%') "); + } + ORDER_BY(" t.update_time desc limit #{offset},#{pageSize} "); + } + }.toString(); + + } + + + + +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java index d33fb9ebfb..21415c758e 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java @@ -222,4 +222,33 @@ public interface UserMapper { }) @SelectProvider(type = UserMapperProvider.class, method = "queryTenantCodeByUserId") User queryTenantCodeByUserId(@Param("userId") int userId); + + + /** + * query user queue by process instance id + * @param processInstanceId + * @return + */ + @SelectProvider(type = UserMapperProvider.class, method = "queryQueueByProcessInstanceId") + String queryQueueByProcessInstanceId(@Param("processInstanceId") int processInstanceId); + + + /** + * query user by token + * @param token + * @return + */ + @Results(value = { + @Result(property = "id", column = "id", id = true, javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "userPassword", column = "user_password", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "email", column = "email", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "phone", column = "phone", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "userType", column = "user_type", typeHandler = EnumOrdinalTypeHandler.class, javaType = UserType.class, jdbcType = JdbcType.TINYINT), + @Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), + @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE) + }) + @SelectProvider(type = UserMapperProvider.class, method = "queryUserByToken") + User queryUserByToken(@Param("token") String token); } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java index d1cfc34ad9..8496687c39 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java @@ -47,6 +47,7 @@ public class UserMapperProvider { VALUES("`phone`", "#{user.phone}"); VALUES("`user_type`", EnumFieldUtil.genFieldStr("user.userType", UserType.class)); VALUES("`tenant_id`", "#{user.tenantId}"); + VALUES("`queue`", "#{user.queue}"); VALUES("`create_time`", "#{user.createTime}"); VALUES("`update_time`", "#{user.updateTime}"); } @@ -86,6 +87,7 @@ public class UserMapperProvider { SET("`phone`=#{user.phone}"); SET("`user_type`="+EnumFieldUtil.genFieldStr("user.userType", UserType.class)); SET("`tenant_id`=#{user.tenantId}"); + SET("`queue`=#{user.queue}"); SET("`create_time`=#{user.createTime}"); SET("`update_time`=#{user.updateTime}"); @@ -247,4 +249,36 @@ public class UserMapperProvider { }.toString(); } + + /** + * query tenant code by user id + * @param parameter + * @return + */ + public String queryQueueByProcessInstanceId(Map parameter) { + return new SQL() { + { + SELECT("queue"); + FROM(TABLE_NAME + " u,t_escheduler_process_instance p"); + WHERE("u.id = p.executor_id and p.id=#{processInstanceId}"); + } + }.toString(); + } + + + /** + * query user by id + * @param parameter + * @return + */ + public String queryUserByToken(Map parameter) { + return new SQL() { + { + SELECT("u.*"); + FROM(TABLE_NAME + " u ,t_escheduler_access_token t"); + WHERE(" u.id = t.user_id and token=#{token}"); + } + }.toString(); + } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/AccessToken.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/AccessToken.java new file mode 100644 index 0000000000..559fe45a96 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/AccessToken.java @@ -0,0 +1,126 @@ +package cn.escheduler.dao.model; + +import java.util.Date; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class AccessToken { + + /** + * id + */ + private int id; + + /** + * user id + */ + private int userId; + + /** + * user name + */ + private String userName; + + /** + * user token + */ + private String token; + + /** + * token expire time + */ + private Date expireTime; + + /** + * create time + */ + private Date createTime; + + /** + * update time + */ + private Date updateTime; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public int getUserId() { + return userId; + } + + public void setUserId(int userId) { + this.userId = userId; + } + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + + public Date getExpireTime() { + return expireTime; + } + + public void setExpireTime(Date expireTime) { + this.expireTime = expireTime; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + @Override + public String toString() { + return "AccessToken{" + + "id=" + id + + ", userId=" + userId + + ", userName='" + userName + '\'' + + ", token='" + token + '\'' + + ", expireTime=" + expireTime + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + '}'; + } +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskRecord.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskRecord.java index 17bd073366..82e96061b8 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskRecord.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskRecord.java @@ -46,12 +46,12 @@ public class TaskRecord { /** * start date */ - private Date startDate; + private Date startTime; /** * end date */ - private Date endDate; + private Date endTime; /** * result @@ -136,20 +136,20 @@ public class TaskRecord { this.procDate = procDate; } - public Date getStartDate() { - return startDate; + public Date getStartTime() { + return startTime; } - public void setStartDate(Date startDate) { - this.startDate = startDate; + public void setStartTime(Date startTime) { + this.startTime = startTime; } - public Date getEndDate() { - return endDate; + public Date getEndTime() { + return endTime; } - public void setEndDate(Date endDate) { - this.endDate = endDate; + public void setEndTime(Date endTime) { + this.endTime = endTime; } public String getResult() { @@ -238,8 +238,8 @@ public class TaskRecord { +" proc id:" + procId + " proc name:" + procName + " proc date: " + procDate - + " start date:" + startDate - + " end date:" + endDate + + " start date:" + startTime + + " end date:" + endTime + " result : " + result + " duration : " + duration + " note : " + note diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java index 7fda405ef2..6f831fbd96 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/User.java @@ -79,6 +79,12 @@ public class User { * alert group */ private String alertGroup; + + /** + * user specified queue + */ + private String queue; + /** * create time */ @@ -194,23 +200,12 @@ public class User { this.tenantCode = tenantCode; } - @Override - public String toString() { - return "User{" + - "id=" + id + - ", userName='" + userName + '\'' + - ", userPassword='" + userPassword + '\'' + - ", email='" + email + '\'' + - ", phone='" + phone + '\'' + - ", userType=" + userType + - ", tenantId=" + tenantId + - ", tenantCode='" + tenantCode + '\'' + - ", tenantName='" + tenantName + '\'' + - ", queueName='" + queueName + '\'' + - ", alertGroup='" + alertGroup + '\'' + - ", createTime=" + createTime + - ", updateTime=" + updateTime + - '}'; + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; } @Override @@ -237,4 +232,24 @@ public class User { result = 31 * result + userName.hashCode(); return result; } + + @Override + public String toString() { + return "User{" + + "id=" + id + + ", userName='" + userName + '\'' + + ", userPassword='" + userPassword + '\'' + + ", email='" + email + '\'' + + ", phone='" + phone + '\'' + + ", userType=" + userType + + ", tenantId=" + tenantId + + ", tenantCode='" + tenantCode + '\'' + + ", tenantName='" + tenantName + '\'' + + ", queueName='" + queueName + '\'' + + ", alertGroup='" + alertGroup + '\'' + + ", queue='" + queue + '\'' + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + '}'; + } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/EschedulerManager.java b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/EschedulerManager.java new file mode 100644 index 0000000000..34f07fbb29 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/EschedulerManager.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.upgrade; + +import cn.escheduler.common.utils.SchemaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * upgrade manager + */ +public class EschedulerManager { + private static final Logger logger = LoggerFactory.getLogger(EschedulerManager.class); + UpgradeDao upgradeDao = UpgradeDao.getInstance(); + + public void initEscheduler() { + this.initEschedulerSchema(); + } + + public void initEschedulerSchema() { + + logger.info("Start initializing the ark manager mysql table structure"); + upgradeDao.initEschedulerSchema(); + } + + + /** + * upgrade escheduler + */ + public void upgradeEscheduler() throws Exception{ + + // Gets a list of all upgrades + List schemaList = SchemaUtils.getAllSchemaList(); + if(schemaList == null || schemaList.size() == 0) { + logger.info("There is no schema to upgrade!"); + }else { + + String version = ""; + // The target version of the upgrade + String schemaVersion = ""; + for(String schemaDir : schemaList) { + // Gets the version of the current system + if (upgradeDao.isExistsTable("t_escheduler_version")) { + version = upgradeDao.getCurrentVersion(); + }else { + version = "1.0.0"; + } + + schemaVersion = schemaDir.split("_")[0]; + if(SchemaUtils.isAGreatVersion(schemaVersion , version)) { + + logger.info("upgrade escheduler metadata version from " + version + " to " + schemaVersion); + + + logger.info("Begin upgrading escheduler's mysql table structure"); + upgradeDao.upgradeEscheduler(schemaDir); + + } + + } + } + + // Assign the value of the version field in the version table to the version of the product + upgradeDao.updateVersion(SchemaUtils.getSoftVersion()); + } +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/UpgradeDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/UpgradeDao.java new file mode 100644 index 0000000000..f4fb307f05 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/UpgradeDao.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.upgrade; + +import cn.escheduler.common.utils.MysqlUtil; +import cn.escheduler.common.utils.ScriptRunner; +import cn.escheduler.dao.AbstractBaseDao; +import cn.escheduler.dao.datasource.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class UpgradeDao extends AbstractBaseDao { + + public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); + private static final String T_VERSION_NAME = "t_escheduler_version"; + + @Override + protected void init() { + + } + + private static class UpgradeDaoHolder { + private static final UpgradeDao INSTANCE = new UpgradeDao(); + } + + private UpgradeDao() { + } + + public static final UpgradeDao getInstance() { + return UpgradeDaoHolder.INSTANCE; + } + + + + public void initEschedulerSchema() { + + // Execute the escheduler DDL, it cannot be rolled back + runInitEschedulerDDL(); + + // Execute the escheduler DML, it can be rolled back + runInitEschedulerDML(); + + } + + private void runInitEschedulerDML() { + Connection conn = null; + try { + conn = ConnectionFactory.getDataSource().getConnection(); + conn.setAutoCommit(false); + // 执行escheduler_dml.sql脚本,导入escheduler相关的数据 + // Execute the ark_manager_dml.sql script to import the data related to escheduler + + ScriptRunner initScriptRunner = new ScriptRunner(conn, false, true); + Reader initSqlReader = new FileReader(new File("sql/create/release-1.0.0_schema/mysql/escheduler_dml.sql")); + initScriptRunner.runScript(initSqlReader); + + conn.commit(); + } catch (IOException e) { + try { + conn.rollback(); + } catch (SQLException e1) { + logger.error(e1.getMessage(),e1); + } + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } catch (Exception e) { + try { + conn.rollback(); + } catch (SQLException e1) { + logger.error(e1.getMessage(),e1); + } + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } finally { + MysqlUtil.realeaseResource(null, null, conn); + + } + + } + + private void runInitEschedulerDDL() { + Connection conn = null; + try { + conn = ConnectionFactory.getDataSource().getConnection(); + // Execute the escheduler_ddl.sql script to create the table structure of escheduler + ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true); + Reader initSqlReader = new FileReader(new File("sql/create/release-1.0.0_schema/mysql/escheduler_ddl.sql")); + initScriptRunner.runScript(initSqlReader); + + } catch (IOException e) { + + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } catch (Exception e) { + + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } finally { + MysqlUtil.realeaseResource(null, null, conn); + + } + + } + + + public boolean isExistsTable(String tableName) { + Connection conn = null; + try { + conn = ConnectionFactory.getDataSource().getConnection(); + ResultSet rs = conn.getMetaData().getTables(null, null, tableName, null); + if (rs.next()) { + return true; + } else { + return false; + } + + } catch (SQLException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } finally { + MysqlUtil.realeaseResource(null, null, conn); + + } + + } + + + public String getCurrentVersion() { + String sql = String.format("select version from %s",T_VERSION_NAME); + Connection conn = null; + ResultSet rs = null; + PreparedStatement pstmt = null; + String version = null; + try { + conn = ConnectionFactory.getDataSource().getConnection(); + pstmt = conn.prepareStatement(sql); + rs = pstmt.executeQuery(); + + if (rs.next()) { + version = rs.getString(1); + } + + return version; + + } catch (SQLException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException("sql: " + sql, e); + } finally { + MysqlUtil.realeaseResource(rs, pstmt, conn); + + } + } + + + public void upgradeEscheduler(String schemaDir) { + + upgradeEschedulerDDL(schemaDir); + + upgradeEschedulerDML(schemaDir); + + } + + private void upgradeEschedulerDML(String schemaDir) { + String schemaVersion = schemaDir.split("_")[0]; + String mysqlSQLFilePath = "sql/upgrade/" + schemaDir + "/mysql/escheduler_dml.sql"; + Connection conn = null; + PreparedStatement pstmt = null; + try { + conn = ConnectionFactory.getDataSource().getConnection(); + conn.setAutoCommit(false); + // Execute the upgraded escheduler dml + ScriptRunner scriptRunner = new ScriptRunner(conn, false, true); + Reader sqlReader = new FileReader(new File(mysqlSQLFilePath)); + scriptRunner.runScript(sqlReader); + if (isExistsTable(T_VERSION_NAME)) { + // Change version in the version table to the new version + String upgradeSQL = String.format("update %s set version = ?",T_VERSION_NAME); + pstmt = conn.prepareStatement(upgradeSQL); + pstmt.setString(1, schemaVersion); + pstmt.executeUpdate(); + } + conn.commit(); + } catch (FileNotFoundException e) { + try { + conn.rollback(); + } catch (SQLException e1) { + logger.error(e1.getMessage(),e1); + } + logger.error(e.getMessage(),e); + throw new RuntimeException("sql file not found ", e); + } catch (IOException e) { + try { + conn.rollback(); + } catch (SQLException e1) { + logger.error(e1.getMessage(),e1); + } + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException e1) { + logger.error(e1.getMessage(),e1); + } + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } catch (Exception e) { + try { + conn.rollback(); + } catch (SQLException e1) { + logger.error(e1.getMessage(),e1); + } + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } finally { + MysqlUtil.realeaseResource(null, pstmt, conn); + } + + } + + private void upgradeEschedulerDDL(String schemaDir) { + String mysqlSQLFilePath = "sql/upgrade/" + schemaDir + "/mysql/escheduler_ddl.sql"; + Connection conn = null; + PreparedStatement pstmt = null; + try { + conn = ConnectionFactory.getDataSource().getConnection(); + String dbName = conn.getCatalog(); + logger.info(dbName); + conn.setAutoCommit(true); + // Execute the escheduler ddl.sql for the upgrade + ScriptRunner scriptRunner = new ScriptRunner(conn, true, true); + Reader sqlReader = new FileReader(new File(mysqlSQLFilePath)); + scriptRunner.runScript(sqlReader); + + } catch (FileNotFoundException e) { + + logger.error(e.getMessage(),e); + throw new RuntimeException("sql file not found ", e); + } catch (IOException e) { + + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } catch (SQLException e) { + + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } catch (Exception e) { + + logger.error(e.getMessage(),e); + throw new RuntimeException(e.getMessage(),e); + } finally { + MysqlUtil.realeaseResource(null, pstmt, conn); + } + + } + + + + public void updateVersion(String version) { + // Change version in the version table to the new version + String upgradeSQL = String.format("update %s set version = ?",T_VERSION_NAME); + PreparedStatement pstmt = null; + Connection conn = null; + try { + conn = ConnectionFactory.getDataSource().getConnection(); + pstmt = conn.prepareStatement(upgradeSQL); + pstmt.setString(1, version); + pstmt.executeUpdate(); + + } catch (SQLException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException("sql: " + upgradeSQL, e); + } finally { + MysqlUtil.realeaseResource(null, pstmt, conn); + } + + } + +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/shell/CreateEscheduler.java b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/shell/CreateEscheduler.java new file mode 100644 index 0000000000..012c32bb9a --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/shell/CreateEscheduler.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.upgrade.shell; + +import cn.escheduler.dao.upgrade.EschedulerManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * init escheduler + * + */ +public class CreateEscheduler { + + private static final Logger logger = LoggerFactory.getLogger(CreateEscheduler.class); + + public static void main(String[] args) { + Thread.currentThread().setName("manager-CreateEscheduler"); + EschedulerManager eschedulerManager = new EschedulerManager(); + eschedulerManager.initEscheduler(); + logger.info("init escheduler finished"); + try { + eschedulerManager.upgradeEscheduler(); + logger.info("upgrade escheduler finished"); + } catch (Exception e) { + logger.error("upgrade escheduler failed",e); + } + + } +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/shell/InitEscheduler.java b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/shell/InitEscheduler.java new file mode 100644 index 0000000000..e88bb1e3f1 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/shell/InitEscheduler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.upgrade.shell; + +import cn.escheduler.dao.upgrade.EschedulerManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * init escheduler + * + */ +public class InitEscheduler { + + private static final Logger logger = LoggerFactory.getLogger(InitEscheduler.class); + + public static void main(String[] args) { + Thread.currentThread().setName("manager-InitEscheduler"); + EschedulerManager eschedulerManager = new EschedulerManager(); + eschedulerManager.initEscheduler(); + logger.info("init escheduler finished"); + + } +} diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/shell/UpgradeEscheduler.java b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/shell/UpgradeEscheduler.java new file mode 100644 index 0000000000..e73a1162c2 --- /dev/null +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/upgrade/shell/UpgradeEscheduler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.upgrade.shell; + +import cn.escheduler.dao.upgrade.EschedulerManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * upgrade escheduler database + */ +public class UpgradeEscheduler { + private static final Logger logger = LoggerFactory.getLogger(UpgradeEscheduler.class); + + public static void main(String[] args) { + Thread.currentThread().setName("manager-UpgradeEscheduler"); + + EschedulerManager eschedulerManager = new EschedulerManager(); + try { + eschedulerManager.upgradeEscheduler(); + logger.info("upgrade escheduler finished"); + } catch (Exception e) { + logger.error(e.getMessage(),e); + logger.info("Upgrade escheduler failed"); + throw new RuntimeException(e); + } + + + } + + + +} diff --git a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/AccessTokenMapperTest.java b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/AccessTokenMapperTest.java new file mode 100644 index 0000000000..d5dfdcb1ec --- /dev/null +++ b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/AccessTokenMapperTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.dao.mapper; + +import cn.escheduler.common.utils.EncryptionUtils; +import cn.escheduler.dao.datasource.ConnectionFactory; +import cn.escheduler.dao.model.AccessToken; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Date; +import java.util.List; + + + +public class AccessTokenMapperTest { + + + AccessTokenMapper accessTokenMapper; + + @Before + public void before(){ + accessTokenMapper = ConnectionFactory.getSqlSession().getMapper(AccessTokenMapper.class); + } + + @Test + public void testInsert(){ + AccessToken accessToken = new AccessToken(); + accessToken.setUserId(10); + accessToken.setExpireTime(new Date()); + accessToken.setToken("ssssssssssssssssssssssssss"); + accessToken.setCreateTime(new Date()); + accessToken.setUpdateTime(new Date()); + accessTokenMapper.insert(accessToken); + } + + @Test + public void testListPaging(){ + Integer count = accessTokenMapper.countAccessTokenPaging(""); + Assert.assertEquals(count, (Integer) 5); + + List accessTokenList = accessTokenMapper.queryAccessTokenPaging("", 0, 2); + Assert.assertEquals(accessTokenList.size(), 5); + } + + +} diff --git a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java index 94ae0ca175..adede0c329 100644 --- a/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java +++ b/escheduler-dao/src/test/java/cn/escheduler/dao/mapper/UserMapperTest.java @@ -60,4 +60,16 @@ public class UserMapperTest { } + @Test + public void queryQueueByProcessInstanceId(){ + String queue = userMapper.queryQueueByProcessInstanceId(41388); + Assert.assertEquals(queue, "ait"); + } + + @Test + public void testQueryUserByToken(){ + User user = userMapper.queryUserByToken("ad9e8fccfc11bd18bb45aa994568b8ef"); + Assert.assertEquals(user.getUserName(), "qiaozhanwei"); + } + } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index 016607c79e..f381b9ef66 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -36,6 +36,7 @@ import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; import cn.escheduler.server.worker.task.TaskProps; import com.alibaba.fastjson.JSONObject; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,12 +155,18 @@ public class TaskScheduleThread implements Callable { taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); + String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); + taskProps.setScheduleTime(processInstance.getScheduleTime()); taskProps.setNodeName(taskInstance.getName()); taskProps.setTaskInstId(taskInstance.getId()); taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); // set queue - taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); + if (StringUtils.isEmpty(queue)){ + taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); + }else { + taskProps.setQueue(queue); + } taskProps.setTaskStartTime(taskInstance.getStartTime()); taskProps.setDefinedParams(allParamMap); @@ -188,7 +195,7 @@ public class TaskScheduleThread implements Callable { task.handle(); - logger.info("task : {} exit status code : {}",taskProps.getTaskAppId(),task.getExitStatusCode()); + logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ status = ExecutionStatus.SUCCESS; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java index 2efe8cbf54..98428c5389 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java @@ -22,8 +22,11 @@ import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.Direct; import cn.escheduler.common.enums.TaskTimeoutStrategy; import cn.escheduler.common.job.db.BaseDataSource; +import cn.escheduler.common.job.db.ClickHouseDataSource; import cn.escheduler.common.job.db.MySQLDataSource; +import cn.escheduler.common.job.db.OracleDataSource; import cn.escheduler.common.job.db.PostgreDataSource; +import cn.escheduler.common.job.db.SQLServerDataSource; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.procedure.ProcedureParameters; @@ -111,6 +114,17 @@ public class ProcedureTask extends AbstractTask { }else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){ baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class); Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME); + }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){ + // NOTE: currently, ClickHouse don't support procedure or UDF yet, + // but still load JDBC driver to keep source code sync with other DB + baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class); + Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME); + }else if (DbType.ORACLE.name().equals(dataSource.getType().name())){ + baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), OracleDataSource.class); + Class.forName(Constants.JDBC_ORACLE_CLASS_NAME); + }else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){ + baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), SQLServerDataSource.class); + Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME); } // get jdbc connection diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index 36d92d71b5..bab755ba67 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -120,6 +120,15 @@ public class SqlTask extends AbstractTask { }else if (DbType.SPARK.name().equals(dataSource.getType().name())){ baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SparkDataSource.class); Class.forName(Constants.JDBC_SPARK_CLASS_NAME); + }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){ + baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class); + Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME); + }else if (DbType.ORACLE.name().equals(dataSource.getType().name())){ + baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),OracleDataSource.class); + Class.forName(Constants.JDBC_ORACLE_CLASS_NAME); + }else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){ + baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SQLServerDataSource.class); + Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME); } Map sqlParamMap = new HashMap(); diff --git a/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java b/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java index aecf3e9230..120e7a9972 100644 --- a/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java +++ b/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java @@ -52,21 +52,63 @@ public class SqlExecutorTest { @Test public void test() throws Exception { + String nodeName = "mysql sql test"; + String taskAppId = "51_11282_263978"; + String tenantCode = "hdfs"; + int taskInstId = 263978; + sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId); + } + + @Test + public void testClickhouse() throws Exception { + String nodeName = "ClickHouse sql test"; + String taskAppId = "1_11_20"; + String tenantCode = "default"; + int taskInstId = 20; + sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId); + } + + @Test + public void testOracle() throws Exception { + String nodeName = "oracle sql test"; + String taskAppId = "2_13_25"; + String tenantCode = "demo"; + int taskInstId = 25; + sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId); + } + + @Test + public void testSQLServer() throws Exception { + String nodeName = "SQL Server sql test"; + String taskAppId = "3_14_27"; + String tenantCode = "demo"; + int taskInstId = 27; + sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId); + } + /** + * Basic test template for SQLTasks, mainly test different types of DBMS types + * @param nodeName node name for selected task + * @param taskAppId task app id + * @param tenantCode tenant code + * @param taskInstId task instance id + * @throws Exception + */ + private void sharedTestSqlTask(String nodeName, String taskAppId, String tenantCode, int taskInstId) throws Exception { TaskProps taskProps = new TaskProps(); taskProps.setTaskDir(""); // processDefineId_processInstanceId_taskInstanceId - taskProps.setTaskAppId("51_11282_263978"); + taskProps.setTaskAppId(taskAppId); // set tenant -> task execute linux user - taskProps.setTenantCode("hdfs"); + taskProps.setTenantCode(tenantCode); taskProps.setTaskStartTime(new Date()); taskProps.setTaskTimeout(360000); - taskProps.setTaskInstId(263978); - taskProps.setNodeName("mysql sql test"); + taskProps.setTaskInstId(taskInstId); + taskProps.setNodeName(nodeName); - TaskInstance taskInstance = processDao.findTaskInstanceById(263978); + TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId); String taskJson = taskInstance.getTaskJson(); TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class); diff --git a/escheduler-ui/.env b/escheduler-ui/.env index d4dcd9f473..52ce563e9b 100644 --- a/escheduler-ui/.env +++ b/escheduler-ui/.env @@ -1,6 +1,6 @@ # 后端接口地址 -API_BASE = http://192.168.220.154:12345 +API_BASE = http://192.168.220.247:12345 # 本地开发如需ip访问项目把"#"号去掉 #DEV_HOST = 192.168.xx.xx diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js index cefcf2d07a..740846890c 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -32,31 +32,31 @@ const toolOper = (dagThis) => { code: 'pointer', icon: '', disable: disabled, - desc: `${i18n.$t('拖动节点和选中项')}` + desc: `${i18n.$t('Drag Nodes and Selected Items')}` }, { code: 'line', icon: '', disable: disabled, - desc: `${i18n.$t('选择线条连接')}` + desc: `${i18n.$t('Select Line Connection')}` }, { code: 'remove', icon: '', disable: disabled, - desc: `${i18n.$t('删除选中的线或节点')}` + desc: `${i18n.$t('Delete selected lines or nodes')}` }, { code: 'download', icon: '', disable: !!dagThis.type, - desc: `${i18n.$t('下载')}` + desc: `${i18n.$t('Download')}` }, { code: 'screen', icon: '', disable: disabled, - desc: `${i18n.$t('全屏')}` + desc: `${i18n.$t('Full Screen')}` } ] } @@ -70,17 +70,17 @@ const toolOper = (dagThis) => { let publishStatus = [ { id: 0, - desc: `${i18n.$t('未发布')}`, + desc: `${i18n.$t('Unpublished')}`, code: 'NOT_RELEASE' }, { id: 1, - desc: `${i18n.$t('上线')}`, + desc: `${i18n.$t('online')}`, code: 'ONLINE' }, { id: 2, - desc: `${i18n.$t('下线')}`, + desc: `${i18n.$t('offline')}`, code: 'OFFLINE' } ] @@ -92,47 +92,47 @@ let publishStatus = [ */ let runningType = [ { - desc: `${i18n.$t('启动工作流')}`, + desc: `${i18n.$t('Start Process')}`, code: 'START_PROCESS' }, { - desc: `${i18n.$t('从当前节点开始执行')}`, + desc: `${i18n.$t('Execute from the current node')}`, code: 'START_CURRENT_TASK_PROCESS' }, { - desc: `${i18n.$t('恢复被容错的工作流')}`, + desc: `${i18n.$t('Recover tolerance fault process')}`, code: 'RECOVER_TOLERANCE_FAULT_PROCESS' }, { - desc: `${i18n.$t('恢复暂停流程')}`, + desc: `${i18n.$t('Resume the suspension process')}`, code: 'RECOVER_SUSPENDED_PROCESS' }, { - desc: `${i18n.$t('从失败节点开始执行')}`, + desc: `${i18n.$t('Execute from the failed nodes')}`, code: 'START_FAILURE_TASK_PROCESS' }, { - desc: `${i18n.$t('补数')}`, + desc: `${i18n.$t('Complement Data')}`, code: 'COMPLEMENT_DATA' }, { - desc: `${i18n.$t('调度执行')}`, + desc: `${i18n.$t('Scheduling execution')}`, code: 'SCHEDULER' }, { - desc: `${i18n.$t('重跑')}`, + desc: `${i18n.$t('Rerun')}`, code: 'REPEAT_RUNNING' }, { - desc: `${i18n.$t('暂停')}`, + desc: `${i18n.$t('Pause')}`, code: 'PAUSE' }, { - desc: `${i18n.$t('停止')}`, + desc: `${i18n.$t('Stop')}`, code: 'STOP' }, { - desc: `${i18n.$t('恢复等待线程')}`, + desc: `${i18n.$t('Recovery waiting thread')}`, code: 'RECOVER_WAITTING_THREAD' } ] @@ -149,63 +149,63 @@ let runningType = [ let tasksState = { 'SUBMITTED_SUCCESS': { id: 0, - desc: `${i18n.$t('提交成功')}`, + desc: `${i18n.$t('Submitted successfully')}`, color: '#A9A9A9', icoUnicode: '', isSpin: false }, 'RUNNING_EXEUTION': { id: 1, - desc: `${i18n.$t('正在执行')}`, + desc: `${i18n.$t('Executing')}`, color: '#0097e0', icoUnicode: '', isSpin: true }, 'READY_PAUSE': { id: 2, - desc: `${i18n.$t('准备暂停')}`, + desc: `${i18n.$t('Ready to pause')}`, color: '#07b1a3', icoUnicode: '', isSpin: false }, 'PAUSE': { id: 3, - desc: `${i18n.$t('暂停')}`, + desc: `${i18n.$t('Pause')}`, color: '#057c72', icoUnicode: '', isSpin: false }, 'READY_STOP': { id: 4, - desc: `${i18n.$t('准备停止')}`, + desc: `${i18n.$t('Ready to stop')}`, color: '#FE0402', icoUnicode: '', isSpin: false }, 'STOP': { id: 5, - desc: `${i18n.$t('停止')}`, + desc: `${i18n.$t('Stop')}`, color: '#e90101', icoUnicode: '', isSpin: false }, 'FAILURE': { id: 6, - desc: `${i18n.$t('失败')}`, + desc: `${i18n.$t('failed')}`, color: '#000000', icoUnicode: '', isSpin: false }, 'SUCCESS': { id: 7, - desc: `${i18n.$t('成功')}`, + desc: `${i18n.$t('success')}`, color: '#33cc00', icoUnicode: '', isSpin: false }, 'NEED_FAULT_TOLERANCE': { id: 8, - desc: `${i18n.$t('需要容错')}`, + desc: `${i18n.$t('Need fault tolerance')}`, color: '#FF8C00', icoUnicode: '', isSpin: false @@ -219,14 +219,14 @@ let tasksState = { }, 'WAITTING_THREAD': { id: 10, - desc: `${i18n.$t('等待线程')}`, + desc: `${i18n.$t('Waiting for thread')}`, color: '#912eed', icoUnicode: '', isSpin: false }, 'WAITTING_DEPEND': { id: 11, - desc: `${i18n.$t('等待依赖')}`, + desc: `${i18n.$t('Waiting for dependence')}`, color: '#5101be', icoUnicode: '', isSpin: false diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss index e159231fd0..dcfa6b7d5d 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -386,10 +386,11 @@ svg path:hover { .name-p { position: absolute; left: 50%; - bottom: -24px; + top: 58px; width: 200px; text-align: center; margin-left: -100px; + word-break:break-all; } .ban-p { position: absolute; diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue index cc555979a1..3c08497b23 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue @@ -1,7 +1,7 @@