diff --git a/.github/workflows/ci_e2e.yml b/.github/workflows/ci_e2e.yml
index 7ec204e145..924ef114ef 100644
--- a/.github/workflows/ci_e2e.yml
+++ b/.github/workflows/ci_e2e.yml
@@ -55,7 +55,7 @@ jobs:
run: sh ./dockerfile/hooks/check
- name: Prepare e2e env
run: |
- sudo apt-get install -y libxss1 libappindicator1 libindicator7 xvfb unzip
+ sudo apt-get install -y libxss1 libappindicator1 libindicator7 xvfb unzip libgbm1
wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
sudo dpkg -i google-chrome*.deb
sudo apt-get install -f -y
diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml
index 8d75bae163..1c2952b440 100644
--- a/.github/workflows/ci_ut.yml
+++ b/.github/workflows/ci_ut.yml
@@ -83,4 +83,4 @@ jobs:
mkdir -p ${LOG_DIR}
cd ${DOCKER_DIR}
docker-compose logs db > ${LOG_DIR}/db.txt
- continue-on-error: true
+ continue-on-error: true
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index edf803fbeb..6dd99201a9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -147,3 +147,4 @@ dolphinscheduler-ui/dist/js/login/index.291b8e3.js.map
dolphinscheduler-ui/dist/lib/external/
/dolphinscheduler-dao/src/main/resources/dao/data_source.properties
+!/zookeeper_data/
diff --git a/docker/postgres/docker-entrypoint-initdb/init.sql b/docker/postgres/docker-entrypoint-initdb/init.sql
index b3c61ebce4..b48ddde042 100755
--- a/docker/postgres/docker-entrypoint-initdb/init.sql
+++ b/docker/postgres/docker-entrypoint-initdb/init.sql
@@ -191,7 +191,7 @@ CREATE TABLE t_ds_alert (
content text ,
alert_type int DEFAULT NULL ,
alert_status int DEFAULT '0' ,
- log text ,
+ ·log· text ,
alertgroup_id int DEFAULT NULL ,
receivers text ,
receivers_cc text ,
@@ -283,18 +283,6 @@ CREATE TABLE t_ds_error_command (
-- Table structure for table t_ds_master_server
--
-DROP TABLE IF EXISTS t_ds_master_server;
-CREATE TABLE t_ds_master_server (
- id int NOT NULL ,
- host varchar(45) DEFAULT NULL ,
- port int DEFAULT NULL ,
- zk_directory varchar(64) DEFAULT NULL ,
- res_info varchar(256) DEFAULT NULL ,
- create_time timestamp DEFAULT NULL ,
- last_heartbeat_time timestamp DEFAULT NULL ,
- PRIMARY KEY (id)
-) ;
-
--
-- Table structure for table t_ds_process_definition
--
@@ -319,6 +307,8 @@ CREATE TABLE t_ds_process_definition (
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
update_time timestamp DEFAULT NULL ,
+ modify_by varchar(36) DEFAULT '' ,
+ resource_ids varchar(64),
PRIMARY KEY (id)
) ;
@@ -359,7 +349,7 @@ CREATE TABLE t_ds_process_instance (
history_cmd text ,
dependence_schedule_times text ,
process_instance_priority int DEFAULT NULL ,
- worker_group_id int DEFAULT '-1' ,
+ worker_group varchar(64) ,
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
PRIMARY KEY (id)
@@ -505,9 +495,12 @@ CREATE TABLE t_ds_resources (
size bigint DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
+ pid int,
+ full_name varchar(64),
+ is_directory int,
PRIMARY KEY (id)
) ;
-;
+
--
-- Table structure for table t_ds_schedules
@@ -526,7 +519,7 @@ CREATE TABLE t_ds_schedules (
warning_type int NOT NULL ,
warning_group_id int DEFAULT NULL ,
process_instance_priority int DEFAULT NULL ,
- worker_group_id int DEFAULT '-1' ,
+ worker_group varchar(64),
create_time timestamp NOT NULL ,
update_time timestamp NOT NULL ,
PRIMARY KEY (id)
@@ -572,7 +565,8 @@ CREATE TABLE t_ds_task_instance (
retry_interval int DEFAULT NULL ,
max_retry_times int DEFAULT NULL ,
task_instance_priority int DEFAULT NULL ,
- worker_group_id int DEFAULT '-1' ,
+ worker_group varchar(64),
+ executor_id int DEFAULT NULL ,
PRIMARY KEY (id)
) ;
@@ -691,9 +685,6 @@ ALTER TABLE t_ds_command ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_command_id_se
DROP SEQUENCE IF EXISTS t_ds_datasource_id_sequence;
CREATE SEQUENCE t_ds_datasource_id_sequence;
ALTER TABLE t_ds_datasource ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_datasource_id_sequence');
-DROP SEQUENCE IF EXISTS t_ds_master_server_id_sequence;
-CREATE SEQUENCE t_ds_master_server_id_sequence;
-ALTER TABLE t_ds_master_server ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_master_server_id_sequence');
DROP SEQUENCE IF EXISTS t_ds_process_definition_id_sequence;
CREATE SEQUENCE t_ds_process_definition_id_sequence;
ALTER TABLE t_ds_process_definition ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_definition_id_sequence');
@@ -768,4 +759,4 @@ INSERT INTO t_ds_relation_user_alertgroup(alertgroup_id,user_id,create_time,upda
INSERT INTO t_ds_queue(queue_name,queue,create_time,update_time) VALUES ('default', 'default','2018-11-29 10:22:33', '2018-11-29 10:22:33');
-- Records of t_ds_queue,default queue name : default
-INSERT INTO t_ds_version(version) VALUES ('1.2.0');
\ No newline at end of file
+INSERT INTO t_ds_version(version) VALUES ('2.0.0');
\ No newline at end of file
diff --git a/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml b/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml
new file mode 100644
index 0000000000..1b09260334
--- /dev/null
+++ b/dockerfile/conf/dolphinscheduler/conf/worker_logback.xml
@@ -0,0 +1,79 @@
+
+
+
+
+
+
+
+
+
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
+
+ UTF-8
+
+
+
+
+ INFO
+
+
+
+ taskAppId
+ ${log.base}
+
+
+
+ ${log.base}/${taskAppId}.log
+
+
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
+
+ UTF-8
+
+ true
+
+
+
+
+
+ ${log.base}/dolphinscheduler-worker.log
+
+ INFO
+
+
+
+ ${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log
+ 168
+ 200MB
+
+
+
+
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
+
+ UTF-8
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
index 66a63dcff4..e20b75a096 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
@@ -55,7 +55,7 @@ public class MailUtils {
public static final Boolean MAIL_USE_SSL = PropertyUtils.getBoolean(Constants.MAIL_SMTP_SSL_ENABLE);
- public static final String XLS_FILE_PATH = PropertyUtils.getString(Constants.XLS_FILE_PATH);
+ public static final String xlsFilePath = PropertyUtils.getString(Constants.XLS_FILE_PATH,"/tmp/xls");
public static final String STARTTLS_ENABLE = PropertyUtils.getString(Constants.MAIL_SMTP_STARTTLS_ENABLE);
@@ -261,8 +261,8 @@ public class MailUtils {
// set attach file
MimeBodyPart part2 = new MimeBodyPart();
// make excel file
- ExcelUtils.genExcelFile(content,title, XLS_FILE_PATH);
- File file = new File(XLS_FILE_PATH + Constants.SINGLE_SLASH + title + Constants.EXCEL_SUFFIX_XLS);
+ ExcelUtils.genExcelFile(content,title, xlsFilePath);
+ File file = new File(xlsFilePath + Constants.SINGLE_SLASH + title + Constants.EXCEL_SUFFIX_XLS);
part2.attachFile(file);
part2.setFileName(MimeUtility.encodeText(title + Constants.EXCEL_SUFFIX_XLS,Constants.UTF_8,"B"));
// add components to collection
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java
index c2f479d101..91f7261db2 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java
@@ -79,6 +79,18 @@ public class PropertyUtils {
return properties.getProperty(key.trim());
}
+ /**
+ * get property value
+ *
+ * @param key property name
+ * @param defaultVal default value
+ * @return property value
+ */
+ public static String getString(String key, String defaultVal) {
+ String val = properties.getProperty(key.trim());
+ return val == null ? defaultVal : val;
+ }
+
/**
* get property value
*
diff --git a/dolphinscheduler-alert/src/main/resources/alert.properties b/dolphinscheduler-alert/src/main/resources/alert.properties
index 9f5acea188..c359b27ff2 100644
--- a/dolphinscheduler-alert/src/main/resources/alert.properties
+++ b/dolphinscheduler-alert/src/main/resources/alert.properties
@@ -35,18 +35,18 @@ mail.smtp.ssl.enable=false
mail.smtp.ssl.trust=xxx.xxx.com
#xls file path,need create if not exist
-xls.file.path=/tmp/xls
+#xls.file.path=/tmp/xls
# Enterprise WeChat configuration
enterprise.wechat.enable=false
-enterprise.wechat.corp.id=xxxxxxx
-enterprise.wechat.secret=xxxxxxx
-enterprise.wechat.agent.id=xxxxxxx
-enterprise.wechat.users=xxxxxxx
-enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret
-enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token
-enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}
-enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}}
+#enterprise.wechat.corp.id=xxxxxxx
+#enterprise.wechat.secret=xxxxxxx
+#enterprise.wechat.agent.id=xxxxxxx
+#enterprise.wechat.users=xxxxxxx
+#enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret
+#enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token
+#enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}
+#enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}}
diff --git a/dolphinscheduler-alert/src/main/resources/logback-alert.xml b/dolphinscheduler-alert/src/main/resources/logback-alert.xml
new file mode 100644
index 0000000000..5d1c07858d
--- /dev/null
+++ b/dolphinscheduler-alert/src/main/resources/logback-alert.xml
@@ -0,0 +1,52 @@
+
+
+
+
+
+
+
+
+
+
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
+
+ UTF-8
+
+
+
+
+ ${log.base}/dolphinscheduler-alert.log
+
+ ${log.base}/dolphinscheduler-alert.%d{yyyy-MM-dd_HH}.%i.log
+ 20
+ 64MB
+
+
+
+ [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
+
+ UTF-8
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 5998ec5a4d..e4817ddc18 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -21,13 +21,15 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@SpringBootApplication
@ServletComponentScan
-@ComponentScan({"org.apache.dolphinscheduler.api",
- "org.apache.dolphinscheduler.dao",
- "org.apache.dolphinscheduler.service"})
+@ComponentScan(basePackages = {"org.apache.dolphinscheduler"},
+ excludeFilters = @ComponentScan.Filter(type = FilterType.REGEX,
+ pattern = "org.apache.dolphinscheduler.server.*"))
+
public class ApiApplicationServer extends SpringBootServletInitializer {
public static void main(String[] args) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java
index c03281df7e..8731b264e9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AccessTokenController.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.controller;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.AccessTokenService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@@ -37,13 +38,14 @@ import springfox.documentation.annotations.ApiIgnore;
import java.util.Map;
import static org.apache.dolphinscheduler.api.enums.Status.*;
+
/**
* access token controller
*/
@Api(tags = "ACCESS_TOKEN_TAG", position = 1)
@RestController
@RequestMapping("/access-token")
-public class AccessTokenController extends BaseController{
+public class AccessTokenController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(AccessTokenController.class);
@@ -54,140 +56,125 @@ public class AccessTokenController extends BaseController{
/**
* create token
- * @param loginUser login user
- * @param userId token for user id
+ *
+ * @param loginUser login user
+ * @param userId token for user id
* @param expireTime expire time for the token
- * @param token token
+ * @param token token
* @return create result state code
*/
@ApiIgnore
@PostMapping(value = "/create")
@ResponseStatus(HttpStatus.CREATED)
+ @ApiException(CREATE_ACCESS_TOKEN_ERROR)
public Result createToken(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "userId") int userId,
- @RequestParam(value = "expireTime") String expireTime,
- @RequestParam(value = "token") String token){
+ @RequestParam(value = "userId") int userId,
+ @RequestParam(value = "expireTime") String expireTime,
+ @RequestParam(value = "token") String token) {
logger.info("login user {}, create token , userId : {} , token expire time : {} , token : {}", loginUser.getUserName(),
- userId,expireTime,token);
-
- try {
- Map result = accessTokenService.createToken(userId, expireTime, token);
- return returnDataList(result);
- }catch (Exception e){
- logger.error(CREATE_ACCESS_TOKEN_ERROR.getMsg(),e);
- return error(CREATE_ACCESS_TOKEN_ERROR.getCode(), CREATE_ACCESS_TOKEN_ERROR.getMsg());
- }
+ userId, expireTime, token);
+
+ Map result = accessTokenService.createToken(userId, expireTime, token);
+ return returnDataList(result);
}
/**
* generate token string
- * @param loginUser login user
- * @param userId token for user
+ *
+ * @param loginUser login user
+ * @param userId token for user
* @param expireTime expire time
* @return token string
*/
@ApiIgnore
@PostMapping(value = "/generate")
@ResponseStatus(HttpStatus.CREATED)
+ @ApiException(GENERATE_TOKEN_ERROR)
public Result generateToken(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "userId") int userId,
- @RequestParam(value = "expireTime") String expireTime){
- logger.info("login user {}, generate token , userId : {} , token expire time : {}",loginUser,userId,expireTime);
- try {
- Map result = accessTokenService.generateToken(userId, expireTime);
- return returnDataList(result);
- }catch (Exception e){
- logger.error(GENERATE_TOKEN_ERROR.getMsg(),e);
- return error(GENERATE_TOKEN_ERROR.getCode(), GENERATE_TOKEN_ERROR.getMsg());
- }
+ @RequestParam(value = "userId") int userId,
+ @RequestParam(value = "expireTime") String expireTime) {
+ logger.info("login user {}, generate token , userId : {} , token expire time : {}", loginUser, userId, expireTime);
+ Map result = accessTokenService.generateToken(userId, expireTime);
+ return returnDataList(result);
}
/**
* query access token list paging
*
* @param loginUser login user
- * @param pageNo page number
+ * @param pageNo page number
* @param searchVal search value
- * @param pageSize page size
+ * @param pageSize page size
* @return token list of page number and page size
*/
- @ApiOperation(value = "queryAccessTokenList", notes= "QUERY_ACCESS_TOKEN_LIST_NOTES")
+ @ApiOperation(value = "queryAccessTokenList", notes = "QUERY_ACCESS_TOKEN_LIST_NOTES")
@ApiImplicitParams({
- @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType ="String"),
+ @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
- @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType ="Int",example = "20")
+ @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20")
})
- @GetMapping(value="/list-paging")
+ @GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
+ @ApiException(QUERY_ACCESSTOKEN_LIST_PAGING_ERROR)
public Result queryAccessTokenList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @RequestParam("pageNo") Integer pageNo,
- @RequestParam(value = "searchVal", required = false) String searchVal,
- @RequestParam("pageSize") Integer pageSize){
+ @RequestParam("pageNo") Integer pageNo,
+ @RequestParam(value = "searchVal", required = false) String searchVal,
+ @RequestParam("pageSize") Integer pageSize) {
logger.info("login user {}, list access token paging, pageNo: {}, searchVal: {}, pageSize: {}",
- loginUser.getUserName(),pageNo,searchVal,pageSize);
- try{
- Map result = checkPageParams(pageNo, pageSize);
- if(result.get(Constants.STATUS) != Status.SUCCESS){
- return returnDataListPaging(result);
- }
- searchVal = ParameterUtils.handleEscapes(searchVal);
- result = accessTokenService.queryAccessTokenList(loginUser, searchVal, pageNo, pageSize);
+ loginUser.getUserName(), pageNo, searchVal, pageSize);
+
+ Map result = checkPageParams(pageNo, pageSize);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
return returnDataListPaging(result);
- }catch (Exception e){
- logger.error(QUERY_ACCESSTOKEN_LIST_PAGING_ERROR.getMsg(),e);
- return error(QUERY_ACCESSTOKEN_LIST_PAGING_ERROR.getCode(),QUERY_ACCESSTOKEN_LIST_PAGING_ERROR.getMsg());
}
+ searchVal = ParameterUtils.handleEscapes(searchVal);
+ result = accessTokenService.queryAccessTokenList(loginUser, searchVal, pageNo, pageSize);
+ return returnDataListPaging(result);
}
/**
* delete access token by id
+ *
* @param loginUser login user
- * @param id token id
+ * @param id token id
* @return delete result code
*/
@ApiIgnore
@PostMapping(value = "/delete")
@ResponseStatus(HttpStatus.OK)
+ @ApiException(DELETE_ACCESS_TOKEN_ERROR)
public Result delAccessTokenById(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "id") int id) {
+ @RequestParam(value = "id") int id) {
logger.info("login user {}, delete access token, id: {},", loginUser.getUserName(), id);
- try {
- Map result = accessTokenService.delAccessTokenById(loginUser, id);
- return returnDataList(result);
- }catch (Exception e){
- logger.error(DELETE_ACCESS_TOKEN_ERROR.getMsg(),e);
- return error(Status.DELETE_ACCESS_TOKEN_ERROR.getCode(), Status.DELETE_ACCESS_TOKEN_ERROR.getMsg());
- }
+ Map result = accessTokenService.delAccessTokenById(loginUser, id);
+ return returnDataList(result);
}
/**
* update token
- * @param loginUser login user
- * @param id token id
- * @param userId token for user
+ *
+ * @param loginUser login user
+ * @param id token id
+ * @param userId token for user
* @param expireTime token expire time
- * @param token token string
+ * @param token token string
* @return update result code
*/
@ApiIgnore
@PostMapping(value = "/update")
@ResponseStatus(HttpStatus.OK)
+ @ApiException(UPDATE_ACCESS_TOKEN_ERROR)
public Result updateToken(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id") int id,
@RequestParam(value = "userId") int userId,
@RequestParam(value = "expireTime") String expireTime,
- @RequestParam(value = "token") String token){
+ @RequestParam(value = "token") String token) {
logger.info("login user {}, update token , userId : {} , token expire time : {} , token : {}", loginUser.getUserName(),
- userId,expireTime,token);
-
- try {
- Map result = accessTokenService.updateToken(id,userId, expireTime, token);
- return returnDataList(result);
- }catch (Exception e){
- logger.error(UPDATE_ACCESS_TOKEN_ERROR.getMsg(),e);
- return error(UPDATE_ACCESS_TOKEN_ERROR.getCode(), UPDATE_ACCESS_TOKEN_ERROR.getMsg());
- }
+ userId, expireTime, token);
+
+ Map result = accessTokenService.updateToken(id, userId, expireTime, token);
+ return returnDataList(result);
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index ffedd5703c..046479d4cb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -64,7 +64,7 @@ public class ExecutorController extends BaseController {
* @param receiversCc receivers cc
* @param runMode run mode
* @param processInstancePriority process instance priority
- * @param workerGroupId worker group id
+ * @param workerGroup worker group
* @param timeout timeout
* @return start process result code
*/
@@ -82,7 +82,7 @@ public class ExecutorController extends BaseController {
@ApiImplicitParam(name = "receiversCc", value = "RECEIVERS_CC",dataType ="String" ),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE",dataType ="RunMode" ),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority" ),
- @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int",example = "100"),
+ @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String",example = "default"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int",example = "100"),
})
@PostMapping(value = "start-process-instance")
@@ -101,15 +101,15 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
- @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
+ @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "timeout", required = false) Integer timeout) {
try {
logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, "
+ "failure policy: {}, node name: {}, node dep: {}, notify type: {}, "
- + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroupId: {}, timeout: {}",
+ + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}",
loginUser.getUserName(), projectName, processDefinitionId, scheduleTime,
- failureStrategy, startNodeList, taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority,
- workerGroupId, timeout);
+ failureStrategy, startNodeList, taskDependType, warningType, workerGroup,receivers,receiversCc,runMode,processInstancePriority,
+ workerGroup, timeout);
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
@@ -117,7 +117,7 @@ public class ExecutorController extends BaseController {
Map result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType,
- warningGroupId,receivers,receiversCc, runMode,processInstancePriority, workerGroupId, timeout);
+ warningGroupId,receivers,receiversCc, runMode,processInstancePriority, workerGroup, timeout);
return returnDataList(result);
} catch (Exception e) {
logger.error(Status.START_PROCESS_INSTANCE_ERROR.getMsg(),e);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
index 802f09ff20..eefd6baa67 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
@@ -60,14 +60,14 @@ public class LoggerController extends BaseController {
*/
@ApiOperation(value = "queryLog", notes= "QUERY_TASK_INSTANCE_LOG_NOTES")
@ApiImplicitParams({
- @ApiImplicitParam(name = "taskInstId", value = "TASK_ID", dataType = "Int", example = "100"),
+ @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", dataType ="Int", example = "100"),
@ApiImplicitParam(name = "limit", value = "LIMIT", dataType ="Int", example = "100")
})
@GetMapping(value = "/detail")
@ResponseStatus(HttpStatus.OK)
public Result queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "taskInstId") int taskInstanceId,
+ @RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
try {
@@ -91,12 +91,12 @@ public class LoggerController extends BaseController {
*/
@ApiOperation(value = "downloadTaskLog", notes= "DOWNLOAD_TASK_INSTANCE_LOG_NOTES")
@ApiImplicitParams({
- @ApiImplicitParam(name = "taskInstId", value = "TASK_ID",dataType = "Int", example = "100")
+ @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID",dataType = "Int", example = "100")
})
@GetMapping(value = "/download-log")
@ResponseBody
public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "taskInstId") int taskInstanceId) {
+ @RequestParam(value = "taskInstanceId") int taskInstanceId) {
try {
byte[] logBytes = loggerService.getLogBytes(taskInstanceId);
return ResponseEntity
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index 5faf7a4f14..9b47cb54e4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -26,8 +26,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -242,8 +240,7 @@ public class ProcessInstanceController extends BaseController{
logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}",
loginUser.getUserName(), projectName, processInstanceId);
// task queue
- ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
- Map result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
+ Map result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId);
return returnDataList(result);
}catch (Exception e){
logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e);
@@ -372,7 +369,6 @@ public class ProcessInstanceController extends BaseController{
logger.info("delete process instance by ids, login user:{}, project name:{}, process instance ids :{}",
loginUser.getUserName(), projectName, processInstanceIds);
// task queue
- ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
Map result = new HashMap<>(5);
List deleteFailedIdList = new ArrayList<>();
if(StringUtils.isNotEmpty(processInstanceIds)){
@@ -381,7 +377,7 @@ public class ProcessInstanceController extends BaseController{
for (String strProcessInstanceId:processInstanceIdArray) {
int processInstanceId = Integer.parseInt(strProcessInstanceId);
try {
- Map deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
+ Map deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId);
if(!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))){
deleteFailedIdList.add(strProcessInstanceId);
logger.error((String)deleteResult.get(Constants.MSG));
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
index d246a0f73a..90b4bca81b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
@@ -70,7 +70,7 @@ public class SchedulerController extends BaseController {
* @param processInstancePriority process instance priority
* @param receivers receivers
* @param receiversCc receivers cc
- * @param workerGroupId worker group id
+ * @param workerGroup worker group
* @return create result code
*/
@ApiOperation(value = "createSchedule", notes= "CREATE_SCHEDULE_NOTES")
@@ -96,15 +96,15 @@ public class SchedulerController extends BaseController {
@RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy,
@RequestParam(value = "receivers", required = false) String receivers,
@RequestParam(value = "receiversCc", required = false) String receiversCc,
- @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
+ @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) {
logger.info("login user {}, project name: {}, process name: {}, create schedule: {}, warning type: {}, warning group id: {}," +
"failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}, workGroupId:{}",
loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId,
- failureStrategy, receivers, receiversCc, processInstancePriority, workerGroupId);
+ failureStrategy, receivers, receiversCc, processInstancePriority, workerGroup);
try {
Map result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule,
- warningType, warningGroupId, failureStrategy, receivers, receiversCc, processInstancePriority, workerGroupId);
+ warningType, warningGroupId, failureStrategy, receivers, receiversCc, processInstancePriority, workerGroup);
return returnDataList(result);
} catch (Exception e) {
@@ -124,7 +124,7 @@ public class SchedulerController extends BaseController {
* @param warningGroupId warning group id
* @param failureStrategy failure strategy
* @param receivers receivers
- * @param workerGroupId worker group id
+ * @param workerGroup worker group
* @param processInstancePriority process instance priority
* @param receiversCc receivers cc
* @return update result code
@@ -151,16 +151,16 @@ public class SchedulerController extends BaseController {
@RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy,
@RequestParam(value = "receivers", required = false) String receivers,
@RequestParam(value = "receiversCc", required = false) String receiversCc,
- @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
+ @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) {
logger.info("login user {}, project name: {},id: {}, updateProcessInstance schedule: {}, notify type: {}, notify mails: {}, " +
"failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {},workerGroupId:{}",
loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy,
- receivers, receiversCc, processInstancePriority, workerGroupId);
+ receivers, receiversCc, processInstancePriority, workerGroup);
try {
Map result = schedulerService.updateSchedule(loginUser, projectName, id, schedule,
- warningType, warningGroupId, failureStrategy, receivers, receiversCc, null, processInstancePriority, workerGroupId);
+ warningType, warningGroupId, failureStrategy, receivers, receiversCc, null, processInstancePriority, workerGroup);
return returnDataList(result);
} catch (Exception e) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
index 8ec1335442..d7c898a29f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
@@ -27,6 +27,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,6 +35,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;
+import java.util.List;
import java.util.Map;
/**
@@ -46,7 +48,6 @@ public class WorkerGroupController extends BaseController{
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupController.class);
-
@Autowired
WorkerGroupService workerGroupService;
@@ -135,6 +136,7 @@ public class WorkerGroupController extends BaseController{
loginUser.getUserName() );
try {
+
Map result = workerGroupService.queryAllGroup();
return returnDataList(result);
}catch (Exception e){
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
index bdb9e1f576..61e3752c69 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
@@ -96,11 +96,6 @@ public class ProcessMeta {
*/
private String scheduleProcessInstancePriority;
- /**
- * worker group id
- */
- private Integer scheduleWorkerGroupId;
-
/**
* worker group name
*/
@@ -226,14 +221,6 @@ public class ProcessMeta {
this.scheduleProcessInstancePriority = scheduleProcessInstancePriority;
}
- public Integer getScheduleWorkerGroupId() {
- return scheduleWorkerGroupId;
- }
-
- public void setScheduleWorkerGroupId(int scheduleWorkerGroupId) {
- this.scheduleWorkerGroupId = scheduleWorkerGroupId;
- }
-
public String getScheduleWorkerGroupName() {
return scheduleWorkerGroupName;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 416dc0ef54..3e5147bd5c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -27,6 +27,8 @@ public enum Status {
SUCCESS(0, "success", "成功"),
+ INTERNAL_SERVER_ERROR_ARGS(10000, "Internal Server Error: {0}", "服务端异常: {0}"),
+
REQUEST_PARAMS_NOT_VALID_ERROR(10001, "request parameter {0} is not valid", "请求参数[{0}]无效"),
TASK_TIMEOUT_PARAMS_ERROR(10002, "task timeout parameter is not valid", "任务超时参数无效"),
USER_NAME_EXIST(10003, "user name already exists", "用户名已存在"),
@@ -190,6 +192,7 @@ public enum Status {
UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}","udf函数绑定了资源文件[{0}]"),
RESOURCE_IS_USED(20014, "resource file is used by process definition","资源文件被上线的流程定义使用了"),
PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist","父资源文件不存在"),
+ RESOURCE_NOT_EXIST_OR_NO_PERMISSION(20016, "resource not exist or no permission,please view the task node and remove error resource","请检查任务节点并移除无权限或者已删除的资源"),
USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiException.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiException.java
new file mode 100644
index 0000000000..3c094f5294
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.exceptions;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * controller exception annotation
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+public @interface ApiException {
+ Status value();
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandler.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandler.java
new file mode 100644
index 0000000000..c00c443bf9
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.exceptions;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.utils.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.bind.annotation.ControllerAdvice;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.method.HandlerMethod;
+
+/**
+ * Exception Handler
+ */
+@ControllerAdvice
+@ResponseBody
+public class ApiExceptionHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(ApiExceptionHandler.class);
+
+ @ExceptionHandler(Exception.class)
+ public Result exceptionHandler(Exception e, HandlerMethod hm) {
+ logger.error(e.getMessage(), e);
+ ApiException ce = hm.getMethodAnnotation(ApiException.class);
+ if (ce == null) {
+ return Result.errorWithArgs(Status.INTERNAL_SERVER_ERROR_ARGS, e.getMessage());
+ }
+ Status st = ce.value();
+ return Result.error(st);
+ }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java
index 897646ba70..5d176961bb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java
@@ -83,6 +83,9 @@ public class AccessTokenService extends BaseService {
public Map createToken(int userId, String expireTime, String token) {
Map result = new HashMap<>(5);
+ if (userId <= 0) {
+ throw new IllegalArgumentException("User id should not less than or equals to 0.");
+ }
AccessToken accessToken = new AccessToken();
accessToken.setUserId(userId);
accessToken.setExpireTime(DateUtils.stringToDate(expireTime));
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
index bafe833fab..0c93e00a80 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
@@ -29,8 +29,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -318,9 +316,8 @@ public class DataAnalysisService extends BaseService{
return result;
}
- ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
- List tasksQueueList = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
- List tasksKillList = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL);
+ List tasksQueueList = new ArrayList<>();
+ List tasksKillList = new ArrayList<>();
Map dataMap = new HashMap<>();
if (loginUser.getUserType() == UserType.ADMIN_USER){
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index dda960d2e6..7ce7497e98 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -85,7 +85,7 @@ public class ExecutorService extends BaseService{
* @param receivers receivers
* @param receiversCc receivers cc
* @param processInstancePriority process instance priority
- * @param workerGroupId worker group id
+ * @param workerGroup worker group name
* @param runMode run mode
* @param timeout timeout
* @return execute process instance code
@@ -96,7 +96,7 @@ public class ExecutorService extends BaseService{
FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
String receivers, String receiversCc, RunMode runMode,
- Priority processInstancePriority, int workerGroupId, Integer timeout) throws ParseException {
+ Priority processInstancePriority, String workerGroup, Integer timeout) throws ParseException {
Map result = new HashMap<>(5);
// timeout is invalid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
@@ -128,7 +128,7 @@ public class ExecutorService extends BaseService{
*/
int create = this.createCommand(commandType, processDefinitionId,
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
- warningGroupId, runMode,processInstancePriority, workerGroupId);
+ warningGroupId, runMode,processInstancePriority, workerGroup);
if(create > 0 ){
/**
* according to the process definition ID updateProcessInstance and CC recipient
@@ -435,25 +435,26 @@ public class ExecutorService extends BaseService{
/**
* create command
- *
- * @param commandType
- * @param processDefineId
- * @param nodeDep
- * @param failureStrategy
- * @param startNodeList
- * @param schedule
- * @param warningType
- * @param excutorId
- * @param warningGroupId
- * @param runMode
- * @return
+ * @param commandType commandType
+ * @param processDefineId processDefineId
+ * @param nodeDep nodeDep
+ * @param failureStrategy failureStrategy
+ * @param startNodeList startNodeList
+ * @param schedule schedule
+ * @param warningType warningType
+ * @param executorId executorId
+ * @param warningGroupId warningGroupId
+ * @param runMode runMode
+ * @param processInstancePriority processInstancePriority
+ * @param workerGroup workerGroup
+ * @return command id
* @throws ParseException
*/
private int createCommand(CommandType commandType, int processDefineId,
TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType,
- int excutorId, int warningGroupId,
- RunMode runMode,Priority processInstancePriority, int workerGroupId){
+ int executorId, int warningGroupId,
+ RunMode runMode,Priority processInstancePriority, String workerGroup) throws ParseException {
/**
* instantiate command schedule instance
@@ -481,10 +482,10 @@ public class ExecutorService extends BaseService{
command.setWarningType(warningType);
}
command.setCommandParam(JSONUtils.toJson(cmdParam));
- command.setExecutorId(excutorId);
+ command.setExecutorId(executorId);
command.setWarningGroupId(warningGroupId);
command.setProcessInstancePriority(processInstancePriority);
- command.setWorkerGroupId(workerGroupId);
+ command.setWorkerGroup(workerGroup);
Date start = null;
Date end = null;
@@ -541,7 +542,7 @@ public class ExecutorService extends BaseService{
processDefineId, schedule);
}
}else{
-
+ command.setCommandParam(JSONUtils.toJson(cmdParam));
return processService.createCommand(command);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index 1f65208240..91316af455 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -68,7 +69,7 @@ public class LoggerService {
return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
}
- String host = taskInstance.getHost();
+ String host = Host.of(taskInstance.getHost()).getIp();
if(StringUtils.isEmpty(host)){
return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
}
@@ -94,7 +95,7 @@ public class LoggerService {
if (taskInstance == null){
throw new RuntimeException("task instance is null");
}
- String host = taskInstance.getHost();
+ String host = Host.of(taskInstance.getHost()).getIp();
return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 0639aba113..eed9c78e74 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
+import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,6 +144,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setTimeout(processData.getTimeout());
processDefine.setTenantId(processData.getTenantId());
processDefine.setModifyBy(loginUser.getUserName());
+ processDefine.setResourceIds(getResourceIds(processData));
//custom global params
List globalParamsList = processData.getGlobalParams();
@@ -333,6 +335,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setTimeout(processData.getTimeout());
processDefine.setTenantId(processData.getTenantId());
processDefine.setModifyBy(loginUser.getUserName());
+ processDefine.setResourceIds(getResourceIds(processData));
//custom global params
List globalParamsList = new ArrayList<>();
@@ -476,6 +479,20 @@ public class ProcessDefinitionService extends BaseDAGService {
switch (state) {
case ONLINE:
+ // To check resources whether they are already cancel authorized or deleted
+ String resourceIds = processDefinition.getResourceIds();
+ if (StringUtils.isNotBlank(resourceIds)) {
+ Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new);
+ PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resourceIdArray,loginUser.getId(),logger);
+ try {
+ permissionCheck.checkPermission();
+ } catch (Exception e) {
+ logger.error(e.getMessage(),e);
+ putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, "releaseState");
+ return result;
+ }
+ }
+
processDefinition.setReleaseState(state);
processDefineMapper.updateById(processDefinition);
break;
@@ -580,13 +597,13 @@ public class ProcessDefinitionService extends BaseDAGService {
List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
if (!schedules.isEmpty()) {
Schedule schedule = schedules.get(0);
- WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId());
+ /*WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId());
if (null == workerGroup && schedule.getWorkerGroupId() == -1) {
workerGroup = new WorkerGroup();
workerGroup.setId(-1);
workerGroup.setName("");
- }
+ }*/
exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString());
exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId());
@@ -596,11 +613,7 @@ public class ProcessDefinitionService extends BaseDAGService {
exportProcessMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy()));
exportProcessMeta.setScheduleReleaseState(String.valueOf(ReleaseState.OFFLINE));
exportProcessMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority()));
-
- if (null != workerGroup) {
- exportProcessMeta.setScheduleWorkerGroupId(workerGroup.getId());
- exportProcessMeta.setScheduleWorkerGroupName(workerGroup.getName());
- }
+ exportProcessMeta.setScheduleWorkerGroupName(schedule.getWorkerGroup());
}
//create workflow json file
return JSONUtils.toJsonString(exportProcessMeta);
@@ -799,15 +812,9 @@ public class ProcessDefinitionService extends BaseDAGService {
if (null != processMeta.getScheduleProcessInstancePriority()) {
scheduleObj.setProcessInstancePriority(Priority.valueOf(processMeta.getScheduleProcessInstancePriority()));
}
- if (null != processMeta.getScheduleWorkerGroupId()) {
- scheduleObj.setWorkerGroupId(processMeta.getScheduleWorkerGroupId());
- } else {
- if (null != processMeta.getScheduleWorkerGroupName()) {
- List workerGroups = workerGroupMapper.queryWorkerGroupByName(processMeta.getScheduleWorkerGroupName());
- if(CollectionUtils.isNotEmpty(workerGroups)){
- scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
- }
- }
+
+ if (null != processMeta.getScheduleWorkerGroupName()) {
+ scheduleObj.setWorkerGroup(processMeta.getScheduleWorkerGroupName());
}
return scheduleMapper.insert(scheduleObj);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index ab5580cc3e..b01a706ff7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -39,7 +39,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -116,18 +115,7 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
- String workerGroupName = "";
- if(processInstance.getWorkerGroupId() == -1){
- workerGroupName = DEFAULT;
- }else{
- WorkerGroup workerGroup = workerGroupMapper.selectById(processInstance.getWorkerGroupId());
- if(workerGroup != null){
- workerGroupName = workerGroup.getName();
- }else{
- workerGroupName = DEFAULT;
- }
- }
- processInstance.setWorkerGroupName(workerGroupName);
+
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
processInstance.setReceivers(processDefinition.getReceivers());
processInstance.setReceiversCc(processDefinition.getReceiversCc());
@@ -475,11 +463,10 @@ public class ProcessInstanceService extends BaseDAGService {
* @param loginUser login user
* @param projectName project name
* @param processInstanceId process instance id
- * @param tasksQueue task queue
* @return delete result code
*/
@Transactional(rollbackFor = Exception.class)
- public Map deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId, ITaskQueue tasksQueue) {
+ public Map deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId) {
Map result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
@@ -497,51 +484,7 @@ public class ProcessInstanceService extends BaseDAGService {
return result;
}
- //process instance priority
- int processInstancePriority = processInstance.getProcessInstancePriority().ordinal();
- // delete zk queue
- if (CollectionUtils.isNotEmpty(taskInstanceList)){
- for (TaskInstance taskInstance : taskInstanceList){
- // task instance priority
- int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal();
-
- StringBuilder nodeValueSb = new StringBuilder(100);
- nodeValueSb.append(processInstancePriority)
- .append(UNDERLINE)
- .append(processInstanceId)
- .append(UNDERLINE)
- .append(taskInstancePriority)
- .append(UNDERLINE)
- .append(taskInstance.getId())
- .append(UNDERLINE);
-
- int taskWorkerGroupId = processService.getTaskWorkerGroupId(taskInstance);
- WorkerGroup workerGroup = workerGroupMapper.selectById(taskWorkerGroupId);
-
- if(workerGroup == null){
- nodeValueSb.append(DEFAULT_WORKER_ID);
- }else {
-
- String ips = workerGroup.getIpList();
- StringBuilder ipSb = new StringBuilder(100);
- String[] ipArray = ips.split(COMMA);
-
- for (String ip : ipArray) {
- long ipLong = IpUtils.ipToLong(ip);
- ipSb.append(ipLong).append(COMMA);
- }
-
- if(ipSb.length() > 0) {
- ipSb.deleteCharAt(ipSb.length() - 1);
- }
- nodeValueSb.append(ipSb);
- }
- logger.info("delete task queue node : {}",nodeValueSb);
- tasksQueue.removeNode(org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE, nodeValueSb.toString());
-
- }
- }
// delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index ff87aadbc7..c98b7c31b9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -176,6 +177,21 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.HDFS_NOT_STARTUP);
return result;
}
+
+ if (pid != -1) {
+ Resource parentResource = resourcesMapper.selectById(pid);
+
+ if (parentResource == null) {
+ putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
+ return result;
+ }
+
+ if (!hasPerm(loginUser, parentResource.getUserId())) {
+ putMsg(result, Status.USER_NO_OPERATION_PERM);
+ return result;
+ }
+ }
+
// file is empty
if (file.isEmpty()) {
logger.error("file is empty: {}", file.getOriginalFilename());
@@ -416,6 +432,14 @@ public class ResourcesService extends BaseService {
if (isAdmin(loginUser)) {
userId= 0;
}
+ if (direcotryId != -1) {
+ Resource directory = resourcesMapper.selectById(direcotryId);
+ if (directory == null) {
+ putMsg(result, Status.RESOURCE_NOT_EXIST);
+ return result;
+ }
+ }
+
IPage resourceIPage = resourcesMapper.queryResourcePaging(page,
userId,direcotryId, type.ordinal(), searchVal);
PageInfo pageInfo = new PageInfo(pageNo, pageSize);
@@ -505,8 +529,12 @@ public class ResourcesService extends BaseService {
Map result = new HashMap<>(5);
- Set allResourceList = getAllResources(loginUser, type);
- Visitor resourceTreeVisitor = new ResourceTreeVisitor(new ArrayList<>(allResourceList));
+ int userId = loginUser.getId();
+ if(isAdmin(loginUser)){
+ userId = 0;
+ }
+ List allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
+ Visitor resourceTreeVisitor = new ResourceTreeVisitor(allResourceList);
//JSONArray jsonArray = JSON.parseArray(JSON.toJSONString(resourceTreeVisitor.visit().getChildren(), SerializerFeature.SortField));
result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
putMsg(result,Status.SUCCESS);
@@ -519,7 +547,7 @@ public class ResourcesService extends BaseService {
* @param loginUser login user
* @return all resource set
*/
- private Set getAllResources(User loginUser, ResourceType type) {
+ /*private Set getAllResources(User loginUser, ResourceType type) {
int userId = loginUser.getId();
boolean listChildren = true;
if(isAdmin(loginUser)){
@@ -540,7 +568,7 @@ public class ResourcesService extends BaseService {
}
}
return allResourceList;
- }
+ }*/
/**
* query resource list
@@ -553,7 +581,7 @@ public class ResourcesService extends BaseService {
Map result = new HashMap<>(5);
- Set allResourceList = getAllResources(loginUser, type);
+ List allResourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal(),0);
List resources = new ResourceFilter(".jar",new ArrayList<>(allResourceList)).filter();
Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources);
result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
@@ -592,15 +620,6 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
- //if resource type is UDF,need check whether it is bound by UDF functon
- if (resource.getType() == (ResourceType.UDF)) {
- List udfFuncs = udfFunctionMapper.listUdfByResourceId(new int[]{resourceId});
- if (CollectionUtils.isNotEmpty(udfFuncs)) {
- logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString());
- putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
- return result;
- }
- }
String tenantCode = getTenantCode(resource.getUserId(),result);
if (StringUtils.isEmpty(tenantCode)){
@@ -608,10 +627,22 @@ public class ResourcesService extends BaseService {
}
// get all resource id of process definitions those is released
- Map> resourceProcessMap = getResourceProcessMap();
+ List