diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
index 6e32d12df3..362c6130f4 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
@@ -61,4 +61,8 @@ public class StringUtils {
     public static String trim(String str) {
         return str == null ? null : str.trim();
     }
+
+    public static boolean equalsIgnoreCase(String str1, String str2) {
+        return str1 == null ? str2 == null : str1.equalsIgnoreCase(str2);
+    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
index ddf1fecf76..484b837d70 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.bean;
 
 import org.springframework.beans.BeansException;
@@ -31,9 +32,7 @@ public class SpringApplicationContext implements ApplicationContextAware {
         SpringApplicationContext.applicationContext = applicationContext;
     }
 
-    public static <T> T getBean(Class<T> requiredType){
+    public static <T> T getBean(Class<T> requiredType) {
         return applicationContext.getBean(requiredType);
     }
-
-
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
new file mode 100644
index 0000000000..4465970678
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.exceptions;
+
+/**
+ * Custom ServiceException exception
+ */
+public class ServiceException extends RuntimeException {
+
+    /**
+     * Construct a new runtime exception with the error message
+     *
+     * @param errMsg Error message
+     */
+    public ServiceException(String errMsg) {
+        super(errMsg);
+    }
+
+    /**
+     * Construct a new runtime exception with the cause
+     *
+     * @param cause cause
+     */
+    public ServiceException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Construct a new runtime exception with the detail message and cause
+     *
+     * @param errMsg message
+     * @param cause cause
+     */
+    public ServiceException(String errMsg, Throwable cause) {
+        super(errMsg, cause);
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
index 1a9295bb10..a8f73f0c97 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.permission;
 
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
@@ -21,11 +22,13 @@ import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.slf4j.Logger;
 
 import java.util.List;
 
+import org.slf4j.Logger;
+
 public class PermissionCheck<T> {
     /**
      * logger
      */
@@ -58,8 +61,9 @@ public class PermissionCheck<T> {
 
     /**
      * permission check
+     *
      * @param authorizationType authorization type
-     * @param processService process dao
+     * @param processService    process dao
      */
     public PermissionCheck(AuthorizationType authorizationType, ProcessService processService) {
         this.authorizationType = authorizationType;
@@ -68,10 +72,6 @@ public class PermissionCheck<T> {
 
     /**
      * permission check
-     * @param authorizationType
-     * @param processService
-     * @param needChecks
-     * @param userId
      */
     public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId) {
         this.authorizationType = authorizationType;
@@ -82,11 +82,6 @@ public class PermissionCheck<T> {
 
     /**
      * permission check
-     * @param authorizationType
-     * @param processService
-     * @param needChecks
-     * @param userId
-     * @param logger
      */
     public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId, Logger logger) {
         this.authorizationType = authorizationType;
@@ -98,13 +93,8 @@ public class PermissionCheck<T> {
 
     /**
      * permission check
-     * @param logger
-     * @param authorizationType
-     * @param processService
-     * @param resourceList
-     * @param userId
      */
-    public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId,Logger logger) {
+    public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId, Logger logger) {
         this.authorizationType = authorizationType;
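The hunks above add a null-safe `equalsIgnoreCase` helper to `StringUtils` and a new unchecked `ServiceException`; the `PermissionCheck` diff that continues below switches `checkPermission` over to it. A minimal usage sketch (illustrative only, not part of the change set):

```java
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;

public class ServiceExceptionExample {

    public static void main(String[] args) {
        // null-safe comparison: none of these calls can throw a NullPointerException
        System.out.println(StringUtils.equalsIgnoreCase("CRON", "cron")); // true
        System.out.println(StringUtils.equalsIgnoreCase(null, "cron"));   // false
        System.out.println(StringUtils.equalsIgnoreCase(null, null));     // true

        try {
            // checkPermission() now reports failures with this unchecked exception,
            // so callers no longer need a broad "throws Exception" clause
            throw new ServiceException(String.format("user %s doesn't exist", 42));
        } catch (ServiceException e) {
            System.err.println("permission check failed: " + e.getMessage());
        }
    }
}
```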
this.processService = processService; this.resourceList = resourceList; @@ -154,9 +144,10 @@ public class PermissionCheck { /** * has permission + * * @return true if has permission */ - public boolean hasPermission(){ + public boolean hasPermission() { try { checkPermission(); return true; @@ -167,23 +158,24 @@ public class PermissionCheck { /** * check permission - * @throws Exception exception + * + * @throws ServiceException exception */ - public void checkPermission() throws Exception{ - if(this.needChecks.length > 0){ + public void checkPermission() throws ServiceException { + if (this.needChecks.length > 0) { // get user type in order to judge whether the user is admin User user = processService.getUserById(userId); if (user == null) { - logger.error("user id {} didn't exist",userId); - throw new RuntimeException(String.format("user %s didn't exist",userId)); + logger.error("user id {} doesn't exist", userId); + throw new ServiceException(String.format("user %s doesn't exist", userId)); } - if (user.getUserType() != UserType.ADMIN_USER){ - List unauthorizedList = processService.listUnauthorized(userId,needChecks,authorizationType); + if (user.getUserType() != UserType.ADMIN_USER) { + List unauthorizedList = processService.listUnauthorized(userId, needChecks, authorizationType); // if exist unauthorized resource - if(CollectionUtils.isNotEmpty(unauthorizedList)){ - logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList); - throw new RuntimeException(String.format("user %s didn't has permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0))); + if (CollectionUtils.isNotEmpty(unauthorizedList)) { + logger.error("user {} doesn't have permission of {}: {}", user.getUserName(), authorizationType.getDescp(), unauthorizedList); + throw new ServiceException(String.format("user %s doesn't have permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0))); } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index cfe649d74f..7c305097ff 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -84,6 +84,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Date; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -110,11 +111,11 @@ public class ProcessService { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.RUNNING_EXECUTION.ordinal(), - ExecutionStatus.DELAY_EXECUTION.ordinal(), - ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.READY_STOP.ordinal()}; + private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), + ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.READY_STOP.ordinal()}; @Autowired private UserMapper userMapper; @@ -158,16 +159,16 @@ public class ProcessService { /** * handle Command (construct ProcessInstance from Command) , wrapped in 
transaction * - * @param logger logger - * @param host host + * @param logger logger + * @param host host * @param validThreadNum validThreadNum - * @param command found command + * @param command found command * @return process instance */ @Transactional(rollbackFor = Exception.class) public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) { ProcessInstance processInstance = constructProcessInstance(command, host); - //cannot construct process instance, return null; + // cannot construct process instance, return null if (processInstance == null) { logger.error("scan command, command parameter is error: {}", command); moveToErrorCommand(command, "process instance is null"); @@ -201,7 +202,7 @@ public class ProcessService { /** * set process waiting thread * - * @param command command + * @param command command * @param processInstance processInstance * @return process instance */ @@ -219,7 +220,7 @@ public class ProcessService { /** * check thread num * - * @param command command + * @param command command * @param validThreadNum validThreadNum * @return if thread is enough */ @@ -259,7 +260,7 @@ public class ProcessService { */ public Boolean verifyIsNeedCreateCommand(Command command) { Boolean isNeedCreate = true; - Map cmdTypeMap = new HashMap(); + EnumMap cmdTypeMap = new EnumMap<>(CommandType.class); cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1); cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1); cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1); @@ -296,9 +297,6 @@ public class ProcessService { /** * get task node list by definitionId - * - * @param defineId - * @return */ public List getTaskNodeListByDefinitionId(Integer defineId) { ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); @@ -425,7 +423,7 @@ public class ProcessService { * recursive query sub process definition id by parent id. * * @param parentId parentId - * @param ids ids + * @param ids ids */ public void recurseFindSubProcessId(int parentId, List ids) { ProcessDefinition processDefinition = processDefineMapper.selectById(parentId); @@ -435,7 +433,7 @@ public class ProcessService { List taskNodeList = processData.getTasks(); - if (taskNodeList != null && taskNodeList.size() > 0) { + if (taskNodeList != null && !taskNodeList.isEmpty()) { for (TaskNode taskNode : taskNodeList) { String parameter = taskNode.getParams(); @@ -456,7 +454,7 @@ public class ProcessService { * create recovery waiting thread command and delete origin command at the same time. 
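In `verifyIsNeedCreateCommand` above, the command-type lookup switches from a raw `HashMap` to an `EnumMap` keyed by `CommandType`, which stores enum keys in an array indexed by ordinal instead of hashing them. A self-contained sketch of the pattern (the three-constant enum below is a stand-in, not the real `CommandType`):

```java
import java.util.EnumMap;
import java.util.Map;

public class EnumMapExample {

    // stand-in for org.apache.dolphinscheduler.common.enums.CommandType
    enum CommandType { REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS }

    public static void main(String[] args) {
        // EnumMap takes the key class up front so it can size its internal array
        Map<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
        cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
        cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
        cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);

        // lookups behave like HashMap, but iteration follows the enum declaration order
        System.out.println(cmdTypeMap.containsKey(CommandType.REPEAT_RUNNING)); // true
        System.out.println(cmdTypeMap);
    }
}
```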
* if the recovery command is exists, only update the field update_time * - * @param originCommand originCommand + * @param originCommand originCommand * @param processInstance processInstance */ public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) { @@ -473,17 +471,17 @@ public class ProcessService { // process instance quit by "waiting thread" state if (originCommand == null) { Command command = new Command( - CommandType.RECOVER_WAITTING_THREAD, - processInstance.getTaskDependType(), - processInstance.getFailureStrategy(), - processInstance.getExecutorId(), - processInstance.getProcessDefinitionId(), - JSONUtils.toJsonString(cmdParam), - processInstance.getWarningType(), - processInstance.getWarningGroupId(), - processInstance.getScheduleTime(), - processInstance.getWorkerGroup(), - processInstance.getProcessInstancePriority() + CommandType.RECOVER_WAITTING_THREAD, + processInstance.getTaskDependType(), + processInstance.getFailureStrategy(), + processInstance.getExecutorId(), + processInstance.getProcessDefinitionId(), + JSONUtils.toJsonString(cmdParam), + processInstance.getWarningType(), + processInstance.getWarningGroupId(), + processInstance.getScheduleTime(), + processInstance.getWorkerGroup(), + processInstance.getProcessInstancePriority() ); saveCommand(command); return; @@ -508,16 +506,14 @@ public class ProcessService { /** * get schedule time from command * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return date */ private Date getScheduleTime(Command command, Map cmdParam) { Date scheduleTime = command.getScheduleTime(); - if (scheduleTime == null) { - if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { - scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); - } + if (scheduleTime == null && cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { + scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); } return scheduleTime; } @@ -526,8 +522,8 @@ public class ProcessService { * generate a new work process instance from command. * * @param processDefinition processDefinition - * @param command command - * @param cmdParam cmdParam map + * @param command command + * @param cmdParam cmdParam map * @return process instance */ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, @@ -580,10 +576,10 @@ public class ProcessService { // curing global params processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + getCommandTypeIfComplement(processInstance, command), + processInstance.getScheduleTime())); //copy process define json to process instance processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); @@ -603,7 +599,7 @@ public class ProcessService { * use definition creator's tenant. 
* * @param tenantId tenantId - * @param userId userId + * @param userId userId * @return tenant */ public Tenant getTenantForProcess(int tenantId, int userId) { @@ -626,15 +622,15 @@ public class ProcessService { /** * check command parameters is valid * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return whether command param is valid */ private Boolean checkCmdParam(Command command, Map cmdParam) { if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) { if (cmdParam == null - || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES) - || cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) { + || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES) + || cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) { logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType()); return false; } @@ -646,7 +642,7 @@ public class ProcessService { * construct process instance according to one command. * * @param command command - * @param host host + * @param host host * @return process instance */ private ProcessInstance constructProcessInstance(Command command, String host) { @@ -714,7 +710,7 @@ public class ProcessService { // generate one new process instance processInstance = generateNewProcessInstance(processDefinition, command, cmdParam); } - if (!checkCmdParam(command, cmdParam)) { + if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) { logger.error("command parameter check failed!"); return null; } @@ -742,7 +738,7 @@ public class ProcessService { initTaskInstance(this.findTaskInstanceById(taskId)); } cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, - String.join(Constants.COMMA, convertIntListToString(failedList))); + String.join(Constants.COMMA, convertIntListToString(failedList))); processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); processInstance.setRunTimes(runTime + 1); break; @@ -755,7 +751,7 @@ public class ProcessService { cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING); List suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE); List stopNodeList = findTaskIdByInstanceState(processInstance.getId(), - ExecutionStatus.KILL); + ExecutionStatus.KILL); suspendedNodeList.addAll(stopNodeList); for (Integer taskId : suspendedNodeList) { // initialize the pause state @@ -809,7 +805,7 @@ public class ProcessService { * return complement data if the process start with complement data * * @param processInstance processInstance - * @param command command + * @param command command * @return command type */ private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) { @@ -824,8 +820,8 @@ public class ProcessService { * initialize complement data parameters * * @param processDefinition processDefinition - * @param processInstance processInstance - * @param cmdParam cmdParam + * @param processInstance processInstance + * @param cmdParam cmdParam */ private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, @@ -835,14 +831,14 @@ public class ProcessService { } Date startComplementTime = DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE), - YYYY_MM_DD_HH_MM_SS); + YYYY_MM_DD_HH_MM_SS); if (Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(startComplementTime); } 
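The `constructProcessInstance` hunk above rewrites `!checkCmdParam(command, cmdParam)` as `Boolean.FALSE.equals(checkCmdParam(command, cmdParam))`: since `checkCmdParam` returns a boxed `Boolean`, negating it directly would auto-unbox and raise a `NullPointerException` if the value were ever null. A tiny sketch of the difference (the `boxedCheck` method is hypothetical):

```java
public class BoxedBooleanExample {

    // hypothetical helper that returns a boxed Boolean and may yield null
    static Boolean boxedCheck(boolean haveResult) {
        return haveResult ? Boolean.TRUE : null;
    }

    public static void main(String[] args) {
        Boolean result = boxedCheck(false);

        // Boolean.FALSE.equals(null) is simply false, so this branch is skipped safely
        if (Boolean.FALSE.equals(result)) {
            System.out.println("check failed");
        }

        // "if (!result)" would auto-unbox null here and throw a NullPointerException
        System.out.println("no NPE, result = " + result);
    }
}
```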
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); } @@ -862,7 +858,7 @@ public class ProcessService { Map paramMap = JSONUtils.toMap(cmdParam); // write sub process id into cmd param. if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS) - && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) { + && CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) { paramMap.remove(CMD_PARAM_SUB_PROCESS); paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId())); subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap)); @@ -875,7 +871,7 @@ public class ProcessService { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if (parentInstance != null) { subProcessInstance.setGlobalParams( - joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); + joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); this.saveProcessInstance(subProcessInstance); } else { logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); @@ -897,7 +893,7 @@ public class ProcessService { * only the keys doesn't in sub process global would be joined. * * @param parentGlobalParams parentGlobalParams - * @param subGlobalParams subGlobalParams + * @param subGlobalParams subGlobalParams * @return global params join */ private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) { @@ -922,12 +918,11 @@ public class ProcessService { */ private void initTaskInstance(TaskInstance taskInstance) { - if (!taskInstance.isSubProcess()) { - if (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()) { - taskInstance.setFlag(Flag.NO); - updateTaskInstance(taskInstance); - return; - } + if (!taskInstance.isSubProcess() + && (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) { + taskInstance.setFlag(Flag.NO); + updateTaskInstance(taskInstance); + return; } taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); updateTaskInstance(taskInstance); @@ -944,12 +939,12 @@ public class ProcessService { public TaskInstance submitTask(TaskInstance taskInstance) { ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); logger.info("start submit task : {}, instance id:{}, state: {}", - taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); + taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); //submit to db TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); if (task == null) { logger.error("end submit task to db error, task name:{}, process id:{} state: {} ", - taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); + taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); return task; } if (!task.getState().typeIsFinished()) { @@ -957,7 +952,7 @@ public class ProcessService { } logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", - taskInstance.getName(), task.getState(), 
processInstance.getId(), processInstance.getState()); + taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); return task; } @@ -968,7 +963,7 @@ public class ProcessService { * set map {parent instance id, task instance id, 0(child instance id)} * * @param parentInstance parentInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { @@ -997,7 +992,7 @@ public class ProcessService { * find previous task work process map. * * @param parentProcessInstance parentProcessInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance, @@ -1015,7 +1010,7 @@ public class ProcessService { } } logger.info("sub process instance is not found,parent task:{},parent instance:{}", - parentTask.getId(), parentProcessInstance.getId()); + parentTask.getId(), parentProcessInstance.getId()); return null; } @@ -1049,10 +1044,6 @@ public class ProcessService { /** * complement data needs transform parent parameter to child. - * - * @param instanceMap - * @param parentProcessInstance - * @return */ private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) { // set sub work process command @@ -1071,11 +1062,6 @@ public class ProcessService { /** * create sub work process command - * - * @param parentProcessInstance - * @param childInstance - * @param instanceMap - * @param task */ public Command createSubProcessCommand(ProcessInstance parentProcessInstance, ProcessInstance childInstance, @@ -1088,25 +1074,23 @@ public class ProcessService { String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance); return new Command( - commandType, - TaskDependType.TASK_POST, - parentProcessInstance.getFailureStrategy(), - parentProcessInstance.getExecutorId(), - childDefineId, - processParam, - parentProcessInstance.getWarningType(), - parentProcessInstance.getWarningGroupId(), - parentProcessInstance.getScheduleTime(), - task.getWorkerGroup(), - parentProcessInstance.getProcessInstancePriority() + commandType, + TaskDependType.TASK_POST, + parentProcessInstance.getFailureStrategy(), + parentProcessInstance.getExecutorId(), + childDefineId, + processParam, + parentProcessInstance.getWarningType(), + parentProcessInstance.getWarningGroupId(), + parentProcessInstance.getScheduleTime(), + task.getWorkerGroup(), + parentProcessInstance.getProcessInstancePriority() ); } /** * initialize sub work flow state * child instance state would be initialized when 'recovery from pause/stop/failure' - * - * @param childInstance */ private void initSubInstanceState(ProcessInstance childInstance) { if (childInstance != null) { @@ -1119,9 +1103,6 @@ public class ProcessService { * get sub work flow command type * child instance exist: child command = fatherCommand * child instance not exists: child command = fatherCommand[0] - * - * @param parentProcessInstance - * @return */ private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) { CommandType commandType = parentProcessInstance.getCommandType(); @@ -1136,7 +1117,7 @@ public class ProcessService { * update sub process definition * * @param parentProcessInstance parentProcessInstance - * @param childDefinitionId childDefinitionId + * @param 
childDefinitionId childDefinitionId */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) { ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId()); @@ -1150,7 +1131,7 @@ public class ProcessService { /** * submit task to mysql * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstance processInstance * @return task instance */ @@ -1163,7 +1144,7 @@ public class ProcessService { } else { if (processInstanceState != ExecutionStatus.READY_STOP - && processInstanceState != ExecutionStatus.READY_PAUSE) { + && processInstanceState != ExecutionStatus.READY_PAUSE) { // failure task set invalid taskInstance.setFlag(Flag.NO); updateTaskInstance(taskInstance); @@ -1204,19 +1185,19 @@ public class ProcessService { * return stop if work process state is ready stop * if all of above are not satisfied, return submit success * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstanceState processInstanceState * @return process instance state */ public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) { ExecutionStatus state = taskInstance.getState(); if ( - // running, delayed or killed - // the task already exists in task queue - // return state - state == ExecutionStatus.RUNNING_EXECUTION - || state == ExecutionStatus.DELAY_EXECUTION - || state == ExecutionStatus.KILL + // running, delayed or killed + // the task already exists in task queue + // return state + state == ExecutionStatus.RUNNING_EXECUTION + || state == ExecutionStatus.DELAY_EXECUTION + || state == ExecutionStatus.KILL ) { return state; } @@ -1225,7 +1206,7 @@ public class ProcessService { if (processInstanceState == ExecutionStatus.READY_PAUSE) { state = ExecutionStatus.PAUSE; } else if (processInstanceState == ExecutionStatus.READY_STOP - || !checkProcessStrategy(taskInstance)) { + || !checkProcessStrategy(taskInstance)) { state = ExecutionStatus.KILL; } else { state = ExecutionStatus.SUBMITTED_SUCCESS; @@ -1380,7 +1361,7 @@ public class ProcessService { * get id list by task state * * @param instanceId instanceId - * @param state state + * @param state state * @return task instance states */ public List findTaskIdByInstanceState(int instanceId, ExecutionStatus state) { @@ -1435,7 +1416,7 @@ public class ProcessService { * find work process map by parent process id and parent task id. 
* * @param parentWorkProcessId parentWorkProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance map */ public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { @@ -1457,7 +1438,7 @@ public class ProcessService { * find sub process instance * * @param parentProcessId parentProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance */ public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) { @@ -1489,12 +1470,12 @@ public class ProcessService { /** * change task state * - * @param state state - * @param startTime startTime - * @param host host + * @param state state + * @param startTime startTime + * @param host host * @param executePath executePath - * @param logPath logPath - * @param taskInstId taskInstId + * @param logPath logPath + * @param taskInstId taskInstId */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host, String executePath, @@ -1522,12 +1503,12 @@ public class ProcessService { * update the process instance * * @param processInstanceId processInstanceId - * @param processJson processJson - * @param globalParams globalParams - * @param scheduleTime scheduleTime - * @param flag flag - * @param locations locations - * @param connects connects + * @param processJson processJson + * @param globalParams globalParams + * @param scheduleTime scheduleTime + * @param flag flag + * @param locations locations + * @param connects connects * @return update process instance result */ public int updateProcessInstance(Integer processInstanceId, String processJson, @@ -1548,10 +1529,10 @@ public class ProcessService { /** * change task state * - * @param state state - * @param endTime endTime + * @param state state + * @param endTime endTime * @param taskInstId taskInstId - * @param varPool varPool + * @param varPool varPool */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date endTime, @@ -1577,7 +1558,7 @@ public class ProcessService { if (intList == null) { return new ArrayList<>(); } - List result = new ArrayList(intList.size()); + List result = new ArrayList<>(intList.size()); for (Integer intVar : intList) { result.add(String.valueOf(intVar)); } @@ -1642,7 +1623,7 @@ public class ProcessService { */ public List queryNeedFailoverTaskInstances(String host) { return taskInstanceMapper.queryByHostAndStatus(host, - stateArray); + stateArray); } /** @@ -1659,7 +1640,7 @@ public class ProcessService { * update process instance state by id * * @param processInstanceId processInstanceId - * @param executionStatus executionStatus + * @param executionStatus executionStatus * @return update process result */ public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) { @@ -1696,7 +1677,7 @@ public class ProcessService { /** * find tenant code by resource name * - * @param resName resource name + * @param resName resource name * @param resourceType resource type * @return tenant code */ @@ -1714,35 +1695,35 @@ public class ProcessService { */ public List selectAllByProcessDefineId(int[] ids) { return scheduleMapper.selectAllByProcessDefineArray( - ids); + ids); } /** * get dependency cycle by work process define id and scheduler fire time * - * @param masterId masterId + * @param masterId masterId * @param processDefinitionId processDefinitionId - * @param scheduledFireTime the time the 
task schedule is expected to trigger + * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency * @throws Exception if error throws Exception */ public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception { - List list = getCycleDependencies(masterId, new int[] {processDefinitionId}, scheduledFireTime); - return list.size() > 0 ? list.get(0) : null; + List list = getCycleDependencies(masterId, new int[]{processDefinitionId}, scheduledFireTime); + return !list.isEmpty() ? list.get(0) : null; } /** * get dependency cycle list by work process define id list and scheduler fire time * - * @param masterId masterId - * @param ids ids + * @param masterId masterId + * @param ids ids * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency list * @throws Exception if error throws Exception */ public List getCycleDependencies(int masterId, int[] ids, Date scheduledFireTime) throws Exception { - List cycleDependencyList = new ArrayList(); + List cycleDependencyList = new ArrayList<>(); if (null == ids || ids.length == 0) { logger.warn("ids[] is empty!is invalid!"); return cycleDependencyList; @@ -1769,14 +1750,10 @@ public class ProcessService { } Calendar calendar = Calendar.getInstance(); switch (cycleEnum) { - /*case MINUTE: - calendar.add(Calendar.MINUTE,-61);*/ case HOUR: calendar.add(Calendar.HOUR, -25); break; case DAY: - calendar.add(Calendar.DATE, -32); - break; case WEEK: calendar.add(Calendar.DATE, -32); break; @@ -1784,7 +1761,8 @@ public class ProcessService { calendar.add(Calendar.MONTH, -13); break; default: - logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name()); + String cycleName = cycleEnum.name(); + logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleName); continue; } Date start = calendar.getTime(); @@ -1794,7 +1772,7 @@ public class ProcessService { } else { list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression); } - if (list.size() >= 1) { + if (!list.isEmpty()) { start = list.get(list.size() - 1); CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(), start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum); cycleDependencyList.add(dependency); @@ -1813,8 +1791,8 @@ public class ProcessService { */ public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) { return processInstanceMapper.queryLastSchedulerProcess(definitionId, - dateInterval.getStartTime(), - dateInterval.getEndTime()); + dateInterval.getStartTime(), + dateInterval.getEndTime()); } /** @@ -1826,23 +1804,23 @@ public class ProcessService { */ public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) { return processInstanceMapper.queryLastManualProcess(definitionId, - dateInterval.getStartTime(), - dateInterval.getEndTime()); + dateInterval.getStartTime(), + dateInterval.getEndTime()); } /** * find last running process instance * * @param definitionId process definition id - * @param startTime start time - * @param endTime end time + * @param startTime start time + * @param endTime end time * @return process instance */ public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) { return processInstanceMapper.queryLastRunningProcess(definitionId, - startTime, - endTime, - stateArray); + startTime, + endTime, + 
stateArray); } /** @@ -1867,6 +1845,7 @@ public class ProcessService { /** * query project name and user name by processInstanceId. + * * @param processInstanceId processInstanceId * @return projectName and userName */ @@ -1934,35 +1913,32 @@ public class ProcessService { /** * list unauthorized udf function * - * @param userId user id + * @param userId user id * @param needChecks data source id array * @return unauthorized udf function list */ public List listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType) { - List resultList = new ArrayList(); + List resultList = new ArrayList<>(); if (Objects.nonNull(needChecks) && needChecks.length > 0) { - Set originResSet = new HashSet(Arrays.asList(needChecks)); + Set originResSet = new HashSet<>(Arrays.asList(needChecks)); switch (authorizationType) { case RESOURCE_FILE_ID: - Set authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet()); + case UDF_FILE: + Set authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(Resource::getId).collect(toSet()); originResSet.removeAll(authorizedResourceFiles); break; case RESOURCE_FILE_NAME: - Set authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getFullName()).collect(toSet()); + Set authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(Resource::getFullName).collect(toSet()); originResSet.removeAll(authorizedResources); break; - case UDF_FILE: - Set authorizedUdfFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet()); - originResSet.removeAll(authorizedUdfFiles); - break; case DATASOURCE: - Set authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(t -> t.getId()).collect(toSet()); + Set authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(DataSource::getId).collect(toSet()); originResSet.removeAll(authorizedDatasources); break; case UDF: - Set authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> t.getId()).collect(toSet()); + Set authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(UdfFunc::getId).collect(toSet()); originResSet.removeAll(authorizedUdfs); break; default: @@ -2007,9 +1983,6 @@ public class ProcessService { /** * format task app id in task instance - * - * @param taskInstance - * @return */ public String formatTaskAppId(TaskInstance taskInstance) { ProcessDefinition definition = this.findProcessDefineById(taskInstance.getProcessDefinitionId()); @@ -2019,9 +1992,9 @@ public class ProcessService { return ""; } return String.format("%s_%s_%s", - definition.getId(), - processInstanceById.getId(), - taskInstance.getId()); + definition.getId(), + processInstanceById.getId(), + taskInstance.getId()); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java index 6ac847b8db..2921ce2bba 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java @@ -14,8 +14,8 @@ * See the License for the specific language 
governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.service.quartz; +package org.apache.dolphinscheduler.service.quartz; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -25,6 +25,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Date; + import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; @@ -34,8 +37,6 @@ import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.springframework.util.StringUtils; -import java.util.Date; - /** * process schedule job */ @@ -46,7 +47,7 @@ public class ProcessScheduleJob implements Job { */ private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class); - public ProcessService getProcessService(){ + public ProcessService getProcessService() { return SpringApplicationContext.getBean(ProcessService.class); } @@ -66,10 +67,8 @@ public class ProcessScheduleJob implements Job { int projectId = dataMap.getInt(Constants.PROJECT_ID); int scheduleId = dataMap.getInt(Constants.SCHEDULE_ID); - Date scheduledFireTime = context.getScheduledFireTime(); - Date fireTime = context.getFireTime(); logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId); @@ -82,11 +81,10 @@ public class ProcessScheduleJob implements Job { return; } - ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId()); // release state : online/offline ReleaseState releaseState = processDefinition.getReleaseState(); - if (processDefinition == null || releaseState == ReleaseState.OFFLINE) { + if (releaseState == ReleaseState.OFFLINE) { logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId); return; } @@ -107,7 +105,6 @@ public class ProcessScheduleJob implements Job { getProcessService().createCommand(command); } - /** * delete job */ diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java index 3b15810e05..fd91e4076d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java @@ -14,15 +14,76 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.dolphinscheduler.service.quartz; +import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLASS; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_CLASS; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY; +import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCENAME; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PRIFIX; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_PRIFIX; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_MISFIRETHRESHOLD; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_PROPERTIES_PATH; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_TABLE_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADCOUNT; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADPRIORITY; +import static org.apache.dolphinscheduler.common.Constants.SCHEDULE; +import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_ID; +import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME; +import static org.apache.dolphinscheduler.common.Constants.STRING_FALSE; +import static org.apache.dolphinscheduler.common.Constants.STRING_TRUE; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; + +import static org.quartz.CronScheduleBuilder.cronSchedule; +import static org.quartz.JobBuilder.newJob; +import static org.quartz.TriggerBuilder.newTrigger; + +import 
org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.service.exceptions.ServiceException; + import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang.StringUtils; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.quartz.*; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.quartz.CronTrigger; +import org.quartz.Job; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.TriggerKey; import org.quartz.impl.StdSchedulerFactory; import org.quartz.impl.jdbcjobstore.JobStoreTX; import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate; @@ -32,300 +93,289 @@ import org.quartz.simpl.SimpleThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.dolphinscheduler.common.Constants.*; -import static org.quartz.CronScheduleBuilder.cronSchedule; -import static org.quartz.JobBuilder.newJob; -import static org.quartz.TriggerBuilder.newTrigger; - /** * single Quartz executors instance */ public class QuartzExecutors { - /** - * logger of QuartzExecutors - */ - private static final Logger logger = LoggerFactory.getLogger(QuartzExecutors.class); - - /** - * read write lock - */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - /** - * A Scheduler maintains a registry of org.quartz.JobDetail and Trigger. - */ - private static Scheduler scheduler; - - /** - * load conf - */ - private static Configuration conf; - - private static final class Holder { - private static final QuartzExecutors instance = new QuartzExecutors(); - } - - - private QuartzExecutors() { - try { - conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH); - init(); - }catch (ConfigurationException e){ - logger.warn("not loaded quartz configuration file, will used default value",e); + /** + * logger of QuartzExecutors + */ + private static final Logger logger = LoggerFactory.getLogger(QuartzExecutors.class); + + /** + * read write lock + */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * A Scheduler maintains a registry of org.quartz.JobDetail and Trigger. + */ + private static Scheduler scheduler; + + /** + * load conf + */ + private static Configuration conf; + + private static final class Holder { + private static final QuartzExecutors instance = new QuartzExecutors(); } - } - - /** - * thread safe and performance promote - * @return instance of Quartz Executors - */ - public static QuartzExecutors getInstance() { - return Holder.instance; - } - - - /** - * init - * - * Returns a client-usable handle to a Scheduler. 
- */ - private void init() { - try { - StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(); - Properties properties = new Properties(); - - String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME); - if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)){ - properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName())); - } else { - properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName())); - } - properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME)); - properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID)); - properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,STRING_TRUE)); - properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,STRING_FALSE)); - properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName())); - properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,STRING_TRUE)); - properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT)); - properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY)); - properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName())); - properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX)); - properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,STRING_TRUE)); - properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD)); - properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, QUARTZ_CLUSTERCHECKININTERVAL)); - properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK)); - properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE)); - properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName())); - - schedulerFactory.initialize(properties); - scheduler = schedulerFactory.getScheduler(); - - } catch (SchedulerException e) { - logger.error(e.getMessage(),e); - System.exit(1); + + private QuartzExecutors() { + try { + conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH); + init(); + } catch (ConfigurationException e) { + logger.warn("not loaded quartz configuration file, will used default value", e); + } } - } - - /** - * Whether the scheduler has been started. 
- * - * @throws SchedulerException scheduler exception - */ - public void start() throws SchedulerException { - if (!scheduler.isStarted()){ - scheduler.start(); - logger.info("Quartz service started" ); + /** + * thread safe and performance promote + * + * @return instance of Quartz Executors + */ + public static QuartzExecutors getInstance() { + return Holder.instance; } - } - - /** - * stop all scheduled tasks - * - * Halts the Scheduler's firing of Triggers, - * and cleans up all resources associated with the Scheduler. - * - * The scheduler cannot be re-started. - * @throws SchedulerException scheduler exception - */ - public void shutdown() throws SchedulerException { - if (!scheduler.isShutdown()) { - // don't wait for the task to complete - scheduler.shutdown(); - logger.info("Quartz service stopped, and halt all tasks"); + + /** + * init + *

+ * Returns a client-usable handle to a Scheduler. + */ + private void init() { + try { + StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(); + Properties properties = new Properties(); + + String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME); + if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)) { + properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName())); + } else { + properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName())); + } + properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME)); + properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID)); + properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, STRING_TRUE)); + properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, STRING_FALSE)); + properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS, conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName())); + properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, STRING_TRUE)); + properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT, conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT)); + properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY)); + properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS, conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName())); + properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX)); + properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, STRING_TRUE)); + properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD)); + properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, QUARTZ_CLUSTERCHECKININTERVAL)); + properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK)); + properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE, conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE)); + properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, DruidConnectionProvider.class.getName())); + + schedulerFactory.initialize(properties); + scheduler = schedulerFactory.getScheduler(); + + } catch (SchedulerException e) { + logger.error(e.getMessage(), e); + System.exit(1); + } + } - } - - - /** - * add task trigger , if this task already exists, return this task with updated trigger - * - * @param clazz job class name - * @param jobName job name - * @param jobGroupName job group name - * @param startDate job start date - * @param endDate job end date - * @param cronExpression cron expression - * @param jobDataMap job parameters data map - */ - public void 
addJob(Class clazz,String jobName,String jobGroupName,Date startDate, Date endDate, - String cronExpression, - Map jobDataMap) { - lock.writeLock().lock(); - try { - - JobKey jobKey = new JobKey(jobName, jobGroupName); - JobDetail jobDetail; - //add a task (if this task already exists, return this task directly) - if (scheduler.checkExists(jobKey)) { - - jobDetail = scheduler.getJobDetail(jobKey); - if (jobDataMap != null) { - jobDetail.getJobDataMap().putAll(jobDataMap); + + /** + * Start the scheduler if it has not been started yet. + * + * @throws SchedulerException scheduler exception + */ + public void start() throws SchedulerException { + if (!scheduler.isStarted()) { + scheduler.start(); + logger.info("Quartz service started"); } - } else { - jobDetail = newJob(clazz).withIdentity(jobKey).build(); + } - if (jobDataMap != null) { - jobDetail.getJobDataMap().putAll(jobDataMap); + /** + * stop all scheduled tasks + *

+ * Halts the Scheduler's firing of Triggers, + * and cleans up all resources associated with the Scheduler. + *

+ * The scheduler cannot be re-started. + * + * @throws SchedulerException scheduler exception + */ + public void shutdown() throws SchedulerException { + if (!scheduler.isShutdown()) { + // don't wait for the task to complete + scheduler.shutdown(); + logger.info("Quartz service stopped, and halt all tasks"); } + } - scheduler.addJob(jobDetail, false, true); - - logger.info("Add job, job name: {}, group name: {}", - jobName, jobGroupName); - } - - TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName); - /** - * Instructs the Scheduler that upon a mis-fire - * situation, the CronTrigger wants to have it's - * next-fire-time updated to the next time in the schedule after the - * current time (taking into account any associated Calendar), - * but it does not want to be fired now. - */ - CronTrigger cronTrigger = newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate) - .withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing()) - .forJob(jobDetail).build(); - - if (scheduler.checkExists(triggerKey)) { - // updateProcessInstance scheduler trigger when scheduler cycle changes - CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey); - String oldCronExpression = oldCronTrigger.getCronExpression(); - - if (!StringUtils.equalsIgnoreCase(cronExpression,oldCronExpression)) { - // reschedule job trigger - scheduler.rescheduleJob(triggerKey, cronTrigger); - logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", - jobName, jobGroupName, cronExpression, startDate, endDate); - } - } else { - scheduler.scheduleJob(cronTrigger); - logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", - jobName, jobGroupName, cronExpression, startDate, endDate); - } - - } catch (Exception e) { - logger.error("add job failed", e); - throw new RuntimeException("add job failed", e); - } finally { - lock.writeLock().unlock(); + /** + * add task trigger , if this task already exists, return this task with updated trigger + * + * @param clazz job class name + * @param jobName job name + * @param jobGroupName job group name + * @param startDate job start date + * @param endDate job end date + * @param cronExpression cron expression + * @param jobDataMap job parameters data map + */ + public void addJob(Class clazz, String jobName, String jobGroupName, Date startDate, Date endDate, + String cronExpression, + Map jobDataMap) { + lock.writeLock().lock(); + try { + + JobKey jobKey = new JobKey(jobName, jobGroupName); + JobDetail jobDetail; + //add a task (if this task already exists, return this task directly) + if (scheduler.checkExists(jobKey)) { + + jobDetail = scheduler.getJobDetail(jobKey); + if (jobDataMap != null) { + jobDetail.getJobDataMap().putAll(jobDataMap); + } + } else { + jobDetail = newJob(clazz).withIdentity(jobKey).build(); + + if (jobDataMap != null) { + jobDetail.getJobDataMap().putAll(jobDataMap); + } + + scheduler.addJob(jobDetail, false, true); + + logger.info("Add job, job name: {}, group name: {}", + jobName, jobGroupName); + } + + TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName); + /** + * Instructs the Scheduler that upon a mis-fire + * situation, the CronTrigger wants to have it's + * next-fire-time updated to the next time in the schedule after the + * current time (taking into account any associated Calendar), + * but it does not want to be fired now. 
+ */ + CronTrigger cronTrigger = newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate) + .withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing()) + .forJob(jobDetail).build(); + + if (scheduler.checkExists(triggerKey)) { + // updateProcessInstance scheduler trigger when scheduler cycle changes + CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey); + String oldCronExpression = oldCronTrigger.getCronExpression(); + + if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) { + // reschedule job trigger + scheduler.rescheduleJob(triggerKey, cronTrigger); + logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", + jobName, jobGroupName, cronExpression, startDate, endDate); + } + } else { + scheduler.scheduleJob(cronTrigger); + logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", + jobName, jobGroupName, cronExpression, startDate, endDate); + } + + } catch (Exception e) { + throw new ServiceException("add job failed", e); + } finally { + lock.writeLock().unlock(); + } } - } - - - /** - * delete job - * - * @param jobName job name - * @param jobGroupName job group name - * @return true if the Job was found and deleted. - */ - public boolean deleteJob(String jobName, String jobGroupName) { - lock.writeLock().lock(); - try { - JobKey jobKey = new JobKey(jobName,jobGroupName); - if(scheduler.checkExists(jobKey)){ - logger.info("try to delete job, job name: {}, job group name: {},", jobName, jobGroupName); - return scheduler.deleteJob(jobKey); - }else { - return true; - } - - } catch (SchedulerException e) { - logger.error("delete job : {} failed",jobName, e); - } finally { - lock.writeLock().unlock(); + + /** + * delete job + * + * @param jobName job name + * @param jobGroupName job group name + * @return true if the Job was found and deleted. + */ + public boolean deleteJob(String jobName, String jobGroupName) { + lock.writeLock().lock(); + try { + JobKey jobKey = new JobKey(jobName, jobGroupName); + if (scheduler.checkExists(jobKey)) { + logger.info("try to delete job, job name: {}, job group name: {},", jobName, jobGroupName); + return scheduler.deleteJob(jobKey); + } else { + return true; + } + + } catch (SchedulerException e) { + logger.error("delete job : {} failed", jobName, e); + } finally { + lock.writeLock().unlock(); + } + return false; + } + + /** + * delete all jobs in job group + * + * @param jobGroupName job group name + * @return true if all of the Jobs were found and deleted, false if + * one or more were not deleted. + */ + public boolean deleteAllJobs(String jobGroupName) { + lock.writeLock().lock(); + try { + logger.info("try to delete all jobs in job group: {}", jobGroupName); + List jobKeys = new ArrayList<>(); + jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEndsWith(jobGroupName))); + + return scheduler.deleteJobs(jobKeys); + } catch (SchedulerException e) { + logger.error("delete all jobs in job group: {} failed", jobGroupName, e); + } finally { + lock.writeLock().unlock(); + } + return false; } - return false; - } - - /** - * delete all jobs in job group - * - * @param jobGroupName job group name - * - * @return true if all of the Jobs were found and deleted, false if - * one or more were not deleted. 
- */ - public boolean deleteAllJobs(String jobGroupName) { - lock.writeLock().lock(); - try { - logger.info("try to delete all jobs in job group: {}", jobGroupName); - List jobKeys = new ArrayList<>(); - jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEndsWith(jobGroupName))); - - return scheduler.deleteJobs(jobKeys); - } catch (SchedulerException e) { - logger.error("delete all jobs in job group: {} failed",jobGroupName, e); - } finally { - lock.writeLock().unlock(); + + /** + * build job name + * + * @param processId process id + * @return job name + */ + public static String buildJobName(int processId) { + StringBuilder sb = new StringBuilder(30); + sb.append(QUARTZ_JOB_PRIFIX).append(UNDERLINE).append(processId); + return sb.toString(); + } + + /** + * build job group name + * + * @param projectId project id + * @return job group name + */ + public static String buildJobGroupName(int projectId) { + StringBuilder sb = new StringBuilder(30); + sb.append(QUARTZ_JOB_GROUP_PRIFIX).append(UNDERLINE).append(projectId); + return sb.toString(); + } + + /** + * add params to map + * + * @param projectId project id + * @param scheduleId schedule id + * @param schedule schedule + * @return data map + */ + public static Map buildDataMap(int projectId, int scheduleId, Schedule schedule) { + Map dataMap = new HashMap<>(3); + dataMap.put(PROJECT_ID, projectId); + dataMap.put(SCHEDULE_ID, scheduleId); + dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule)); + + return dataMap; } - return false; - } - - /** - * build job name - * @param processId process id - * @return job name - */ - public static String buildJobName(int processId) { - StringBuilder sb = new StringBuilder(30); - sb.append(QUARTZ_JOB_PRIFIX).append(UNDERLINE).append(processId); - return sb.toString(); - } - - /** - * build job group name - * @param projectId project id - * @return job group name - */ - public static String buildJobGroupName(int projectId) { - StringBuilder sb = new StringBuilder(30); - sb.append(QUARTZ_JOB_GROUP_PRIFIX).append(UNDERLINE).append(projectId); - return sb.toString(); - } - - /** - * add params to map - * - * @param projectId project id - * @param scheduleId schedule id - * @param schedule schedule - * @return data map - */ - public static Map buildDataMap(int projectId, int scheduleId, Schedule schedule) { - Map dataMap = new HashMap<>(3); - dataMap.put(PROJECT_ID, projectId); - dataMap.put(SCHEDULE_ID, scheduleId); - dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule)); - - return dataMap; - } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java index 0a2e31b610..60c862340b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java @@ -14,159 +14,177 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.dolphinscheduler.service.quartz.cron; +import org.apache.dolphinscheduler.common.enums.CycleEnum; + import com.cronutils.model.Cron; import com.cronutils.model.field.CronField; import com.cronutils.model.field.CronFieldName; -import com.cronutils.model.field.expression.*; -import org.apache.dolphinscheduler.common.enums.CycleEnum; +import com.cronutils.model.field.expression.Always; +import com.cronutils.model.field.expression.And; +import com.cronutils.model.field.expression.Between; +import com.cronutils.model.field.expression.Every; +import com.cronutils.model.field.expression.FieldExpression; +import com.cronutils.model.field.expression.On; /** * Cycle */ public abstract class AbstractCycle { - protected Cron cron; - - protected CronField minField; - protected CronField hourField; - protected CronField dayOfMonthField; - protected CronField dayOfWeekField; - protected CronField monthField; - protected CronField yearField; - - public CycleLinks addCycle(AbstractCycle cycle) { - return new CycleLinks(this.cron).addCycle(this).addCycle(cycle); - } - - /** - * cycle constructor - * @param cron cron - */ - public AbstractCycle(Cron cron) { - if (cron == null) { - throw new IllegalArgumentException("cron must not be null!"); + protected Cron cron; + + protected CronField minField; + protected CronField hourField; + protected CronField dayOfMonthField; + protected CronField dayOfWeekField; + protected CronField monthField; + protected CronField yearField; + + public CycleLinks addCycle(AbstractCycle cycle) { + return new CycleLinks(this.cron).addCycle(this).addCycle(cycle); + } + + /** + * cycle constructor + * + * @param cron cron + */ + protected AbstractCycle(Cron cron) { + if (cron == null) { + throw new IllegalArgumentException("cron must not be null!"); + } + + this.cron = cron; + this.minField = cron.retrieve(CronFieldName.MINUTE); + this.hourField = cron.retrieve(CronFieldName.HOUR); + this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH); + this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK); + this.monthField = cron.retrieve(CronFieldName.MONTH); + this.yearField = cron.retrieve(CronFieldName.YEAR); + } + + /** + * whether the minute field has a value + * + * @return if minute field has a value return true,else return false + */ + protected boolean minFiledIsSetAll() { + FieldExpression minFieldExpression = minField.getExpression(); + return (minFieldExpression instanceof Every || minFieldExpression instanceof Always + || minFieldExpression instanceof Between || minFieldExpression instanceof And + || minFieldExpression instanceof On); + } + + /** + * whether the minute field has a value of every or always + * + * @return if minute field has a value of every or always return true,else return false + */ + protected boolean minFiledIsEvery() { + FieldExpression minFieldExpression = minField.getExpression(); + return (minFieldExpression instanceof Every || minFieldExpression instanceof Always); + } + + /** + * whether the hour field has a value + * + * @return if hour field has a value return true,else return false + */ + protected boolean hourFiledIsSetAll() { + FieldExpression hourFieldExpression = hourField.getExpression(); + return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always + || hourFieldExpression instanceof Between || hourFieldExpression instanceof And + || hourFieldExpression instanceof On); + } + + /** + * whether the hour field has a value of every or always + * + * @return if hour field has a 
value of every or always return true,else return false + */ + protected boolean hourFiledIsEvery() { + FieldExpression hourFieldExpression = hourField.getExpression(); + return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always); + } + + /** + * whether the day Of month field has a value + * + * @return if day Of month field has a value return true,else return false + */ + protected boolean dayOfMonthFieldIsSetAll() { + return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always + || dayOfMonthField.getExpression() instanceof Between || dayOfMonthField.getExpression() instanceof And + || dayOfMonthField.getExpression() instanceof On); + } + + /** + * whether the day Of Month field has a value of every or always + * + * @return if day Of Month field has a value of every or always return true,else return false + */ + protected boolean dayOfMonthFieldIsEvery() { + return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always); + } + + /** + * whether month field has a value + * + * @return if month field has a value return true,else return false + */ + protected boolean monthFieldIsSetAll() { + FieldExpression monthFieldExpression = monthField.getExpression(); + return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always + || monthFieldExpression instanceof Between || monthFieldExpression instanceof And + || monthFieldExpression instanceof On); + } + + /** + * whether the month field has a value of every or always + * + * @return if month field has a value of every or always return true,else return false + */ + protected boolean monthFieldIsEvery() { + FieldExpression monthFieldExpression = monthField.getExpression(); + return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always); + } + + /** + * whether the day Of week field has a value + * + * @return if day Of week field has a value return true,else return false + */ + protected boolean dayofWeekFieldIsSetAll() { + FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression(); + return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always + || dayOfWeekFieldExpression instanceof Between || dayOfWeekFieldExpression instanceof And + || dayOfWeekFieldExpression instanceof On); + } + + /** + * whether the day Of week field has a value of every or always + * + * @return if day Of week field has a value of every or always return true,else return false + */ + protected boolean dayofWeekFieldIsEvery() { + FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression(); + return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always); } - this.cron = cron; - this.minField = cron.retrieve(CronFieldName.MINUTE); - this.hourField = cron.retrieve(CronFieldName.HOUR); - this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH); - this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK); - this.monthField = cron.retrieve(CronFieldName.MONTH); - this.yearField = cron.retrieve(CronFieldName.YEAR); - } - - /** - * whether the minute field has a value - * @return if minute field has a value return true,else return false - */ - protected boolean minFiledIsSetAll(){ - FieldExpression minFieldExpression = minField.getExpression(); - return (minFieldExpression instanceof Every || minFieldExpression instanceof Always - || minFieldExpression instanceof Between || minFieldExpression 
instanceof And - || minFieldExpression instanceof On); - } - - - /** - * whether the minute field has a value of every or always - * @return if minute field has a value of every or always return true,else return false - */ - protected boolean minFiledIsEvery(){ - FieldExpression minFieldExpression = minField.getExpression(); - return (minFieldExpression instanceof Every || minFieldExpression instanceof Always); - } - - /** - * whether the hour field has a value - * @return if hour field has a value return true,else return false - */ - protected boolean hourFiledIsSetAll(){ - FieldExpression hourFieldExpression = hourField.getExpression(); - return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always - || hourFieldExpression instanceof Between || hourFieldExpression instanceof And - || hourFieldExpression instanceof On); - } - - /** - * whether the hour field has a value of every or always - * @return if hour field has a value of every or always return true,else return false - */ - protected boolean hourFiledIsEvery(){ - FieldExpression hourFieldExpression = hourField.getExpression(); - return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always); - } - - /** - * whether the day Of month field has a value - * @return if day Of month field has a value return true,else return false - */ - protected boolean dayOfMonthFieldIsSetAll(){ - return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always - || dayOfMonthField.getExpression() instanceof Between || dayOfMonthField.getExpression() instanceof And - || dayOfMonthField.getExpression() instanceof On); - } - - - /** - * whether the day Of Month field has a value of every or always - * @return if day Of Month field has a value of every or always return true,else return false - */ - protected boolean dayOfMonthFieldIsEvery(){ - return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always); - } - - /** - * whether month field has a value - * @return if month field has a value return true,else return false - */ - protected boolean monthFieldIsSetAll(){ - FieldExpression monthFieldExpression = monthField.getExpression(); - return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always - || monthFieldExpression instanceof Between || monthFieldExpression instanceof And - || monthFieldExpression instanceof On); - } - - /** - * whether the month field has a value of every or always - * @return if month field has a value of every or always return true,else return false - */ - protected boolean monthFieldIsEvery(){ - FieldExpression monthFieldExpression = monthField.getExpression(); - return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always); - } - - /** - * whether the day Of week field has a value - * @return if day Of week field has a value return true,else return false - */ - protected boolean dayofWeekFieldIsSetAll(){ - FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression(); - return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always - || dayOfWeekFieldExpression instanceof Between || dayOfWeekFieldExpression instanceof And - || dayOfWeekFieldExpression instanceof On); - } - - /** - * whether the day Of week field has a value of every or always - * @return if day Of week field has a value of every or always return true,else return false - */ - protected boolean dayofWeekFieldIsEvery(){ - 
FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression(); - return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always); - } - - /** - * get cycle enum - * @return CycleEnum - */ - protected abstract CycleEnum getCycle(); - - /** - * get mini level cycle enum - * @return CycleEnum - */ - protected abstract CycleEnum getMiniCycle(); + /** + * get cycle enum + * + * @return CycleEnum + */ + protected abstract CycleEnum getCycle(); + + /** + * get mini level cycle enum + * + * @return CycleEnum + */ + protected abstract CycleEnum getMiniCycle(); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 8a7d891c2e..37d8f10c93 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -14,322 +14,329 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.service.zk; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.COLON; +import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; +import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; +import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.common.utils.StringUtils; + +import org.apache.curator.framework.recipes.locks.InterProcessMutex; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; -import java.util.*; - -import static org.apache.dolphinscheduler.common.Constants.*; - /** * abstract zookeeper client */ @Component public abstract class AbstractZKClient extends ZookeeperCachedOperator { - private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); - - - /** - * remove dead server by host - * @param host host - * @param serverType serverType - * @throws Exception - */ - public void removeDeadServerByHost(String host, String serverType) throws Exception { - List deadServers = super.getChildrenKeys(getDeadZNodeParentPath()); - for(String serverPath : deadServers){ - if(serverPath.startsWith(serverType+UNDERLINE+host)){ - String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; - super.remove(server); - logger.info("{} server {} deleted from zk dead server path success" , serverType , host); - } - } - } - - - /** - * opType(add): if find dead server , then add to zk deadServerPath - * opType(delete): delete path from zk - * - * @param zNode node path - * @param zkNodeType master or worker - * @param opType 
delete or add - * @throws Exception errors - */ - public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { - String host = getHostByEventDataPath(zNode); - String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; - - //check server restart, if restart , dead server path in zk should be delete - if(opType.equals(DELETE_ZK_OP)){ - removeDeadServerByHost(host, type); - - }else if(opType.equals(ADD_ZK_OP)){ - String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; - if(!super.isExisted(deadServerPath)){ - //add dead server info to zk dead server path : /dead-servers/ - - super.persist(deadServerPath,(type + UNDERLINE + host)); - - logger.info("{} server dead , and {} added to zk dead server path success" , - zkNodeType.toString(), zNode); - } - } - - } - - /** - * get active master num - * @return active master number - */ - public int getActiveMasterNum(){ - List childrenList = new ArrayList<>(); - try { - // read master node parent path from conf - if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){ - childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER)); - } - } catch (Exception e) { - logger.error("getActiveMasterNum error",e); - } - return childrenList.size(); - } - - /** - * - * @return zookeeper quorum - */ - public String getZookeeperQuorum(){ - return getZookeeperConfig().getServerList(); - } - - /** - * get server list. - * @param zkNodeType zookeeper node type - * @return server list - */ - public List getServersList(ZKNodeType zkNodeType){ - Map masterMap = getServerMaps(zkNodeType); - String parentPath = getZNodeParentPath(zkNodeType); - - List masterServers = new ArrayList<>(); - for (Map.Entry entry : masterMap.entrySet()) { - Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); - if(masterServer == null){ - continue; - } - String key = entry.getKey(); - masterServer.setZkDirectory(parentPath + "/"+ key); - //set host and port - String[] hostAndPort=key.split(COLON); - String[] hosts=hostAndPort[0].split(DIVISION_STRING); - // fetch the last one - masterServer.setHost(hosts[hosts.length-1]); - masterServer.setPort(Integer.parseInt(hostAndPort[1])); - masterServers.add(masterServer); - } - return masterServers; - } - - /** - * get master server list map. 
- * @param zkNodeType zookeeper node type - * @return result : {host : resource info} - */ - public Map getServerMaps(ZKNodeType zkNodeType){ - - Map masterMap = new HashMap<>(); - try { - String path = getZNodeParentPath(zkNodeType); - List serverList = super.getChildrenKeys(path); - if(zkNodeType == ZKNodeType.WORKER){ - List workerList = new ArrayList<>(); - for(String group : serverList){ - List groupServers = super.getChildrenKeys(path + Constants.SLASH + group); - for(String groupServer : groupServers){ - workerList.add(group + Constants.SLASH + groupServer); - } - } - serverList = workerList; - } - for(String server : serverList){ - masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server)); - } - } catch (Exception e) { - logger.error("get server list failed", e); - } - - return masterMap; - } - - /** - * check the zookeeper node already exists - * @param host host - * @param zkNodeType zookeeper node type - * @return true if exists - */ - public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) { - String path = getZNodeParentPath(zkNodeType); - if(StringUtils.isEmpty(path)){ - logger.error("check zk node exists error, host:{}, zk node type:{}", - host, zkNodeType.toString()); - return false; - } - Map serverMaps = getServerMaps(zkNodeType); - for(String hostKey : serverMaps.keySet()){ - if(hostKey.contains(host)){ - return true; - } - } - return false; - } - - /** - * - * @return get worker node parent path - */ - protected String getWorkerZNodeParentPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; - } - - /** - * - * @return get master node parent path - */ - protected String getMasterZNodeParentPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS; - } - - /** - * - * @return get master lock path - */ - public String getMasterLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS; - } - - /** - * - * @param zkNodeType zookeeper node type - * @return get zookeeper node parent path - */ - public String getZNodeParentPath(ZKNodeType zkNodeType) { - String path = ""; - switch (zkNodeType){ - case MASTER: - return getMasterZNodeParentPath(); - case WORKER: - return getWorkerZNodeParentPath(); - case DEAD_SERVER: - return getDeadZNodeParentPath(); - default: - break; - } - return path; - } - - /** - * - * @return get dead server node parent path - */ - protected String getDeadZNodeParentPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; - } - - /** - * - * @return get master start up lock path - */ - public String getMasterStartUpLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS; - } - - /** - * - * @return get master failover lock path - */ - public String getMasterFailoverLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; - } - - /** - * - * @return get worker failover lock path - */ - public String getWorkerFailoverLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; - } - - /** - * release mutex - * @param mutex mutex - */ - public void releaseMutex(InterProcessMutex mutex) { - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - if("instance must be started before calling this method".equals(e.getMessage())){ - logger.warn("lock release"); - 
}else{ - logger.error("lock release failed",e); - } - - } - } - } - - /** - * init system znode - */ - protected void initSystemZNode(){ - try { - persist(getMasterZNodeParentPath(), ""); - persist(getWorkerZNodeParentPath(), ""); - persist(getDeadZNodeParentPath(), ""); - - logger.info("initialize server nodes success."); - } catch (Exception e) { - logger.error("init system znode failed",e); - } - } - - /** - * get host ip, string format: masterParentPath/ip - * @param path path - * @return host ip, string format: masterParentPath/ip - */ - protected String getHostByEventDataPath(String path) { - if(StringUtils.isEmpty(path)){ - logger.error("empty path!"); - return ""; - } - String[] pathArray = path.split(SINGLE_SLASH); - if(pathArray.length < 1){ - logger.error("parse ip error: {}", path); - return ""; - } - return pathArray[pathArray.length - 1]; - - } - - @Override - public String toString() { - return "AbstractZKClient{" + - "zkClient=" + getZkClient() + - ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' + - ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' + - ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' + - '}'; - } -} \ No newline at end of file + private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); + + /** + * remove dead server by host + * + * @param host host + * @param serverType serverType + */ + public void removeDeadServerByHost(String host, String serverType) { + List deadServers = super.getChildrenKeys(getDeadZNodeParentPath()); + for (String serverPath : deadServers) { + if (serverPath.startsWith(serverType + UNDERLINE + host)) { + String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; + super.remove(server); + logger.info("{} server {} deleted from zk dead server path success", serverType, host); + } + } + } + + /** + * opType(add): if find dead server , then add to zk deadServerPath + * opType(delete): delete path from zk + * + * @param zNode node path + * @param zkNodeType master or worker + * @param opType delete or add + */ + public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) { + String host = getHostByEventDataPath(zNode); + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; + + //check server restart, if restart , dead server path in zk should be delete + if (opType.equals(DELETE_ZK_OP)) { + removeDeadServerByHost(host, type); + + } else if (opType.equals(ADD_ZK_OP)) { + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; + if (!super.isExisted(deadServerPath)) { + //add dead server info to zk dead server path : /dead-servers/ + + super.persist(deadServerPath, (type + UNDERLINE + host)); + + logger.info("{} server dead , and {} added to zk dead server path success", + zkNodeType, zNode); + } + } + + } + + /** + * get active master num + * + * @return active master number + */ + public int getActiveMasterNum() { + List childrenList = new ArrayList<>(); + try { + // read master node parent path from conf + if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) { + childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER)); + } + } catch (Exception e) { + logger.error("getActiveMasterNum error", e); + } + return childrenList.size(); + } + + /** + * @return zookeeper quorum + */ + public String getZookeeperQuorum() { + return getZookeeperConfig().getServerList(); + } + + /** + * get server list. 
+ * + * @param zkNodeType zookeeper node type + * @return server list + */ + public List getServersList(ZKNodeType zkNodeType) { + Map masterMap = getServerMaps(zkNodeType); + String parentPath = getZNodeParentPath(zkNodeType); + + List masterServers = new ArrayList<>(); + for (Map.Entry entry : masterMap.entrySet()) { + Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); + if (masterServer == null) { + continue; + } + String key = entry.getKey(); + masterServer.setZkDirectory(parentPath + "/" + key); + //set host and port + String[] hostAndPort = key.split(COLON); + String[] hosts = hostAndPort[0].split(DIVISION_STRING); + // fetch the last one + masterServer.setHost(hosts[hosts.length - 1]); + masterServer.setPort(Integer.parseInt(hostAndPort[1])); + masterServers.add(masterServer); + } + return masterServers; + } + + /** + * get master server list map. + * + * @param zkNodeType zookeeper node type + * @return result : {host : resource info} + */ + public Map getServerMaps(ZKNodeType zkNodeType) { + + Map masterMap = new HashMap<>(); + try { + String path = getZNodeParentPath(zkNodeType); + List serverList = super.getChildrenKeys(path); + if (zkNodeType == ZKNodeType.WORKER) { + List workerList = new ArrayList<>(); + for (String group : serverList) { + List groupServers = super.getChildrenKeys(path + Constants.SLASH + group); + for (String groupServer : groupServers) { + workerList.add(group + Constants.SLASH + groupServer); + } + } + serverList = workerList; + } + for (String server : serverList) { + masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server)); + } + } catch (Exception e) { + logger.error("get server list failed", e); + } + + return masterMap; + } + + /** + * check the zookeeper node already exists + * + * @param host host + * @param zkNodeType zookeeper node type + * @return true if exists + */ + public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) { + String path = getZNodeParentPath(zkNodeType); + if (StringUtils.isEmpty(path)) { + logger.error("check zk node exists error, host:{}, zk node type:{}", + host, zkNodeType); + return false; + } + Map serverMaps = getServerMaps(zkNodeType); + for (String hostKey : serverMaps.keySet()) { + if (hostKey.contains(host)) { + return true; + } + } + return false; + } + + /** + * @return get worker node parent path + */ + protected String getWorkerZNodeParentPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; + } + + /** + * @return get master node parent path + */ + protected String getMasterZNodeParentPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS; + } + + /** + * @return get master lock path + */ + public String getMasterLockPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS; + } + + /** + * @param zkNodeType zookeeper node type + * @return get zookeeper node parent path + */ + public String getZNodeParentPath(ZKNodeType zkNodeType) { + String path = ""; + switch (zkNodeType) { + case MASTER: + return getMasterZNodeParentPath(); + case WORKER: + return getWorkerZNodeParentPath(); + case DEAD_SERVER: + return getDeadZNodeParentPath(); + default: + break; + } + return path; + } + + /** + * @return get dead server node parent path + */ + protected String getDeadZNodeParentPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; + } + + /** + * @return get master start up lock path + */ + 
public String getMasterStartUpLockPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS; + } + + /** + * @return get master failover lock path + */ + public String getMasterFailoverLockPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; + } + + /** + * @return get worker failover lock path + */ + public String getWorkerFailoverLockPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; + } + + /** + * release mutex + * + * @param mutex mutex + */ + public void releaseMutex(InterProcessMutex mutex) { + if (mutex != null) { + try { + mutex.release(); + } catch (Exception e) { + if ("instance must be started before calling this method".equals(e.getMessage())) { + logger.warn("lock release"); + } else { + logger.error("lock release failed", e); + } + + } + } + } + + /** + * init system znode + */ + protected void initSystemZNode() { + try { + persist(getMasterZNodeParentPath(), ""); + persist(getWorkerZNodeParentPath(), ""); + persist(getDeadZNodeParentPath(), ""); + + logger.info("initialize server nodes success."); + } catch (Exception e) { + logger.error("init system znode failed", e); + } + } + + /** + * get host ip, string format: masterParentPath/ip + * + * @param path path + * @return host ip, string format: masterParentPath/ip + */ + protected String getHostByEventDataPath(String path) { + if (StringUtils.isEmpty(path)) { + logger.error("empty path!"); + return ""; + } + String[] pathArray = path.split(SINGLE_SLASH); + if (pathArray.length < 1) { + logger.error("parse ip error: {}", path); + return ""; + } + return pathArray[pathArray.length - 1]; + + } + + @Override + public String toString() { + return "AbstractZKClient{" + + "zkClient=" + getZkClient() + + ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' + + ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' + + ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java index 5a04c5a23b..e25a22f031 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java @@ -14,9 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.dolphinscheduler.service.zk; -import org.apache.commons.lang.StringUtils; +import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.service.exceptions.ServiceException; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; @@ -25,18 +30,16 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; - /** * Shared Curator zookeeper client */ @@ -49,7 +52,6 @@ public class CuratorZookeeperClient implements InitializingBean { private CuratorFramework zkClient; - @Override public void afterPropertiesSet() throws Exception { this.zkClient = buildClient(); @@ -91,7 +93,7 @@ public class CuratorZookeeperClient implements InitializingBean { zkClient.blockUntilConnected(30, TimeUnit.SECONDS); } catch (final Exception ex) { - throw new RuntimeException(ex); + throw new ServiceException(ex); } return zkClient; } @@ -123,4 +125,4 @@ public class CuratorZookeeperClient implements InitializingBean { public CuratorFramework getZkClient() { return zkClient; } -} \ No newline at end of file +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java index c7a53ebdc0..7ac23a3c4d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java @@ -14,19 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.dolphinscheduler.service.zk; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.service.exceptions.ServiceException; + import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * just speed experience version @@ -51,10 +54,10 @@ public class ZKServer { ZKServer zkServer; if (args.length == 0) { zkServer = new ZKServer(); - } else if (args.length == 1){ - zkServer = new ZKServer(Integer.valueOf(args[0]), ""); + } else if (args.length == 1) { + zkServer = new ZKServer(Integer.parseInt(args[0]), ""); } else { - zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]); + zkServer = new ZKServer(Integer.parseInt(args[0]), args[1]); } zkServer.registerHook(); zkServer.start(); @@ -73,7 +76,7 @@ public class ZKServer { } private void registerHook() { - /** + /* * register hooks, which are called before the process exits */ Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); @@ -90,7 +93,7 @@ public class ZKServer { } } - public boolean isStarted(){ + public boolean isStarted() { return isStarted.get(); } @@ -119,19 +122,19 @@ public class ZKServer { if (file.exists()) { logger.warn("The path of zk server exists"); } - logger.info("zk server starting, data dir path:{}" , zkDataDir); - startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60"); + logger.info("zk server starting, data dir path:{}", zkDataDir); + startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME, "60"); } /** * Starts a local Zk instance * - * @param port The port to listen on + * @param port The port to listen on * @param dataDirPath The path for the Zk data directory - * @param tickTime zk tick time - * @param maxClientCnxns zk max client connections + * @param tickTime zk tick time + * @param maxClientCnxns zk max client connections */ - private void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) { + private void startLocalZkServer(final int port, final String dataDirPath, final int tickTime, String maxClientCnxns) { if (isStarted.compareAndSet(false, true)) { zooKeeperServerMain = new PublicZooKeeperServerMain(); logger.info("Zookeeper data path : {} ", dataDirPath); @@ -144,8 +147,7 @@ public class ZKServer { zooKeeperServerMain.initializeAndRun(args); } catch (QuorumPeerConfig.ConfigException | IOException e) { - logger.warn("Caught exception while starting ZK", e); - throw new RuntimeException(e); + throw new ServiceException("Caught exception while starting ZK", e); } } } @@ -159,7 +161,7 @@ public class ZKServer { logger.info("zk server stopped"); } catch (Exception e) { - logger.error("Failed to stop ZK ",e); + logger.error("Failed to stop ZK ", e); } } @@ -180,8 +182,7 @@ public class ZKServer { org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir)); } } catch (Exception e) { - logger.warn("Caught exception while stopping ZK server", e); - throw new RuntimeException(e); + throw new ServiceException("Caught exception while stopping ZK server", e); } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java index 6dfce79a3a..88c339b045 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java @@ -14,21 +14,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.service.zk; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; + +import java.nio.charset.StandardCharsets; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; -import java.nio.charset.StandardCharsets; - @Component public class ZookeeperCachedOperator extends ZookeeperOperator { @@ -36,6 +39,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { private TreeCache treeCache; + /** * register a unified listener of /${dsRoot}, */ @@ -59,14 +63,16 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { treeCache.start(); } catch (Exception e) { logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot()); - throw new RuntimeException(e); + throw new ServiceException(e); } } //for sub class - protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){} + protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) { + // Used by sub class + } - public String getFromCache(final String cachePath, final String key) { + public String getFromCache(final String key) { ChildData resultInCache = treeCache.getCurrentData(key); if (null != resultInCache) { return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8); @@ -74,11 +80,11 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { return null; } - public TreeCache getTreeCache(final String cachePath) { + public TreeCache getTreeCache() { return treeCache; } - public void addListener(TreeCacheListener listener){ + public void addListener(TreeCacheListener listener) { this.treeCache.getListenable().addListener(listener); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java index e7b049f8bf..8a219837b7 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java @@ -14,13 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.dolphinscheduler.service.zk; -import org.apache.commons.lang.StringUtils; +import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.service.exceptions.ServiceException; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; @@ -29,18 +33,16 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; + +import java.nio.charset.StandardCharsets; +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; - /** * zk base operator */ @@ -64,19 +66,23 @@ public class ZookeeperOperator implements InitializingBean { /** * this method is for sub class, */ - protected void registerListener(){} + protected void registerListener() { + // Used by sub class + } - protected void treeCacheStart(){} + protected void treeCacheStart() { + // Used by sub class + } public void initStateLister() { checkNotNull(zkClient); zkClient.getConnectionStateListenable().addListener((client, newState) -> { - if(newState == ConnectionState.LOST){ + if (newState == ConnectionState.LOST) { logger.error("connection lost from zookeeper"); - } else if(newState == ConnectionState.RECONNECTED){ + } else if (newState == ConnectionState.RECONNECTED) { logger.info("reconnected to zookeeper"); - } else if(newState == ConnectionState.SUSPENDED){ + } else if (newState == ConnectionState.SUSPENDED) { logger.warn("connection SUSPENDED to zookeeper"); } }); @@ -85,7 +91,8 @@ public class ZookeeperOperator implements InitializingBean { private CuratorFramework buildClient() { logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList()); - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null"))) + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(), + "zookeeper quorum can't be null"))) .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())); //these has default value @@ -114,7 +121,7 @@ public class ZookeeperOperator implements InitializingBean { try { zkClient.blockUntilConnected(); } catch (final Exception ex) { - throw new RuntimeException(ex); + throw new ServiceException(ex); } return zkClient; } @@ -138,12 +145,12 @@ public class ZookeeperOperator implements InitializingBean { throw new IllegalStateException(ex); } catch (Exception ex) { logger.error("getChildrenKeys key : {}", key, ex); - 
throw new RuntimeException(ex); + throw new ServiceException(ex); } } - public boolean hasChildren(final String key){ - Stat stat ; + public boolean hasChildren(final String key) { + Stat stat; try { stat = zkClient.checkExists().forPath(key); return stat.getNumChildren() >= 1; @@ -241,4 +248,4 @@ public class ZookeeperOperator implements InitializingBean { public void close() { CloseableUtils.closeQuietly(zkClient); } -} \ No newline at end of file +}
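
For readers of this patch, the following is a minimal caller-side sketch of how the refactored QuartzExecutors helpers above (getInstance, buildJobName, buildJobGroupName, buildDataMap, addJob) fit together; it is not part of the change. The import paths, the ProcessScheduleJob job class, the Schedule getters (getId, getStartTime, getEndTime, getCrontab), and the Map<String, Object> generics (elided in the flattened signatures above) are assumptions about the surrounding code base.

    import java.util.Map;

    import org.apache.dolphinscheduler.dao.entity.Schedule;
    import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
    import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;

    public class SchedulerUsageSketch {

        public void submit(int projectId, Schedule schedule) throws Exception {
            QuartzExecutors quartz = QuartzExecutors.getInstance();
            quartz.start();

            // job and group names are derived from the ids, so re-submitting the same schedule
            // addresses the existing job/trigger instead of creating a duplicate
            String jobName = QuartzExecutors.buildJobName(schedule.getId());
            String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);
            Map<String, Object> dataMap = QuartzExecutors.buildDataMap(projectId, schedule.getId(), schedule);

            // addJob only reschedules the trigger when the cron expression actually changes
            // (compared with StringUtils.equalsIgnoreCase); failures surface as ServiceException
            quartz.addJob(ProcessScheduleJob.class, jobName, jobGroupName,
                    schedule.getStartTime(), schedule.getEndTime(), schedule.getCrontab(), dataMap);
        }
    }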
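
Similarly, the ZookeeperCachedOperator hunk above drops the unused cachePath argument from getFromCache and getTreeCache, which is a source-incompatible change for existing callers. A hedged sketch of the mechanical call-site update (the operator instance and znode path are illustrative; only the new method shapes come from this diff):

    import org.apache.curator.framework.recipes.cache.TreeCache;
    import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;

    public class ZkCacheCallSiteSketch {

        public boolean hasCachedData(ZookeeperCachedOperator operator, String znodePath) {
            // was: operator.getTreeCache(rootPath) and operator.getFromCache(rootPath, znodePath);
            // the first argument was never used, so callers simply drop it
            TreeCache treeCache = operator.getTreeCache();
            return treeCache != null && operator.getFromCache(znodePath) != null;
        }
    }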