Browse Source

[Improvement][service] Refactor service module to fix code smell (#4513)

* chore: Refactor code to fix code smell in service module

* chore: Add licence header to new files

* chore: Fix comment from code review

* chore[service]: Reduce the number of custom runtime exception to one
pull/3/MERGE
Segun Ogundipe 3 years ago committed by GitHub
parent
commit
e083e28720
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
  2. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
  3. 52
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
  4. 46
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
  5. 75
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  6. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
  7. 154
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
  8. 48
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
  9. 147
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
  10. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
  11. 31
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
  12. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
  13. 43
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java

@ -61,4 +61,8 @@ public class StringUtils {
public static String trim(String str) {
return str == null ? null : str.trim();
}
public static boolean equalsIgnoreCase(String str1, String str2) {
return str1 == null ? str2 == null : str1.equalsIgnoreCase(str2);
}
}

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.bean;
import org.springframework.beans.BeansException;
@ -31,9 +32,7 @@ public class SpringApplicationContext implements ApplicationContextAware {
SpringApplicationContext.applicationContext = applicationContext;
}
public static <T> T getBean(Class<T> requiredType){
public static <T> T getBean(Class<T> requiredType) {
return applicationContext.getBean(requiredType);
}
}

52
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.exceptions;
/**
* Custom ZKServerException exception
*/
public class ServiceException extends RuntimeException {
/**
* Construct a new runtime exception with the error message
*
* @param errMsg Error message
*/
public ServiceException(String errMsg) {
super(errMsg);
}
/**
* Construct a new runtime exception with the cause
*
* @param cause cause
*/
public ServiceException(Throwable cause) {
super(cause);
}
/**
* Construct a new runtime exception with the detail message and cause
*
* @param errMsg message
* @param cause cause
*/
public ServiceException(String errMsg, Throwable cause) {
super(errMsg, cause);
}
}

46
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
@ -21,11 +22,13 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.List;
import org.slf4j.Logger;
public class PermissionCheck<T> {
/**
* logger
@ -58,6 +61,7 @@ public class PermissionCheck<T> {
/**
* permission check
*
* @param authorizationType authorization type
* @param processService process dao
*/
@ -68,10 +72,6 @@ public class PermissionCheck<T> {
/**
* permission check
* @param authorizationType
* @param processService
* @param needChecks
* @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId) {
this.authorizationType = authorizationType;
@ -82,11 +82,6 @@ public class PermissionCheck<T> {
/**
* permission check
* @param authorizationType
* @param processService
* @param needChecks
* @param userId
* @param logger
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId, Logger logger) {
this.authorizationType = authorizationType;
@ -98,13 +93,8 @@ public class PermissionCheck<T> {
/**
* permission check
* @param logger
* @param authorizationType
* @param processService
* @param resourceList
* @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId,Logger logger) {
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId, Logger logger) {
this.authorizationType = authorizationType;
this.processService = processService;
this.resourceList = resourceList;
@ -154,9 +144,10 @@ public class PermissionCheck<T> {
/**
* has permission
*
* @return true if has permission
*/
public boolean hasPermission(){
public boolean hasPermission() {
try {
checkPermission();
return true;
@ -167,23 +158,24 @@ public class PermissionCheck<T> {
/**
* check permission
* @throws Exception exception
*
* @throws ServiceException exception
*/
public void checkPermission() throws Exception{
if(this.needChecks.length > 0){
public void checkPermission() throws ServiceException {
if (this.needChecks.length > 0) {
// get user type in order to judge whether the user is admin
User user = processService.getUserById(userId);
if (user == null) {
logger.error("user id {} didn't exist",userId);
throw new RuntimeException(String.format("user %s didn't exist",userId));
logger.error("user id {} doesn't exist", userId);
throw new ServiceException(String.format("user %s doesn't exist", userId));
}
if (user.getUserType() != UserType.ADMIN_USER){
List<T> unauthorizedList = processService.listUnauthorized(userId,needChecks,authorizationType);
if (user.getUserType() != UserType.ADMIN_USER) {
List<T> unauthorizedList = processService.listUnauthorized(userId, needChecks, authorizationType);
// if exist unauthorized resource
if(CollectionUtils.isNotEmpty(unauthorizedList)){
logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList);
throw new RuntimeException(String.format("user %s didn't has permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
if (CollectionUtils.isNotEmpty(unauthorizedList)) {
logger.error("user {} doesn't have permission of {}: {}", user.getUserName(), authorizationType.getDescp(), unauthorizedList);
throw new ServiceException(String.format("user %s doesn't have permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
}
}
}

75
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -84,6 +84,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -110,7 +111,7 @@ public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
@ -167,7 +168,7 @@ public class ProcessService {
@Transactional(rollbackFor = Exception.class)
public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
ProcessInstance processInstance = constructProcessInstance(command, host);
//cannot construct process instance, return null;
// cannot construct process instance, return null
if (processInstance == null) {
logger.error("scan command, command parameter is error: {}", command);
moveToErrorCommand(command, "process instance is null");
@ -259,7 +260,7 @@ public class ProcessService {
*/
public Boolean verifyIsNeedCreateCommand(Command command) {
Boolean isNeedCreate = true;
Map<CommandType, Integer> cmdTypeMap = new HashMap<CommandType, Integer>();
EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
@ -296,9 +297,6 @@ public class ProcessService {
/**
* get task node list by definitionId
*
* @param defineId
* @return
*/
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId) {
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
@ -435,7 +433,7 @@ public class ProcessService {
List<TaskNode> taskNodeList = processData.getTasks();
if (taskNodeList != null && taskNodeList.size() > 0) {
if (taskNodeList != null && !taskNodeList.isEmpty()) {
for (TaskNode taskNode : taskNodeList) {
String parameter = taskNode.getParams();
@ -514,11 +512,9 @@ public class ProcessService {
*/
private Date getScheduleTime(Command command, Map<String, String> cmdParam) {
Date scheduleTime = command.getScheduleTime();
if (scheduleTime == null) {
if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
if (scheduleTime == null && cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
}
}
return scheduleTime;
}
@ -714,7 +710,7 @@ public class ProcessService {
// generate one new process instance
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
}
if (!checkCmdParam(command, cmdParam)) {
if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
logger.error("command parameter check failed!");
return null;
}
@ -922,13 +918,12 @@ public class ProcessService {
*/
private void initTaskInstance(TaskInstance taskInstance) {
if (!taskInstance.isSubProcess()) {
if (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()) {
if (!taskInstance.isSubProcess()
&& (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
return;
}
}
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
}
@ -1049,10 +1044,6 @@ public class ProcessService {
/**
* complement data needs transform parent parameter to child.
*
* @param instanceMap
* @param parentProcessInstance
* @return
*/
private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) {
// set sub work process command
@ -1071,11 +1062,6 @@ public class ProcessService {
/**
* create sub work process command
*
* @param parentProcessInstance
* @param childInstance
* @param instanceMap
* @param task
*/
public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
ProcessInstance childInstance,
@ -1105,8 +1091,6 @@ public class ProcessService {
/**
* initialize sub work flow state
* child instance state would be initialized when 'recovery from pause/stop/failure'
*
* @param childInstance
*/
private void initSubInstanceState(ProcessInstance childInstance) {
if (childInstance != null) {
@ -1119,9 +1103,6 @@ public class ProcessService {
* get sub work flow command type
* child instance exist: child command = fatherCommand
* child instance not exists: child command = fatherCommand[0]
*
* @param parentProcessInstance
* @return
*/
private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
CommandType commandType = parentProcessInstance.getCommandType();
@ -1577,7 +1558,7 @@ public class ProcessService {
if (intList == null) {
return new ArrayList<>();
}
List<String> result = new ArrayList<String>(intList.size());
List<String> result = new ArrayList<>(intList.size());
for (Integer intVar : intList) {
result.add(String.valueOf(intVar));
}
@ -1727,8 +1708,8 @@ public class ProcessService {
* @throws Exception if error throws Exception
*/
public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception {
List<CycleDependency> list = getCycleDependencies(masterId, new int[] {processDefinitionId}, scheduledFireTime);
return list.size() > 0 ? list.get(0) : null;
List<CycleDependency> list = getCycleDependencies(masterId, new int[]{processDefinitionId}, scheduledFireTime);
return !list.isEmpty() ? list.get(0) : null;
}
@ -1742,7 +1723,7 @@ public class ProcessService {
* @throws Exception if error throws Exception
*/
public List<CycleDependency> getCycleDependencies(int masterId, int[] ids, Date scheduledFireTime) throws Exception {
List<CycleDependency> cycleDependencyList = new ArrayList<CycleDependency>();
List<CycleDependency> cycleDependencyList = new ArrayList<>();
if (null == ids || ids.length == 0) {
logger.warn("ids[] is empty!is invalid!");
return cycleDependencyList;
@ -1769,14 +1750,10 @@ public class ProcessService {
}
Calendar calendar = Calendar.getInstance();
switch (cycleEnum) {
/*case MINUTE:
calendar.add(Calendar.MINUTE,-61);*/
case HOUR:
calendar.add(Calendar.HOUR, -25);
break;
case DAY:
calendar.add(Calendar.DATE, -32);
break;
case WEEK:
calendar.add(Calendar.DATE, -32);
break;
@ -1784,7 +1761,8 @@ public class ProcessService {
calendar.add(Calendar.MONTH, -13);
break;
default:
logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name());
String cycleName = cycleEnum.name();
logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleName);
continue;
}
Date start = calendar.getTime();
@ -1794,7 +1772,7 @@ public class ProcessService {
} else {
list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression);
}
if (list.size() >= 1) {
if (!list.isEmpty()) {
start = list.get(list.size() - 1);
CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(), start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
cycleDependencyList.add(dependency);
@ -1867,6 +1845,7 @@ public class ProcessService {
/**
* query project name and user name by processInstanceId.
*
* @param processInstanceId processInstanceId
* @return projectName and userName
*/
@ -1939,30 +1918,27 @@ public class ProcessService {
* @return unauthorized udf function list
*/
public <T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType) {
List<T> resultList = new ArrayList<T>();
List<T> resultList = new ArrayList<>();
if (Objects.nonNull(needChecks) && needChecks.length > 0) {
Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
Set<T> originResSet = new HashSet<>(Arrays.asList(needChecks));
switch (authorizationType) {
case RESOURCE_FILE_ID:
Set<Integer> authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
case UDF_FILE:
Set<Integer> authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(Resource::getId).collect(toSet());
originResSet.removeAll(authorizedResourceFiles);
break;
case RESOURCE_FILE_NAME:
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getFullName()).collect(toSet());
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(Resource::getFullName).collect(toSet());
originResSet.removeAll(authorizedResources);
break;
case UDF_FILE:
Set<Integer> authorizedUdfFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedUdfFiles);
break;
case DATASOURCE:
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(DataSource::getId).collect(toSet());
originResSet.removeAll(authorizedDatasources);
break;
case UDF:
Set<Integer> authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
Set<Integer> authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(UdfFunc::getId).collect(toSet());
originResSet.removeAll(authorizedUdfs);
break;
default:
@ -2007,9 +1983,6 @@ public class ProcessService {
/**
* format task app id in task instance
*
* @param taskInstance
* @return
*/
public String formatTaskAppId(TaskInstance taskInstance) {
ProcessDefinition definition = this.findProcessDefineById(taskInstance.getProcessDefinitionId());

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.quartz;
package org.apache.dolphinscheduler.service.quartz;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -25,6 +25,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
@ -34,8 +37,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Date;
/**
* process schedule job
*/
@ -46,7 +47,7 @@ public class ProcessScheduleJob implements Job {
*/
private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class);
public ProcessService getProcessService(){
public ProcessService getProcessService() {
return SpringApplicationContext.getBean(ProcessService.class);
}
@ -66,10 +67,8 @@ public class ProcessScheduleJob implements Job {
int projectId = dataMap.getInt(Constants.PROJECT_ID);
int scheduleId = dataMap.getInt(Constants.SCHEDULE_ID);
Date scheduledFireTime = context.getScheduledFireTime();
Date fireTime = context.getFireTime();
logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId);
@ -82,11 +81,10 @@ public class ProcessScheduleJob implements Job {
return;
}
ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
// release state : online/offline
ReleaseState releaseState = processDefinition.getReleaseState();
if (processDefinition == null || releaseState == ReleaseState.OFFLINE) {
if (releaseState == ReleaseState.OFFLINE) {
logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId);
return;
}
@ -107,7 +105,6 @@ public class ProcessScheduleJob implements Job {
getProcessService().createCommand(command);
}
/**
* delete job
*/

154
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java

@ -14,15 +14,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.quartz;
import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLASS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_CLASS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY;
import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCENAME;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PRIFIX;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_PRIFIX;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_MISFIRETHRESHOLD;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_PROPERTIES_PATH;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_TABLE_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADCOUNT;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADPRIORITY;
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE;
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_ID;
import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
import static org.apache.dolphinscheduler.common.Constants.STRING_FALSE;
import static org.apache.dolphinscheduler.common.Constants.STRING_TRUE;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.quartz.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.JobStoreTX;
import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
@ -32,15 +93,6 @@ import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.dolphinscheduler.common.Constants.*;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
/**
* single Quartz executors instance
*/
@ -70,28 +122,27 @@ public class QuartzExecutors {
private static final QuartzExecutors instance = new QuartzExecutors();
}
private QuartzExecutors() {
try {
conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH);
init();
}catch (ConfigurationException e){
logger.warn("not loaded quartz configuration file, will used default value",e);
} catch (ConfigurationException e) {
logger.warn("not loaded quartz configuration file, will used default value", e);
}
}
/**
* thread safe and performance promote
*
* @return instance of Quartz Executors
*/
public static QuartzExecutors getInstance() {
return Holder.instance;
}
/**
* init
*
* <p>
* Returns a client-usable handle to a Scheduler.
*/
private void init() {
@ -100,33 +151,33 @@ public class QuartzExecutors {
Properties properties = new Properties();
String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME);
if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)){
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)) {
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
} else {
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
}
properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME));
properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID));
properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,STRING_FALSE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX));
properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, QUARTZ_CLUSTERCHECKININTERVAL));
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName()));
properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, STRING_FALSE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS, conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, STRING_TRUE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT, conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS, conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX));
properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, QUARTZ_CLUSTERCHECKININTERVAL));
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE, conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, DruidConnectionProvider.class.getName()));
schedulerFactory.initialize(properties);
scheduler = schedulerFactory.getScheduler();
} catch (SchedulerException e) {
logger.error(e.getMessage(),e);
logger.error(e.getMessage(), e);
System.exit(1);
}
@ -138,19 +189,20 @@ public class QuartzExecutors {
* @throws SchedulerException scheduler exception
*/
public void start() throws SchedulerException {
if (!scheduler.isStarted()){
if (!scheduler.isStarted()) {
scheduler.start();
logger.info("Quartz service started" );
logger.info("Quartz service started");
}
}
/**
* stop all scheduled tasks
*
* <p>
* Halts the Scheduler's firing of Triggers,
* and cleans up all resources associated with the Scheduler.
*
* <p>
* The scheduler cannot be re-started.
*
* @throws SchedulerException scheduler exception
*/
public void shutdown() throws SchedulerException {
@ -161,7 +213,6 @@ public class QuartzExecutors {
}
}
/**
* add task trigger , if this task already exists, return this task with updated trigger
*
@ -173,7 +224,7 @@ public class QuartzExecutors {
* @param cronExpression cron expression
* @param jobDataMap job parameters data map
*/
public void addJob(Class<? extends Job> clazz,String jobName,String jobGroupName,Date startDate, Date endDate,
public void addJob(Class<? extends Job> clazz, String jobName, String jobGroupName, Date startDate, Date endDate,
String cronExpression,
Map<String, Object> jobDataMap) {
lock.writeLock().lock();
@ -218,7 +269,7 @@ public class QuartzExecutors {
CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
String oldCronExpression = oldCronTrigger.getCronExpression();
if (!StringUtils.equalsIgnoreCase(cronExpression,oldCronExpression)) {
if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
// reschedule job trigger
scheduler.rescheduleJob(triggerKey, cronTrigger);
logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
@ -231,14 +282,12 @@ public class QuartzExecutors {
}
} catch (Exception e) {
logger.error("add job failed", e);
throw new RuntimeException("add job failed", e);
throw new ServiceException("add job failed", e);
} finally {
lock.writeLock().unlock();
}
}
/**
* delete job
*
@ -249,16 +298,16 @@ public class QuartzExecutors {
public boolean deleteJob(String jobName, String jobGroupName) {
lock.writeLock().lock();
try {
JobKey jobKey = new JobKey(jobName,jobGroupName);
if(scheduler.checkExists(jobKey)){
JobKey jobKey = new JobKey(jobName, jobGroupName);
if (scheduler.checkExists(jobKey)) {
logger.info("try to delete job, job name: {}, job group name: {},", jobName, jobGroupName);
return scheduler.deleteJob(jobKey);
}else {
} else {
return true;
}
} catch (SchedulerException e) {
logger.error("delete job : {} failed",jobName, e);
logger.error("delete job : {} failed", jobName, e);
} finally {
lock.writeLock().unlock();
}
@ -269,7 +318,6 @@ public class QuartzExecutors {
* delete all jobs in job group
*
* @param jobGroupName job group name
*
* @return true if all of the Jobs were found and deleted, false if
* one or more were not deleted.
*/
@ -282,7 +330,7 @@ public class QuartzExecutors {
return scheduler.deleteJobs(jobKeys);
} catch (SchedulerException e) {
logger.error("delete all jobs in job group: {} failed",jobGroupName, e);
logger.error("delete all jobs in job group: {} failed", jobGroupName, e);
} finally {
lock.writeLock().unlock();
}
@ -291,6 +339,7 @@ public class QuartzExecutors {
/**
* build job name
*
* @param processId process id
* @return job name
*/
@ -302,6 +351,7 @@ public class QuartzExecutors {
/**
* build job group name
*
* @param projectId project id
* @return job group name
*/

48
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java

@ -14,13 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.quartz.cron;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import com.cronutils.model.Cron;
import com.cronutils.model.field.CronField;
import com.cronutils.model.field.CronFieldName;
import com.cronutils.model.field.expression.*;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import com.cronutils.model.field.expression.Always;
import com.cronutils.model.field.expression.And;
import com.cronutils.model.field.expression.Between;
import com.cronutils.model.field.expression.Every;
import com.cronutils.model.field.expression.FieldExpression;
import com.cronutils.model.field.expression.On;
/**
* Cycle
@ -42,9 +49,10 @@ public abstract class AbstractCycle {
/**
* cycle constructor
*
* @param cron cron
*/
public AbstractCycle(Cron cron) {
protected AbstractCycle(Cron cron) {
if (cron == null) {
throw new IllegalArgumentException("cron must not be null!");
}
@ -60,30 +68,32 @@ public abstract class AbstractCycle {
/**
* whether the minute field has a value
*
* @return if minute field has a value return trueelse return false
*/
protected boolean minFiledIsSetAll(){
protected boolean minFiledIsSetAll() {
FieldExpression minFieldExpression = minField.getExpression();
return (minFieldExpression instanceof Every || minFieldExpression instanceof Always
|| minFieldExpression instanceof Between || minFieldExpression instanceof And
|| minFieldExpression instanceof On);
}
/**
* whether the minute field has a value of every or always
*
* @return if minute field has a value of every or always return trueelse return false
*/
protected boolean minFiledIsEvery(){
protected boolean minFiledIsEvery() {
FieldExpression minFieldExpression = minField.getExpression();
return (minFieldExpression instanceof Every || minFieldExpression instanceof Always);
}
/**
* whether the hour field has a value
*
* @return if hour field has a value return trueelse return false
*/
protected boolean hourFiledIsSetAll(){
protected boolean hourFiledIsSetAll() {
FieldExpression hourFieldExpression = hourField.getExpression();
return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always
|| hourFieldExpression instanceof Between || hourFieldExpression instanceof And
@ -92,37 +102,40 @@ public abstract class AbstractCycle {
/**
* whether the hour field has a value of every or always
*
* @return if hour field has a value of every or always return trueelse return false
*/
protected boolean hourFiledIsEvery(){
protected boolean hourFiledIsEvery() {
FieldExpression hourFieldExpression = hourField.getExpression();
return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always);
}
/**
* whether the day Of month field has a value
*
* @return if day Of month field has a value return trueelse return false
*/
protected boolean dayOfMonthFieldIsSetAll(){
protected boolean dayOfMonthFieldIsSetAll() {
return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always
|| dayOfMonthField.getExpression() instanceof Between || dayOfMonthField.getExpression() instanceof And
|| dayOfMonthField.getExpression() instanceof On);
}
/**
* whether the day Of Month field has a value of every or always
*
* @return if day Of Month field has a value of every or always return trueelse return false
*/
protected boolean dayOfMonthFieldIsEvery(){
protected boolean dayOfMonthFieldIsEvery() {
return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always);
}
/**
* whether month field has a value
*
* @return if month field has a value return trueelse return false
*/
protected boolean monthFieldIsSetAll(){
protected boolean monthFieldIsSetAll() {
FieldExpression monthFieldExpression = monthField.getExpression();
return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always
|| monthFieldExpression instanceof Between || monthFieldExpression instanceof And
@ -131,18 +144,20 @@ public abstract class AbstractCycle {
/**
* whether the month field has a value of every or always
*
* @return if month field has a value of every or always return trueelse return false
*/
protected boolean monthFieldIsEvery(){
protected boolean monthFieldIsEvery() {
FieldExpression monthFieldExpression = monthField.getExpression();
return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always);
}
/**
* whether the day Of week field has a value
*
* @return if day Of week field has a value return trueelse return false
*/
protected boolean dayofWeekFieldIsSetAll(){
protected boolean dayofWeekFieldIsSetAll() {
FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always
|| dayOfWeekFieldExpression instanceof Between || dayOfWeekFieldExpression instanceof And
@ -151,21 +166,24 @@ public abstract class AbstractCycle {
/**
* whether the day Of week field has a value of every or always
*
* @return if day Of week field has a value of every or always return trueelse return false
*/
protected boolean dayofWeekFieldIsEvery(){
protected boolean dayofWeekFieldIsEvery() {
FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always);
}
/**
* get cycle enum
*
* @return CycleEnum
*/
protected abstract CycleEnum getCycle();
/**
* get mini level cycle enum
*
* @return CycleEnum
*/
protected abstract CycleEnum getMiniCycle();

147
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -14,22 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* abstract zookeeper client
*/
@ -38,25 +51,23 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
/**
* remove dead server by host
*
* @param host host
* @param serverType serverType
* @throws Exception
*/
public void removeDeadServerByHost(String host, String serverType) throws Exception {
public void removeDeadServerByHost(String host, String serverType) {
List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for(String serverPath : deadServers){
if(serverPath.startsWith(serverType+UNDERLINE+host)){
for (String serverPath : deadServers) {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
super.remove(server);
logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
logger.info("{} server {} deleted from zk dead server path success", serverType, host);
}
}
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
@ -64,25 +75,24 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
* @param zNode node path
* @param zkNodeType master or worker
* @param opType delete or add
* @throws Exception errors
*/
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) {
String host = getHostByEventDataPath(zNode);
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete
if(opType.equals(DELETE_ZK_OP)){
if (opType.equals(DELETE_ZK_OP)) {
removeDeadServerByHost(host, type);
}else if(opType.equals(ADD_ZK_OP)){
} else if (opType.equals(ADD_ZK_OP)) {
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if(!super.isExisted(deadServerPath)){
if (!super.isExisted(deadServerPath)) {
//add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath,(type + UNDERLINE + host));
super.persist(deadServerPath, (type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success" ,
zkNodeType.toString(), zNode);
logger.info("{} server dead , and {} added to zk dead server path success",
zkNodeType, zNode);
}
}
@ -90,51 +100,52 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
/**
* get active master num
*
* @return active master number
*/
public int getActiveMasterNum(){
public int getActiveMasterNum() {
List<String> childrenList = new ArrayList<>();
try {
// read master node parent path from conf
if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) {
childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
}
} catch (Exception e) {
logger.error("getActiveMasterNum error",e);
logger.error("getActiveMasterNum error", e);
}
return childrenList.size();
}
/**
*
* @return zookeeper quorum
*/
public String getZookeeperQuorum(){
public String getZookeeperQuorum() {
return getZookeeperConfig().getServerList();
}
/**
* get server list.
*
* @param zkNodeType zookeeper node type
* @return server list
*/
public List<Server> getServersList(ZKNodeType zkNodeType){
public List<Server> getServersList(ZKNodeType zkNodeType) {
Map<String, String> masterMap = getServerMaps(zkNodeType);
String parentPath = getZNodeParentPath(zkNodeType);
List<Server> masterServers = new ArrayList<>();
for (Map.Entry<String, String> entry : masterMap.entrySet()) {
Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
if(masterServer == null){
if (masterServer == null) {
continue;
}
String key = entry.getKey();
masterServer.setZkDirectory(parentPath + "/"+ key);
masterServer.setZkDirectory(parentPath + "/" + key);
//set host and port
String[] hostAndPort=key.split(COLON);
String[] hosts=hostAndPort[0].split(DIVISION_STRING);
String[] hostAndPort = key.split(COLON);
String[] hosts = hostAndPort[0].split(DIVISION_STRING);
// fetch the last one
masterServer.setHost(hosts[hosts.length-1]);
masterServer.setHost(hosts[hosts.length - 1]);
masterServer.setPort(Integer.parseInt(hostAndPort[1]));
masterServers.add(masterServer);
}
@ -143,26 +154,27 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
/**
* get master server list map.
*
* @param zkNodeType zookeeper node type
* @return result : {host : resource info}
*/
public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
Map<String, String> masterMap = new HashMap<>();
try {
String path = getZNodeParentPath(zkNodeType);
List<String> serverList = super.getChildrenKeys(path);
if(zkNodeType == ZKNodeType.WORKER){
if (zkNodeType == ZKNodeType.WORKER) {
List<String> workerList = new ArrayList<>();
for(String group : serverList){
for (String group : serverList) {
List<String> groupServers = super.getChildrenKeys(path + Constants.SLASH + group);
for(String groupServer : groupServers){
for (String groupServer : groupServers) {
workerList.add(group + Constants.SLASH + groupServer);
}
}
serverList = workerList;
}
for(String server : serverList){
for (String server : serverList) {
masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server));
}
} catch (Exception e) {
@ -174,20 +186,21 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
/**
* check the zookeeper node already exists
*
* @param host host
* @param zkNodeType zookeeper node type
* @return true if exists
*/
public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
String path = getZNodeParentPath(zkNodeType);
if(StringUtils.isEmpty(path)){
if (StringUtils.isEmpty(path)) {
logger.error("check zk node exists error, host:{}, zk node type:{}",
host, zkNodeType.toString());
host, zkNodeType);
return false;
}
Map<String, String> serverMaps = getServerMaps(zkNodeType);
for(String hostKey : serverMaps.keySet()){
if(hostKey.contains(host)){
for (String hostKey : serverMaps.keySet()) {
if (hostKey.contains(host)) {
return true;
}
}
@ -195,37 +208,33 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
}
/**
*
* @return get worker node parent path
*/
protected String getWorkerZNodeParentPath(){
protected String getWorkerZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
}
/**
*
* @return get master node parent path
*/
protected String getMasterZNodeParentPath(){
protected String getMasterZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
}
/**
*
* @return get master lock path
*/
public String getMasterLockPath(){
public String getMasterLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
}
/**
*
* @param zkNodeType zookeeper node type
* @return get zookeeper node parent path
*/
public String getZNodeParentPath(ZKNodeType zkNodeType) {
String path = "";
switch (zkNodeType){
switch (zkNodeType) {
case MASTER:
return getMasterZNodeParentPath();
case WORKER:
@ -239,50 +248,47 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
}
/**
*
* @return get dead server node parent path
*/
protected String getDeadZNodeParentPath(){
protected String getDeadZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
}
/**
*
* @return get master start up lock path
*/
public String getMasterStartUpLockPath(){
public String getMasterStartUpLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
}
/**
*
* @return get master failover lock path
*/
public String getMasterFailoverLockPath(){
public String getMasterFailoverLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
}
/**
*
* @return get worker failover lock path
*/
public String getWorkerFailoverLockPath(){
public String getWorkerFailoverLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
}
/**
* release mutex
*
* @param mutex mutex
*/
public void releaseMutex(InterProcessMutex mutex) {
if (mutex != null){
if (mutex != null) {
try {
mutex.release();
} catch (Exception e) {
if("instance must be started before calling this method".equals(e.getMessage())){
if ("instance must be started before calling this method".equals(e.getMessage())) {
logger.warn("lock release");
}else{
logger.error("lock release failed",e);
} else {
logger.error("lock release failed", e);
}
}
@ -292,7 +298,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
/**
* init system znode
*/
protected void initSystemZNode(){
protected void initSystemZNode() {
try {
persist(getMasterZNodeParentPath(), "");
persist(getWorkerZNodeParentPath(), "");
@ -300,22 +306,23 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
logger.info("initialize server nodes success.");
} catch (Exception e) {
logger.error("init system znode failed",e);
logger.error("init system znode failed", e);
}
}
/**
* get host ip, string format: masterParentPath/ip
*
* @param path path
* @return host ip, string format: masterParentPath/ip
*/
protected String getHostByEventDataPath(String path) {
if(StringUtils.isEmpty(path)){
if (StringUtils.isEmpty(path)) {
logger.error("empty path!");
return "";
}
String[] pathArray = path.split(SINGLE_SLASH);
if(pathArray.length < 1){
if (pathArray.length < 1) {
logger.error("parse ip error: {}", path);
return "";
}
@ -325,11 +332,11 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
@Override
public String toString() {
return "AbstractZKClient{" +
"zkClient=" + getZkClient() +
", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
'}';
return "AbstractZKClient{"
+ "zkClient=" + getZkClient()
+ ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\''
+ ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\''
+ ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\''
+ '}';
}
}

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java

@ -14,9 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.commons.lang.StringUtils;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
@ -25,18 +30,16 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
/**
* Shared Curator zookeeper client
*/
@ -49,7 +52,6 @@ public class CuratorZookeeperClient implements InitializingBean {
private CuratorFramework zkClient;
@Override
public void afterPropertiesSet() throws Exception {
this.zkClient = buildClient();
@ -91,7 +93,7 @@ public class CuratorZookeeperClient implements InitializingBean {
zkClient.blockUntilConnected(30, TimeUnit.SECONDS);
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ServiceException(ex);
}
return zkClient;
}

31
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java

@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* just speed experience version
@ -51,10 +54,10 @@ public class ZKServer {
ZKServer zkServer;
if (args.length == 0) {
zkServer = new ZKServer();
} else if (args.length == 1){
zkServer = new ZKServer(Integer.valueOf(args[0]), "");
} else if (args.length == 1) {
zkServer = new ZKServer(Integer.parseInt(args[0]), "");
} else {
zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]);
zkServer = new ZKServer(Integer.parseInt(args[0]), args[1]);
}
zkServer.registerHook();
zkServer.start();
@ -73,7 +76,7 @@ public class ZKServer {
}
private void registerHook() {
/**
/*
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
@ -90,7 +93,7 @@ public class ZKServer {
}
}
public boolean isStarted(){
public boolean isStarted() {
return isStarted.get();
}
@ -119,8 +122,8 @@ public class ZKServer {
if (file.exists()) {
logger.warn("The path of zk server exists");
}
logger.info("zk server starting, data dir path:{}" , zkDataDir);
startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60");
logger.info("zk server starting, data dir path:{}", zkDataDir);
startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME, "60");
}
/**
@ -131,7 +134,7 @@ public class ZKServer {
* @param tickTime zk tick time
* @param maxClientCnxns zk max client connections
*/
private void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) {
private void startLocalZkServer(final int port, final String dataDirPath, final int tickTime, String maxClientCnxns) {
if (isStarted.compareAndSet(false, true)) {
zooKeeperServerMain = new PublicZooKeeperServerMain();
logger.info("Zookeeper data path : {} ", dataDirPath);
@ -144,8 +147,7 @@ public class ZKServer {
zooKeeperServerMain.initializeAndRun(args);
} catch (QuorumPeerConfig.ConfigException | IOException e) {
logger.warn("Caught exception while starting ZK", e);
throw new RuntimeException(e);
throw new ServiceException("Caught exception while starting ZK", e);
}
}
}
@ -159,7 +161,7 @@ public class ZKServer {
logger.info("zk server stopped");
} catch (Exception e) {
logger.error("Failed to stop ZK ",e);
logger.error("Failed to stop ZK ", e);
}
}
@ -180,8 +182,7 @@ public class ZKServer {
org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
}
} catch (Exception e) {
logger.warn("Caught exception while stopping ZK server", e);
throw new RuntimeException(e);
throw new ServiceException("Caught exception while starting ZK", e);
}
}
}

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

@ -14,21 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class ZookeeperCachedOperator extends ZookeeperOperator {
@ -36,6 +39,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
private TreeCache treeCache;
/**
* register a unified listener of /${dsRoot},
*/
@ -59,14 +63,16 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
treeCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
throw new RuntimeException(e);
throw new ServiceException(e);
}
}
//for sub class
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){}
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
// Used by sub class
}
public String getFromCache(final String cachePath, final String key) {
public String getFromCache(final String key) {
ChildData resultInCache = treeCache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8);
@ -74,11 +80,11 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
return null;
}
public TreeCache getTreeCache(final String cachePath) {
public TreeCache getTreeCache() {
return treeCache;
}
public void addListener(TreeCacheListener listener){
public void addListener(TreeCacheListener listener) {
this.treeCache.getListenable().addListener(listener);
}

43
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -14,13 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.commons.lang.StringUtils;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
@ -29,18 +33,16 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
/**
* zk base operator
*/
@ -64,19 +66,23 @@ public class ZookeeperOperator implements InitializingBean {
/**
* this method is for sub class,
*/
protected void registerListener(){}
protected void registerListener() {
// Used by sub class
}
protected void treeCacheStart(){}
protected void treeCacheStart() {
// Used by sub class
}
public void initStateLister() {
checkNotNull(zkClient);
zkClient.getConnectionStateListenable().addListener((client, newState) -> {
if(newState == ConnectionState.LOST){
if (newState == ConnectionState.LOST) {
logger.error("connection lost from zookeeper");
} else if(newState == ConnectionState.RECONNECTED){
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("reconnected to zookeeper");
} else if(newState == ConnectionState.SUSPENDED){
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("connection SUSPENDED to zookeeper");
}
});
@ -85,7 +91,8 @@ public class ZookeeperOperator implements InitializingBean {
private CuratorFramework buildClient() {
logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null")))
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),
"zookeeper quorum can't be null")))
.retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
//these has default value
@ -114,7 +121,7 @@ public class ZookeeperOperator implements InitializingBean {
try {
zkClient.blockUntilConnected();
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ServiceException(ex);
}
return zkClient;
}
@ -138,12 +145,12 @@ public class ZookeeperOperator implements InitializingBean {
throw new IllegalStateException(ex);
} catch (Exception ex) {
logger.error("getChildrenKeys key : {}", key, ex);
throw new RuntimeException(ex);
throw new ServiceException(ex);
}
}
public boolean hasChildren(final String key){
Stat stat ;
public boolean hasChildren(final String key) {
Stat stat;
try {
stat = zkClient.checkExists().forPath(key);
return stat.getNumChildren() >= 1;

Loading…
Cancel
Save