From ace907e7ddf687c1f3d4de3f02804d9493358b9a Mon Sep 17 00:00:00 2001 From: dailidong Date: Wed, 25 Mar 2020 15:16:42 +0800 Subject: [PATCH] =?UTF-8?q?[refactor-worker]=20simplify=20master=E3=80=81?= =?UTF-8?q?=20worker=E3=80=81alert=E3=80=81dao=20properties=20(#2304)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * update logback * update log * refactor worker registry (#2107) * Refactor worker (#2115) * refactor worker registry * refactor master server * Modify workgroupid parameter name (#2105) * Delete worker group management page * Modify workgroupid parameter name * Refactor worker (#2121) * refactor worker registry * refactor master server * refactor MasterSchedulerService * cancelTaskInstance set TaskExecutionContext host,logPath,executePath (#2126) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor modify * TaskAckProcessor modify * task prioriry refator * remove ITaskQueue * task prioriry refator * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor * MasterSchedulerService modify * WorkerConfig listen port modify * modify master and worker listen port * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * cancelTaskInstance set TaskExecutionContext host,logPath,executePath Co-authored-by: qiaozhanwei * not exist in openjdk,just delete * add master and worker properties * add master and worker properties * add master and worker properties * fix cpu 100% bug * simplify master、 worker、alert、dao properties * add master and worker properties * add master and worker properties * add master and worker properties Co-authored-by: Tboy Co-authored-by: break60 <790061044@qq.com> Co-authored-by: qiaozhanwei Co-authored-by: qiaozhanwei --- .../alert/utils/MailUtils.java | 2 +- .../alert/utils/PropertyUtils.java | 12 +++ .../src/main/resources/alert.properties | 18 ++--- .../dolphinscheduler/common/Constants.java | 76 +++++++++++++----- .../common/utils/CommonUtils.java | 20 ++++- .../common/utils/FileUtils.java | 38 +++++++-- .../common/utils/HadoopUtils.java | 45 ++++++++--- .../common/utils/PropertyUtils.java | 30 ++++++- .../src/main/resources/common.properties | 79 ++++++------------- .../dolphinscheduler/dao/TaskRecordDao.java | 26 ++---- .../dao/datasource/ConnectionFactory.java | 27 +------ .../datasource/SpringConnectionFactory.java | 60 ++++++-------- .../dao/utils/PropertyUtils.java | 55 ++++++++++++- ...ation.properties => datasource.properties} | 54 +++++++------ .../consumer/TaskUpdateQueueConsumer.java | 1 + .../service/quartz/QuartzExecutors.java | 51 +++++++++++- .../service/zk/ZookeeperConfig.java | 2 +- .../src/main/resources/quartz.properties | 41 ++++------ .../src/main/resources/zookeeper.properties | 32 ++++++++ 19 files changed, 429 insertions(+), 240 deletions(-) rename dolphinscheduler-dao/src/main/resources/{application.properties => datasource.properties} (61%) create mode 100644 dolphinscheduler-service/src/main/resources/zookeeper.properties diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java index 99efdc8a6a..b0aa418630 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java @@ -55,7 +55,7 @@ public class MailUtils { public static final Boolean mailUseSSL = PropertyUtils.getBoolean(Constants.MAIL_SMTP_SSL_ENABLE); - public static final String xlsFilePath = PropertyUtils.getString(Constants.XLS_FILE_PATH); + public static final String xlsFilePath = PropertyUtils.getString(Constants.XLS_FILE_PATH,"/tmp/xls"); public static final String starttlsEnable = PropertyUtils.getString(Constants.MAIL_SMTP_STARTTLS_ENABLE); diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java index c2f479d101..91f7261db2 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java @@ -79,6 +79,18 @@ public class PropertyUtils { return properties.getProperty(key.trim()); } + /** + * get property value + * + * @param key property name + * @param defaultVal default value + * @return property value + */ + public static String getString(String key, String defaultVal) { + String val = properties.getProperty(key.trim()); + return val == null ? defaultVal : val; + } + /** * get property value * diff --git a/dolphinscheduler-alert/src/main/resources/alert.properties b/dolphinscheduler-alert/src/main/resources/alert.properties index 000d0653b7..db34452fb4 100644 --- a/dolphinscheduler-alert/src/main/resources/alert.properties +++ b/dolphinscheduler-alert/src/main/resources/alert.properties @@ -36,18 +36,18 @@ mail.smtp.ssl.enable=false mail.smtp.ssl.trust=xxx.xxx.com #xls file path,need create if not exist -xls.file.path=/tmp/xls +#xls.file.path=/tmp/xls # Enterprise WeChat configuration enterprise.wechat.enable=false -enterprise.wechat.corp.id=xxxxxxx -enterprise.wechat.secret=xxxxxxx -enterprise.wechat.agent.id=xxxxxxx -enterprise.wechat.users=xxxxxxx -enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret -enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token -enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} -enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}} +#enterprise.wechat.corp.id=xxxxxxx +#enterprise.wechat.secret=xxxxxxx +#enterprise.wechat.agent.id=xxxxxxx +#enterprise.wechat.users=xxxxxxx +#enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret +#enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token +#enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} +#enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 4a3355a0bf..fd95dfd25e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -25,9 +25,43 @@ import java.util.regex.Pattern; * Constants */ public final class Constants { + private Constants() { throw new IllegalStateException("Constants class"); } + + /** + * quartz config + */ + public static final String ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS = "org.quartz.jobStore.driverDelegateClass"; + public static final String ORG_QUARTZ_SCHEDULER_INSTANCENAME = "org.quartz.scheduler.instanceName"; + public static final String ORG_QUARTZ_SCHEDULER_INSTANCEID = "org.quartz.scheduler.instanceId"; + public static final String ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON = "org.quartz.scheduler.makeSchedulerThreadDaemon"; + public static final String ORG_QUARTZ_JOBSTORE_USEPROPERTIES = "org.quartz.jobStore.useProperties"; + public static final String ORG_QUARTZ_THREADPOOL_CLASS = "org.quartz.threadPool.class"; + public static final String ORG_QUARTZ_THREADPOOL_THREADCOUNT = "org.quartz.threadPool.threadCount"; + public static final String ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS = "org.quartz.threadPool.makeThreadsDaemons"; + public static final String ORG_QUARTZ_THREADPOOL_THREADPRIORITY = "org.quartz.threadPool.threadPriority"; + public static final String ORG_QUARTZ_JOBSTORE_CLASS = "org.quartz.jobStore.class"; + public static final String ORG_QUARTZ_JOBSTORE_TABLEPREFIX = "org.quartz.jobStore.tablePrefix"; + public static final String ORG_QUARTZ_JOBSTORE_ISCLUSTERED = "org.quartz.jobStore.isClustered"; + public static final String ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD = "org.quartz.jobStore.misfireThreshold"; + public static final String ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL = "org.quartz.jobStore.clusterCheckinInterval"; + public static final String ORG_QUARTZ_JOBSTORE_DATASOURCE = "org.quartz.jobStore.dataSource"; + public static final String ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS = "org.quartz.dataSource.myDs.connectionProvider.class"; + + /** + * quartz config default value + */ + public static final String QUARTZ_TABLE_PREFIX = "QRTZ_"; + public static final String QUARTZ_MISFIRETHRESHOLD = "60000"; + public static final String QUARTZ_CLUSTERCHECKININTERVAL = "5000"; + public static final String QUARTZ_DATASOURCE = "myDs"; + public static final String QUARTZ_THREADCOUNT = "25"; + public static final String QUARTZ_THREADPRIORITY = "5"; + public static final String QUARTZ_INSTANCENAME = "DolphinScheduler"; + public static final String QUARTZ_INSTANCEID = "AUTO"; + /** * common properties path */ @@ -56,9 +90,11 @@ public final class Constants { /** - * yarn.resourcemanager.ha.rm.idsfs.defaultFS + * yarn.resourcemanager.ha.rm.ids */ public static final String YARN_RESOURCEMANAGER_HA_RM_IDS = "yarn.resourcemanager.ha.rm.ids"; + public static final String YARN_RESOURCEMANAGER_HA_XX = "xx"; + /** * yarn.application.status.address @@ -72,31 +108,25 @@ public final class Constants { public static final String HDFS_ROOT_USER = "hdfs.root.user"; /** - * hdfs configuration - * data.store2hdfs.basepath + * hdfs/s3 configuration + * resource.upload.path */ - public static final String DATA_STORE_2_HDFS_BASEPATH = "data.store2hdfs.basepath"; + public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path"; /** - * data.basedir.path + * data basedir path */ public static final String DATA_BASEDIR_PATH = "data.basedir.path"; - /** - * data.download.basedir.path - */ - public static final String DATA_DOWNLOAD_BASEDIR_PATH = "data.download.basedir.path"; - - /** - * process.exec.basepath - */ - public static final String PROCESS_EXEC_BASEPATH = "process.exec.basepath"; - /** * dolphinscheduler.env.path */ public static final String DOLPHINSCHEDULER_ENV_PATH = "dolphinscheduler.env.path"; + /** + * environment properties default path + */ + public static final String ENV_PATH = "env/dolphinscheduler_env.sh"; /** * python home @@ -108,15 +138,23 @@ public final class Constants { */ public static final String RESOURCE_VIEW_SUFFIXS = "resource.view.suffixs"; + public static final String RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE = "txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties"; + /** * development.state */ public static final String DEVELOPMENT_STATE = "development.state"; + public static final String DEVELOPMENT_STATE_DEFAULT_VALUE = "true"; + + /** + * string true + */ + public static final String STRING_TRUE = "true"; /** - * res.upload.startup.type + * resource storage type */ - public static final String RES_UPLOAD_STARTUP_TYPE = "res.upload.startup.type"; + public static final String RESOURCE_STORAGE_TYPE = "resource.storage.type"; /** * MasterServer directory registered in zookeeper @@ -346,9 +384,9 @@ public final class Constants { public static final String FLOWNODE_RUN_FLAG_FORBIDDEN = "FORBIDDEN"; /** - * task record configuration path + * datasource configuration path */ - public static final String APPLICATION_PROPERTIES = "application.properties"; + public static final String DATASOURCE_PROPERTIES = "/datasource.properties"; public static final String TASK_RECORD_URL = "task.record.datasource.url"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java index d15ede2a27..303274073e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java @@ -20,13 +20,18 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; +import java.net.URL; /** * common utils */ public class CommonUtils { + private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class); + private CommonUtils() { throw new IllegalStateException("CommonUtils class"); } @@ -37,7 +42,14 @@ public class CommonUtils { public static String getSystemEnvPath() { String envPath = PropertyUtils.getString(Constants.DOLPHINSCHEDULER_ENV_PATH); if (StringUtils.isEmpty(envPath)) { - envPath = System.getProperty("user.home") + File.separator + ".bash_profile"; + URL envDefaultPath = CommonUtils.class.getClassLoader().getResource(Constants.ENV_PATH); + + if (envDefaultPath != null){ + envPath = envDefaultPath.getPath(); + logger.debug("env path :{}", envPath); + }else{ + envPath = System.getProperty("user.home") + File.separator + ".bash_profile"; + } } return envPath; @@ -55,7 +67,7 @@ public class CommonUtils { * @return is develop mode */ public static boolean isDevelopMode() { - return PropertyUtils.getBoolean(Constants.DEVELOPMENT_STATE); + return PropertyUtils.getBoolean(Constants.DEVELOPMENT_STATE, true); } @@ -65,9 +77,9 @@ public class CommonUtils { * @return true if upload resource is HDFS and kerberos startup */ public static boolean getKerberosStartupState(){ - String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE); + String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE); ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); - Boolean kerberosStartupState = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE); + Boolean kerberosStartupState = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false); return resUploadType == ResUploadType.HDFS && kerberosStartupState; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java index c84848fbae..9ae315af0c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java @@ -34,6 +34,8 @@ import static org.apache.dolphinscheduler.common.Constants.*; public class FileUtils { public static final Logger logger = LoggerFactory.getLogger(FileUtils.class); + public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler"); + /** * get file suffix * @@ -59,7 +61,14 @@ public class FileUtils { * @return download file name */ public static String getDownloadFilename(String filename) { - return String.format("%s/%s/%s", PropertyUtils.getString(DATA_DOWNLOAD_BASEDIR_PATH), DateUtils.getCurrentTime(YYYYMMDDHHMMSS), filename); + String fileName = String.format("%s/download/%s/%s", DATA_BASEDIR, DateUtils.getCurrentTime(YYYYMMDDHHMMSS), filename); + + File file = new File(fileName); + if (!file.getParentFile().exists()){ + file.getParentFile().mkdirs(); + } + + return fileName; } /** @@ -70,7 +79,13 @@ public class FileUtils { * @return local file path */ public static String getUploadFilename(String tenantCode, String filename) { - return String.format("%s/%s/resources/%s", PropertyUtils.getString(DATA_BASEDIR_PATH), tenantCode, filename); + String fileName = String.format("%s/%s/resources/%s", DATA_BASEDIR, tenantCode, filename); + File file = new File(fileName); + if (!file.getParentFile().exists()){ + file.getParentFile().mkdirs(); + } + + return fileName; } /** @@ -82,9 +97,14 @@ public class FileUtils { * @return directory of process execution */ public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) { - - return String.format("%s/process/%s/%s/%s/%s", PropertyUtils.getString(PROCESS_EXEC_BASEPATH), Integer.toString(projectId), + String fileName = String.format("%s/exec/process/%s/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId), Integer.toString(processDefineId), Integer.toString(processInstanceId),Integer.toString(taskInstanceId)); + File file = new File(fileName); + if (!file.getParentFile().exists()){ + file.getParentFile().mkdirs(); + } + + return fileName; } /** @@ -95,15 +115,21 @@ public class FileUtils { * @return directory of process instances */ public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId) { - return String.format("%s/process/%s/%s/%s", PropertyUtils.getString(PROCESS_EXEC_BASEPATH), Integer.toString(projectId), + String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId), Integer.toString(processDefineId), Integer.toString(processInstanceId)); + File file = new File(fileName); + if (!file.getParentFile().exists()){ + file.getParentFile().mkdirs(); + } + + return fileName; } /** * @return get suffixes for resource files that support online viewing */ public static String getResourceViewSuffixs() { - return PropertyUtils.getString(RESOURCE_VIEW_SUFFIXS); + return PropertyUtils.getString(RESOURCE_VIEW_SUFFIXS, RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE); } /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 541281f793..431d0156fc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -38,6 +38,8 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH; + /** * hadoop utils * single instance @@ -47,8 +49,11 @@ public class HadoopUtils implements Closeable { private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class); private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER); + public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH,"/dolphinscheduler"); + private static volatile HadoopUtils instance = new HadoopUtils(); private static volatile Configuration configuration; + private static volatile boolean yarnEnabled = false; private static FileSystem fs; @@ -72,8 +77,7 @@ public class HadoopUtils implements Closeable { * init dolphinscheduler root path in hdfs */ private void initHdfsPath(){ - String hdfsPath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH); - Path path = new Path(hdfsPath); + Path path = new Path(resourceUploadPath); try { if (!fs.exists(path)) { @@ -95,11 +99,11 @@ public class HadoopUtils implements Closeable { try { configuration = new Configuration(); - String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE); + String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE); ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); if (resUploadType == ResUploadType.HDFS){ - if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)){ + if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){ System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF, PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)); configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos"); @@ -151,14 +155,28 @@ public class HadoopUtils implements Closeable { fs = FileSystem.get(configuration); } - + /** + * if rmHaIds includes xx, it signs not use resourcemanager + * otherwise: + * if rmHaIds is empty, single resourcemanager enabled + * if rmHaIds not empty: resourcemanager HA enabled + */ String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS); String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS); - if (!StringUtils.isEmpty(rmHaIds)) { + //not use resourcemanager + if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)){ + yarnEnabled = false; + } else if (!StringUtils.isEmpty(rmHaIds)) { + //resourcemanager HA enabled appAddress = getAppAddress(appAddress, rmHaIds); + yarnEnabled = true; logger.info("appAddress : {}", appAddress); + } else { + //single resourcemanager enabled + yarnEnabled = true; } configuration.set(Constants.YARN_APPLICATION_STATUS_ADDRESS, appAddress); + } catch (Exception e) { logger.error(e.getMessage(), e); } @@ -361,6 +379,16 @@ public class HadoopUtils implements Closeable { return fs.rename(new Path(src), new Path(dst)); } + /** + * + * haddop resourcemanager enabled or not + * + * @return true if haddop resourcemanager enabled + * @throws IOException errors + */ + public boolean isYarnEnabled() { + return yarnEnabled; + } /** * get the state of an application @@ -405,12 +433,11 @@ public class HadoopUtils implements Closeable { * @return data hdfs path */ public static String getHdfsDataBasePath() { - String basePath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH); - if ("/".equals(basePath)) { + if ("/".equals(resourceUploadPath)) { // if basepath is configured to /, the generated url may be //default/resources (with extra leading /) return ""; } else { - return basePath; + return resourceUploadPath; } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java index c3e8197079..b3ec7e375d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java @@ -74,7 +74,7 @@ public class PropertyUtils { * @return judge whether resource upload startup */ public static Boolean getResUploadStartupState(){ - String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE); + String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE); ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); return resUploadType == ResUploadType.HDFS || resUploadType == ResUploadType.S3; } @@ -89,6 +89,18 @@ public class PropertyUtils { return properties.getProperty(key.trim()); } + /** + * get property value + * + * @param key property name + * @param defaultVal default value + * @return property value + */ + public static String getString(String key, String defaultVal) { + String val = properties.getProperty(key.trim()); + return val == null ? defaultVal : val; + } + /** * get property value * @@ -134,6 +146,22 @@ public class PropertyUtils { return false; } + /** + * get property value + * + * @param key property name + * @param defaultValue default value + * @return property value + */ + public static Boolean getBoolean(String key, boolean defaultValue) { + String value = properties.getProperty(key.trim()); + if(null != value){ + return Boolean.parseBoolean(value); + } + + return defaultValue; + } + /** * get property long value * @param key key diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 5a4aa1441f..843aa18e64 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -15,80 +15,53 @@ # limitations under the License. # -#task queue implementation, default "zookeeper" +# task queue implementation, default "zookeeper" TODO dolphinscheduler.queue.impl=zookeeper -#zookeeper cluster. multiple are separated by commas. eg. 192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 -zookeeper.quorum=localhost:2181 +# resource storage type : HDFS,S3,NONE +resource.storage.type=HDFS -#dolphinscheduler root directory -zookeeper.dolphinscheduler.root=/dolphinscheduler - -#dolphinscheduler failover directory -zookeeper.session.timeout=300 -zookeeper.connection.timeout=300 -zookeeper.retry.base.sleep=100 -zookeeper.retry.max.sleep=30000 -zookeeper.retry.maxtime=5 - -# resource upload startup type : HDFS,S3,NONE -res.upload.startup.type=NONE - -# Users who have permission to create directories under the HDFS root path -hdfs.root.user=hdfs - -# data base dir, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended -data.store2hdfs.basepath=/dolphinscheduler - -# user data directory path, self configuration, please make sure the directory exists and have read write permissions -data.basedir.path=/tmp/dolphinscheduler - -# directory path for user data download. self configuration, please make sure the directory exists and have read write permissions -data.download.basedir.path=/tmp/dolphinscheduler/download - -# process execute directory. self configuration, please make sure the directory exists and have read write permissions -process.exec.basepath=/tmp/dolphinscheduler/exec +# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended +#resource.upload.path=/dolphinscheduler +# user data local directory path, please make sure the directory exists and have read write permissions +#data.basedir.path=/tmp/dolphinscheduler # whether kerberos starts -hadoop.security.authentication.startup.state=false +#hadoop.security.authentication.startup.state=false # java.security.krb5.conf path -java.security.krb5.conf.path=/opt/krb5.conf +#java.security.krb5.conf.path=/opt/krb5.conf # loginUserFromKeytab user -login.user.keytab.username=hdfs-mycluster@ESZ.COM +#login.user.keytab.username=hdfs-mycluster@ESZ.COM # loginUserFromKeytab path -login.user.keytab.path=/opt/hdfs.headless.keytab - -# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions -dolphinscheduler.env.path=/opt/dolphinscheduler_env.sh +#login.user.keytab.path=/opt/hdfs.headless.keytab #resource.view.suffixs -resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties - -# is development state? default "false" -development.state=true +#resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties +# if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path +hdfs.root.user=hdfs -# ha or single namenode,If namenode ha needs to copy core-site.xml and hdfs-site.xml -# to the conf directory,support s3,for example : s3a://dolphinscheduler -fs.defaultFS=hdfs://mycluster:8020 +# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir +fs.defaultFS=hdfs://l:8020 -# s3 need,s3 endpoint -fs.s3a.endpoint=http://192.168.199.91:9010 +# if resource.storage.type=S3,s3 endpoint +#fs.s3a.endpoint=http://192.168.199.91:9010 -# s3 need,s3 access key -fs.s3a.access.key=A3DXS30FO22544RE +# if resource.storage.type=S3,s3 access key +#fs.s3a.access.key=A3DXS30FO22544RE -# s3 need,s3 secret key -fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK +# if resource.storage.type=S3,s3 secret key +#fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK -#resourcemanager ha note this need ips , this empty if single +# if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty TODO yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx -# If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine +# If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname. yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s - +# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions, TODO +#dolphinscheduler.env.path=env/dolphinscheduler_env.sh diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java index e0c3761032..58e34076ff 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java @@ -22,9 +22,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.TaskRecord; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.dolphinscheduler.dao.utils.PropertyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,26 +40,12 @@ public class TaskRecordDao { private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName()); - /** - * load conf - */ - private static Configuration conf; - - static { - try { - conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES); - }catch (ConfigurationException e){ - logger.error("load configuration exception",e); - System.exit(1); - } - } - /** * get task record flag * @return whether startup taskrecord */ public static boolean getTaskRecordFlag(){ - return conf.getBoolean(Constants.TASK_RECORD_FLAG,false); + return PropertyUtils.getBoolean(Constants.TASK_RECORD_FLAG,false); } /** * create connection @@ -72,9 +56,9 @@ public class TaskRecordDao { return null; } String driver = "com.mysql.jdbc.Driver"; - String url = conf.getString(Constants.TASK_RECORD_URL); - String username = conf.getString(Constants.TASK_RECORD_USER); - String password = conf.getString(Constants.TASK_RECORD_PWD); + String url = PropertyUtils.getString(Constants.TASK_RECORD_URL); + String username = PropertyUtils.getString(Constants.TASK_RECORD_USER); + String password = PropertyUtils.getString(Constants.TASK_RECORD_PWD); Connection conn = null; try { //classLoader,load driver diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java index a3bc6a0150..6aad2a3ed9 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java @@ -58,32 +58,7 @@ public class ConnectionFactory extends SpringConnectionFactory{ */ public static DruidDataSource getDataSource() { - DruidDataSource druidDataSource = new DruidDataSource(); - - druidDataSource.setDriverClassName(conf.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME)); - druidDataSource.setUrl(conf.getString(Constants.SPRING_DATASOURCE_URL)); - druidDataSource.setUsername(conf.getString(Constants.SPRING_DATASOURCE_USERNAME)); - druidDataSource.setPassword(conf.getString(Constants.SPRING_DATASOURCE_PASSWORD)); - druidDataSource.setValidationQuery(conf.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY)); - - druidDataSource.setPoolPreparedStatements(conf.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS)); - druidDataSource.setTestWhileIdle(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE)); - druidDataSource.setTestOnBorrow(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW)); - druidDataSource.setTestOnReturn(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN)); - druidDataSource.setKeepAlive(conf.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE)); - - druidDataSource.setMinIdle(conf.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE)); - druidDataSource.setMaxActive(conf.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE)); - druidDataSource.setMaxWait(conf.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT)); - druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(conf.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE)); - druidDataSource.setInitialSize(conf.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE)); - druidDataSource.setTimeBetweenEvictionRunsMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS)); - druidDataSource.setTimeBetweenConnectErrorMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS)); - druidDataSource.setMinEvictableIdleTimeMillis(conf.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS)); - druidDataSource.setValidationQueryTimeout(conf.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT)); - //auto commit - druidDataSource.setDefaultAutoCommit(conf.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT)); - + DruidDataSource druidDataSource = dataSource(); return druidDataSource; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java index cb9f22eb54..4bdbaa29c1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java @@ -25,6 +25,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.utils.PropertyUtils; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.apache.ibatis.type.JdbcType; @@ -48,19 +49,6 @@ public class SpringConnectionFactory { private static final Logger logger = LoggerFactory.getLogger(SpringConnectionFactory.class); - /** - * Load configuration file - */ - protected static org.apache.commons.configuration.Configuration conf; - - static { - try { - conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES); - } catch (ConfigurationException e) { - logger.error("load configuration exception", e); - System.exit(1); - } - } /** * pagination interceptor @@ -76,33 +64,33 @@ public class SpringConnectionFactory { * @return druid dataSource */ @Bean(destroyMethod="") - public DruidDataSource dataSource() { + public static DruidDataSource dataSource() { DruidDataSource druidDataSource = new DruidDataSource(); - druidDataSource.setDriverClassName(conf.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME)); - druidDataSource.setUrl(conf.getString(Constants.SPRING_DATASOURCE_URL)); - druidDataSource.setUsername(conf.getString(Constants.SPRING_DATASOURCE_USERNAME)); - druidDataSource.setPassword(conf.getString(Constants.SPRING_DATASOURCE_PASSWORD)); - druidDataSource.setValidationQuery(conf.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY)); - - druidDataSource.setPoolPreparedStatements(conf.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS)); - druidDataSource.setTestWhileIdle(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE)); - druidDataSource.setTestOnBorrow(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW)); - druidDataSource.setTestOnReturn(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN)); - druidDataSource.setKeepAlive(conf.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE)); - - druidDataSource.setMinIdle(conf.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE)); - druidDataSource.setMaxActive(conf.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE)); - druidDataSource.setMaxWait(conf.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT)); - druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(conf.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE)); - druidDataSource.setInitialSize(conf.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE)); - druidDataSource.setTimeBetweenEvictionRunsMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS)); - druidDataSource.setTimeBetweenConnectErrorMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS)); - druidDataSource.setMinEvictableIdleTimeMillis(conf.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS)); - druidDataSource.setValidationQueryTimeout(conf.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT)); + druidDataSource.setDriverClassName(PropertyUtils.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME)); + druidDataSource.setUrl(PropertyUtils.getString(Constants.SPRING_DATASOURCE_URL)); + druidDataSource.setUsername(PropertyUtils.getString(Constants.SPRING_DATASOURCE_USERNAME)); + druidDataSource.setPassword(PropertyUtils.getString(Constants.SPRING_DATASOURCE_PASSWORD)); + druidDataSource.setValidationQuery(PropertyUtils.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY,"SELECT 1")); + + druidDataSource.setPoolPreparedStatements(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS,true)); + druidDataSource.setTestWhileIdle(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE,true)); + druidDataSource.setTestOnBorrow(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW,true)); + druidDataSource.setTestOnReturn(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN,true)); + druidDataSource.setKeepAlive(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE,true)); + + druidDataSource.setMinIdle(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE,5)); + druidDataSource.setMaxActive(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE,50)); + druidDataSource.setMaxWait(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT,60000)); + druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE,20)); + druidDataSource.setInitialSize(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE,5)); + druidDataSource.setTimeBetweenEvictionRunsMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS,60000)); + druidDataSource.setTimeBetweenConnectErrorMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS,60000)); + druidDataSource.setMinEvictableIdleTimeMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS,300000)); + druidDataSource.setValidationQueryTimeout(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT,3)); //auto commit - druidDataSource.setDefaultAutoCommit(conf.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT)); + druidDataSource.setDefaultAutoCommit(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT,true)); return druidDataSource; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PropertyUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PropertyUtils.java index cdd481a5d7..47cfadbf9a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PropertyUtils.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PropertyUtils.java @@ -49,7 +49,7 @@ public class PropertyUtils { * init */ private void init(){ - String[] propertyFiles = new String[]{Constants.APPLICATION_PROPERTIES}; + String[] propertyFiles = new String[]{Constants.DATASOURCE_PROPERTIES}; for (String fileName : propertyFiles) { InputStream fis = null; try { @@ -77,6 +77,17 @@ public class PropertyUtils { return properties.getProperty(key); } + /** + * get property value + * + * @param key property name + * @param defaultVal default value + * @return property value + */ + public static String getString(String key, String defaultVal) { + String val = properties.getProperty(key.trim()); + return val == null ? defaultVal : val; + } /** * get property value @@ -106,4 +117,46 @@ public class PropertyUtils { } return defaultValue; } + + /** + * get property value + * + * @param key property name + * @return property value + */ + public static Boolean getBoolean(String key) { + String value = properties.getProperty(key.trim()); + if(null != value){ + return Boolean.parseBoolean(value); + } + + return false; + } + + /** + * get property value + * + * @param key property name + * @param defaultValue default value + * @return property value + */ + public static Boolean getBoolean(String key, boolean defaultValue) { + String value = properties.getProperty(key.trim()); + if(null != value){ + return Boolean.parseBoolean(value); + } + + return defaultValue; + } + + /** + * get property long value + * @param key key + * @param defaultVal default value + * @return property value + */ + public static long getLong(String key, long defaultVal) { + String val = getString(key); + return val == null ? defaultVal : Long.parseLong(val); + } } diff --git a/dolphinscheduler-dao/src/main/resources/application.properties b/dolphinscheduler-dao/src/main/resources/datasource.properties similarity index 61% rename from dolphinscheduler-dao/src/main/resources/application.properties rename to dolphinscheduler-dao/src/main/resources/datasource.properties index b79c1edf60..3e06ab467a 100644 --- a/dolphinscheduler-dao/src/main/resources/application.properties +++ b/dolphinscheduler-dao/src/main/resources/datasource.properties @@ -15,59 +15,61 @@ # limitations under the License. # -# base spring data source configuration -spring.datasource.type=com.alibaba.druid.pool.DruidDataSource + # postgre -spring.datasource.driver-class-name=org.postgresql.Driver -spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler +#spring.datasource.driver-class-name=org.postgresql.Driver +#spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler # mysql -#spring.datasource.driver-class-name=com.mysql.jdbc.Driver -#spring.datasource.url=jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 -spring.datasource.username=test -spring.datasource.password=test +spring.datasource.driver-class-name=com.mysql.jdbc.Driver +spring.datasource.url=jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8 +spring.datasource.username=root +spring.datasource.password=root@123 + +## base spring data source configuration todo need to remove +#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource # connection configuration -spring.datasource.initialSize=5 +#spring.datasource.initialSize=5 # min connection number -spring.datasource.minIdle=5 +#spring.datasource.minIdle=5 # max connection number -spring.datasource.maxActive=50 +#spring.datasource.maxActive=50 # max wait time for get a connection in milliseconds. if configuring maxWait, fair locks are enabled by default and concurrency efficiency decreases. # If necessary, unfair locks can be used by configuring the useUnfairLock attribute to true. -spring.datasource.maxWait=60000 +#spring.datasource.maxWait=60000 # milliseconds for check to close free connections -spring.datasource.timeBetweenEvictionRunsMillis=60000 +#spring.datasource.timeBetweenEvictionRunsMillis=60000 # the Destroy thread detects the connection interval and closes the physical connection in milliseconds if the connection idle time is greater than or equal to minEvictableIdleTimeMillis. -spring.datasource.timeBetweenConnectErrorMillis=60000 +#spring.datasource.timeBetweenConnectErrorMillis=60000 # the longest time a connection remains idle without being evicted, in milliseconds -spring.datasource.minEvictableIdleTimeMillis=300000 +#spring.datasource.minEvictableIdleTimeMillis=300000 #the SQL used to check whether the connection is valid requires a query statement. If validation Query is null, testOnBorrow, testOnReturn, and testWhileIdle will not work. -spring.datasource.validationQuery=SELECT 1 +#spring.datasource.validationQuery=SELECT 1 #check whether the connection is valid for timeout, in seconds -spring.datasource.validationQueryTimeout=3 +#spring.datasource.validationQueryTimeout=3 # when applying for a connection, if it is detected that the connection is idle longer than time Between Eviction Runs Millis, # validation Query is performed to check whether the connection is valid -spring.datasource.testWhileIdle=true +#spring.datasource.testWhileIdle=true #execute validation to check if the connection is valid when applying for a connection -spring.datasource.testOnBorrow=true +#spring.datasource.testOnBorrow=true #execute validation to check if the connection is valid when the connection is returned -spring.datasource.testOnReturn=false -spring.datasource.defaultAutoCommit=true -spring.datasource.keepAlive=true +#spring.datasource.testOnReturn=false +#spring.datasource.defaultAutoCommit=true +#spring.datasource.keepAlive=true # open PSCache, specify count PSCache for every connection -spring.datasource.poolPreparedStatements=true -spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 +#spring.datasource.poolPreparedStatements=true +#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20 -spring.datasource.spring.datasource.filters=stat,wall,log4j -spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 +#spring.datasource.filters=stat,wall,log4j +#spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java index e7b6327172..da2405ecfe 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.consumer; import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java index 9d96264a60..9fa8f8186b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java @@ -16,13 +16,20 @@ */ package org.apache.dolphinscheduler.service.quartz; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; +import org.quartz.impl.jdbcjobstore.JobStoreTX; +import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate; +import org.quartz.impl.jdbcjobstore.StdJDBCDelegate; import org.quartz.impl.matchers.GroupMatcher; +import org.quartz.simpl.SimpleThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +37,7 @@ import java.util.*; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER; import static org.quartz.CronScheduleBuilder.cronSchedule; import static org.quartz.JobBuilder.newJob; import static org.quartz.TriggerBuilder.newTrigger; @@ -59,7 +67,21 @@ public class QuartzExecutors { */ private static volatile QuartzExecutors INSTANCE = null; - private QuartzExecutors() {} + /** + * load conf + */ + private static Configuration conf; + + static { + try { + conf = new PropertiesConfiguration(Constants.QUARTZ_PROPERTIES_PATH); + }catch (ConfigurationException e){ + logger.warn("not loaded quartz configuration file, will used default value",e); + } + } + + private QuartzExecutors() { + } /** * thread safe and performance promote @@ -87,7 +109,32 @@ public class QuartzExecutors { */ private void init() { try { - SchedulerFactory schedulerFactory = new StdSchedulerFactory(Constants.QUARTZ_PROPERTIES_PATH); + StdSchedulerFactory schedulerFactory = new StdSchedulerFactory(); + Properties properties = new Properties(); + + String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME); + if (dataSourceDriverClass.contains(ORG_POSTGRESQL_DRIVER)){ + properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName())); + } else { + properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName())); + } + properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME, Constants.QUARTZ_INSTANCENAME)); + properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID, Constants.QUARTZ_INSTANCEID)); + properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,Constants.STRING_TRUE)); + properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES,Constants.STRING_TRUE)); + properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName())); + properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,Constants.STRING_TRUE)); + properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT, Constants.QUARTZ_THREADCOUNT)); + properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY, Constants.QUARTZ_THREADPRIORITY)); + properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName())); + properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX, Constants.QUARTZ_TABLE_PREFIX)); + properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED,Constants.STRING_TRUE)); + properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, Constants.QUARTZ_MISFIRETHRESHOLD)); + properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, Constants.QUARTZ_CLUSTERCHECKININTERVAL)); + properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE, Constants.QUARTZ_DATASOURCE)); + properties.setProperty(Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName())); + + schedulerFactory.initialize(properties); scheduler = schedulerFactory.getScheduler(); } catch (SchedulerException e) { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java index c6bdfc3b02..5bdc6f8cd7 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java @@ -24,7 +24,7 @@ import org.springframework.stereotype.Component; * zookeeper conf */ @Component -@PropertySource("classpath:common.properties") +@PropertySource("classpath:zookeeper.properties") public class ZookeeperConfig { //zk connect config diff --git a/dolphinscheduler-service/src/main/resources/quartz.properties b/dolphinscheduler-service/src/main/resources/quartz.properties index 60a09683f3..3bd82c77f0 100644 --- a/dolphinscheduler-service/src/main/resources/quartz.properties +++ b/dolphinscheduler-service/src/main/resources/quartz.properties @@ -18,45 +18,36 @@ #============================================================================ # Configure Main Scheduler Properties #============================================================================ -org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate +#org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate #org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate -# postgre -#org.quartz.dataSource.myDs.driver = org.postgresql.Driver -#org.quartz.dataSource.myDs.URL = jdbc:postgresql://localhost:5432/dolphinscheduler?characterEncoding=utf8 -# mysql -#org.quartz.dataSource.myDs.driver = com.mysql.jdbc.Driver -#org.quartz.dataSource.myDs.URL = jdbc:mysql://localhost:3306/dolphinscheduler?characterEncoding=utf8 -#org.quartz.dataSource.myDs.user = root -#org.quartz.dataSource.myDs.password = 123456 - -org.quartz.scheduler.instanceName = DolphinScheduler -org.quartz.scheduler.instanceId = AUTO -org.quartz.scheduler.makeSchedulerThreadDaemon = true -org.quartz.jobStore.useProperties = false +#org.quartz.scheduler.instanceName = DolphinScheduler +#org.quartz.scheduler.instanceId = AUTO +#org.quartz.scheduler.makeSchedulerThreadDaemon = true +#org.quartz.jobStore.useProperties = false #============================================================================ # Configure ThreadPool #============================================================================ -org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool -org.quartz.threadPool.makeThreadsDaemons = true -org.quartz.threadPool.threadCount = 25 -org.quartz.threadPool.threadPriority = 5 +#org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool +#org.quartz.threadPool.makeThreadsDaemons = true +#org.quartz.threadPool.threadCount = 25 +#org.quartz.threadPool.threadPriority = 5 #============================================================================ # Configure JobStore #============================================================================ -org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX +#org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX -org.quartz.jobStore.tablePrefix = QRTZ_ -org.quartz.jobStore.isClustered = true -org.quartz.jobStore.misfireThreshold = 60000 -org.quartz.jobStore.clusterCheckinInterval = 5000 +#org.quartz.jobStore.tablePrefix = QRTZ_ +#org.quartz.jobStore.isClustered = true +#org.quartz.jobStore.misfireThreshold = 60000 +#org.quartz.jobStore.clusterCheckinInterval = 5000 org.quartz.jobStore.acquireTriggersWithinLock=true -org.quartz.jobStore.dataSource = myDs +#org.quartz.jobStore.dataSource = myDs #============================================================================ # Configure Datasources #============================================================================ -org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider \ No newline at end of file +#org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/resources/zookeeper.properties b/dolphinscheduler-service/src/main/resources/zookeeper.properties new file mode 100644 index 0000000000..1d3c53adb1 --- /dev/null +++ b/dolphinscheduler-service/src/main/resources/zookeeper.properties @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# task queue implementation, default "zookeeper" +dolphinscheduler.queue.impl=zookeeper + +# zookeeper cluster. multiple are separated by commas. eg. 192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 +zookeeper.quorum=192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 + +# dolphinscheduler root directory +#zookeeper.dolphinscheduler.root=/dolphinscheduler + +# dolphinscheduler failover directory +#zookeeper.session.timeout=60000 +#zookeeper.connection.timeout=300 +#zookeeper.retry.base.sleep=100 +#zookeeper.retry.max.sleep=30000 +#zookeeper.retry.maxtime=5 \ No newline at end of file