failedNode = new ArrayList<>();
+ failedNode.add(newFailedNodeName);
+ conditionsParameters.setFailedNode(failedNode);
+ }
+ String conditionResultStr = conditionsParameters.getConditionResult();
+ taskNode.setConditionResult(conditionResultStr);
+ tasks.set(i, taskNode);
+ }
+ }
+ return JSONUtils.toJsonString(processData);
}
-
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index 6ac847b8db..2921ce2bba 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.quartz;
+package org.apache.dolphinscheduler.service.quartz;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -25,6 +25,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
@@ -34,8 +37,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
-import java.util.Date;
-
/**
* process schedule job
*/
@@ -46,7 +47,7 @@ public class ProcessScheduleJob implements Job {
*/
private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class);
- public ProcessService getProcessService(){
+ public ProcessService getProcessService() {
return SpringApplicationContext.getBean(ProcessService.class);
}
@@ -66,10 +67,8 @@ public class ProcessScheduleJob implements Job {
int projectId = dataMap.getInt(Constants.PROJECT_ID);
int scheduleId = dataMap.getInt(Constants.SCHEDULE_ID);
-
Date scheduledFireTime = context.getScheduledFireTime();
-
Date fireTime = context.getFireTime();
logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId);
@@ -82,11 +81,10 @@ public class ProcessScheduleJob implements Job {
return;
}
-
ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
// release state : online/offline
ReleaseState releaseState = processDefinition.getReleaseState();
- if (processDefinition == null || releaseState == ReleaseState.OFFLINE) {
+ if (releaseState == ReleaseState.OFFLINE) {
logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId);
return;
}
@@ -107,7 +105,6 @@ public class ProcessScheduleJob implements Job {
getProcessService().createCommand(command);
}
-
/**
* delete job
*/
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
index 3b15810e05..fd91e4076d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
@@ -14,15 +14,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.quartz;
+import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLASS;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_CLASS;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT;
+import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY;
+import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCENAME;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PRIFIX;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_PRIFIX;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_MISFIRETHRESHOLD;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_PROPERTIES_PATH;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_TABLE_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADCOUNT;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADPRIORITY;
+import static org.apache.dolphinscheduler.common.Constants.SCHEDULE;
+import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_ID;
+import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
+import static org.apache.dolphinscheduler.common.Constants.STRING_FALSE;
+import static org.apache.dolphinscheduler.common.Constants.STRING_TRUE;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.quartz.*;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.JobStoreTX;
import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
@@ -32,300 +93,289 @@ import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-import static org.quartz.CronScheduleBuilder.cronSchedule;
-import static org.quartz.JobBuilder.newJob;
-import static org.quartz.TriggerBuilder.newTrigger;
-
/**
* single Quartz executors instance
*/
public class QuartzExecutors {
- /**
- * logger of QuartzExecutors
- */
- private static final Logger logger = LoggerFactory.getLogger(QuartzExecutors.class);
-
- /**
- * read write lock
- */
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- /**
- * A Scheduler maintains a registry of org.quartz.JobDetail and Trigger.
- */
- private static Scheduler scheduler;
-
- /**
- * load conf
- */
- private static Configuration conf;
-
- private static final class Holder {
- private static final QuartzExecutors instance = new QuartzExecutors();
- }
-
-
- private QuartzExecutors() {
- try {
- conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH);
- init();
- }catch (ConfigurationException e){
- logger.warn("not loaded quartz configuration file, will used default value",e);
+ /**
+ * logger of QuartzExecutors
+ */
+ private static final Logger logger = LoggerFactory.getLogger(QuartzExecutors.class);
+
+ /**
+ * read write lock
+ */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /**
+ * A Scheduler maintains a registry of org.quartz.JobDetail and Trigger.
+ */
+ private static Scheduler scheduler;
+
+ /**
+ * load conf
+ */
+ private static Configuration conf;
+
+ private static final class Holder {
+ private static final QuartzExecutors instance = new QuartzExecutors();
}
- }
-
- /**
- * thread safe and performance promote
- * @return instance of Quartz Executors
- */
- public static QuartzExecutors getInstance() {
- return Holder.instance;
- }
-
-
- /**
- * init
- *
- * Returns a client-usable handle to a Scheduler.
- */
- private void init() {
- try {
- StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
- Properties properties = new Properties();
-
- String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME);
- if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)){
- properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
- } else {
- properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
- }
- properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME));
- properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID));
- properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,STRING_TRUE));
- properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,STRING_FALSE));
- properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
- properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,STRING_TRUE));
- properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT));
- properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY));
- properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
- properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX));
- properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,STRING_TRUE));
- properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD));
- properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, QUARTZ_CLUSTERCHECKININTERVAL));
- properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
- properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
- properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName()));
-
- schedulerFactory.initialize(properties);
- scheduler = schedulerFactory.getScheduler();
-
- } catch (SchedulerException e) {
- logger.error(e.getMessage(),e);
- System.exit(1);
+
+ private QuartzExecutors() {
+ try {
+ conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH);
+ init();
+ } catch (ConfigurationException e) {
+ logger.warn("not loaded quartz configuration file, will used default value", e);
+ }
}
- }
-
- /**
- * Whether the scheduler has been started.
- *
- * @throws SchedulerException scheduler exception
- */
- public void start() throws SchedulerException {
- if (!scheduler.isStarted()){
- scheduler.start();
- logger.info("Quartz service started" );
+ /**
+ * thread safe and performance promote
+ *
+ * @return instance of Quartz Executors
+ */
+ public static QuartzExecutors getInstance() {
+ return Holder.instance;
}
- }
-
- /**
- * stop all scheduled tasks
- *
- * Halts the Scheduler's firing of Triggers,
- * and cleans up all resources associated with the Scheduler.
- *
- * The scheduler cannot be re-started.
- * @throws SchedulerException scheduler exception
- */
- public void shutdown() throws SchedulerException {
- if (!scheduler.isShutdown()) {
- // don't wait for the task to complete
- scheduler.shutdown();
- logger.info("Quartz service stopped, and halt all tasks");
+
+ /**
+ * init
+ *
+ * Returns a client-usable handle to a Scheduler.
+ */
+ private void init() {
+ try {
+ StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
+ Properties properties = new Properties();
+
+ String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME);
+ if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)) {
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
+ } else {
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
+ }
+ properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME));
+ properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID));
+ properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, STRING_TRUE));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, STRING_FALSE));
+ properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS, conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
+ properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, STRING_TRUE));
+ properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT, conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT));
+ properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS, conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, STRING_TRUE));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, QUARTZ_CLUSTERCHECKININTERVAL));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE, conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
+ properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, DruidConnectionProvider.class.getName()));
+
+ schedulerFactory.initialize(properties);
+ scheduler = schedulerFactory.getScheduler();
+
+ } catch (SchedulerException e) {
+ logger.error(e.getMessage(), e);
+ System.exit(1);
+ }
+
}
- }
-
-
- /**
- * add task trigger , if this task already exists, return this task with updated trigger
- *
- * @param clazz job class name
- * @param jobName job name
- * @param jobGroupName job group name
- * @param startDate job start date
- * @param endDate job end date
- * @param cronExpression cron expression
- * @param jobDataMap job parameters data map
- */
- public void addJob(Class extends Job> clazz,String jobName,String jobGroupName,Date startDate, Date endDate,
- String cronExpression,
- Map jobDataMap) {
- lock.writeLock().lock();
- try {
-
- JobKey jobKey = new JobKey(jobName, jobGroupName);
- JobDetail jobDetail;
- //add a task (if this task already exists, return this task directly)
- if (scheduler.checkExists(jobKey)) {
-
- jobDetail = scheduler.getJobDetail(jobKey);
- if (jobDataMap != null) {
- jobDetail.getJobDataMap().putAll(jobDataMap);
+
+ /**
+ * Whether the scheduler has been started.
+ *
+ * @throws SchedulerException scheduler exception
+ */
+ public void start() throws SchedulerException {
+ if (!scheduler.isStarted()) {
+ scheduler.start();
+ logger.info("Quartz service started");
}
- } else {
- jobDetail = newJob(clazz).withIdentity(jobKey).build();
+ }
- if (jobDataMap != null) {
- jobDetail.getJobDataMap().putAll(jobDataMap);
+ /**
+ * stop all scheduled tasks
+ *
+ * Halts the Scheduler's firing of Triggers,
+ * and cleans up all resources associated with the Scheduler.
+ *
+ * The scheduler cannot be re-started.
+ *
+ * @throws SchedulerException scheduler exception
+ */
+ public void shutdown() throws SchedulerException {
+ if (!scheduler.isShutdown()) {
+ // don't wait for the task to complete
+ scheduler.shutdown();
+ logger.info("Quartz service stopped, and halt all tasks");
}
+ }
- scheduler.addJob(jobDetail, false, true);
-
- logger.info("Add job, job name: {}, group name: {}",
- jobName, jobGroupName);
- }
-
- TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
- /**
- * Instructs the Scheduler that upon a mis-fire
- * situation, the CronTrigger wants to have it's
- * next-fire-time updated to the next time in the schedule after the
- * current time (taking into account any associated Calendar),
- * but it does not want to be fired now.
- */
- CronTrigger cronTrigger = newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate)
- .withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing())
- .forJob(jobDetail).build();
-
- if (scheduler.checkExists(triggerKey)) {
- // updateProcessInstance scheduler trigger when scheduler cycle changes
- CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
- String oldCronExpression = oldCronTrigger.getCronExpression();
-
- if (!StringUtils.equalsIgnoreCase(cronExpression,oldCronExpression)) {
- // reschedule job trigger
- scheduler.rescheduleJob(triggerKey, cronTrigger);
- logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
- jobName, jobGroupName, cronExpression, startDate, endDate);
- }
- } else {
- scheduler.scheduleJob(cronTrigger);
- logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
- jobName, jobGroupName, cronExpression, startDate, endDate);
- }
-
- } catch (Exception e) {
- logger.error("add job failed", e);
- throw new RuntimeException("add job failed", e);
- } finally {
- lock.writeLock().unlock();
+ /**
+ * add task trigger , if this task already exists, return this task with updated trigger
+ *
+ * @param clazz job class name
+ * @param jobName job name
+ * @param jobGroupName job group name
+ * @param startDate job start date
+ * @param endDate job end date
+ * @param cronExpression cron expression
+ * @param jobDataMap job parameters data map
+ */
+ public void addJob(Class extends Job> clazz, String jobName, String jobGroupName, Date startDate, Date endDate,
+ String cronExpression,
+ Map jobDataMap) {
+ lock.writeLock().lock();
+ try {
+
+ JobKey jobKey = new JobKey(jobName, jobGroupName);
+ JobDetail jobDetail;
+ //add a task (if this task already exists, return this task directly)
+ if (scheduler.checkExists(jobKey)) {
+
+ jobDetail = scheduler.getJobDetail(jobKey);
+ if (jobDataMap != null) {
+ jobDetail.getJobDataMap().putAll(jobDataMap);
+ }
+ } else {
+ jobDetail = newJob(clazz).withIdentity(jobKey).build();
+
+ if (jobDataMap != null) {
+ jobDetail.getJobDataMap().putAll(jobDataMap);
+ }
+
+ scheduler.addJob(jobDetail, false, true);
+
+ logger.info("Add job, job name: {}, group name: {}",
+ jobName, jobGroupName);
+ }
+
+ TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
+ /**
+ * Instructs the Scheduler that upon a mis-fire
+ * situation, the CronTrigger wants to have it's
+ * next-fire-time updated to the next time in the schedule after the
+ * current time (taking into account any associated Calendar),
+ * but it does not want to be fired now.
+ */
+ CronTrigger cronTrigger = newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate)
+ .withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing())
+ .forJob(jobDetail).build();
+
+ if (scheduler.checkExists(triggerKey)) {
+ // updateProcessInstance scheduler trigger when scheduler cycle changes
+ CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
+ String oldCronExpression = oldCronTrigger.getCronExpression();
+
+ if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
+ // reschedule job trigger
+ scheduler.rescheduleJob(triggerKey, cronTrigger);
+ logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
+ jobName, jobGroupName, cronExpression, startDate, endDate);
+ }
+ } else {
+ scheduler.scheduleJob(cronTrigger);
+ logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
+ jobName, jobGroupName, cronExpression, startDate, endDate);
+ }
+
+ } catch (Exception e) {
+ throw new ServiceException("add job failed", e);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
- }
-
-
- /**
- * delete job
- *
- * @param jobName job name
- * @param jobGroupName job group name
- * @return true if the Job was found and deleted.
- */
- public boolean deleteJob(String jobName, String jobGroupName) {
- lock.writeLock().lock();
- try {
- JobKey jobKey = new JobKey(jobName,jobGroupName);
- if(scheduler.checkExists(jobKey)){
- logger.info("try to delete job, job name: {}, job group name: {},", jobName, jobGroupName);
- return scheduler.deleteJob(jobKey);
- }else {
- return true;
- }
-
- } catch (SchedulerException e) {
- logger.error("delete job : {} failed",jobName, e);
- } finally {
- lock.writeLock().unlock();
+
+ /**
+ * delete job
+ *
+ * @param jobName job name
+ * @param jobGroupName job group name
+ * @return true if the Job was found and deleted.
+ */
+ public boolean deleteJob(String jobName, String jobGroupName) {
+ lock.writeLock().lock();
+ try {
+ JobKey jobKey = new JobKey(jobName, jobGroupName);
+ if (scheduler.checkExists(jobKey)) {
+ logger.info("try to delete job, job name: {}, job group name: {},", jobName, jobGroupName);
+ return scheduler.deleteJob(jobKey);
+ } else {
+ return true;
+ }
+
+ } catch (SchedulerException e) {
+ logger.error("delete job : {} failed", jobName, e);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ return false;
+ }
+
+ /**
+ * delete all jobs in job group
+ *
+ * @param jobGroupName job group name
+ * @return true if all of the Jobs were found and deleted, false if
+ * one or more were not deleted.
+ */
+ public boolean deleteAllJobs(String jobGroupName) {
+ lock.writeLock().lock();
+ try {
+ logger.info("try to delete all jobs in job group: {}", jobGroupName);
+ List jobKeys = new ArrayList<>();
+ jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEndsWith(jobGroupName)));
+
+ return scheduler.deleteJobs(jobKeys);
+ } catch (SchedulerException e) {
+ logger.error("delete all jobs in job group: {} failed", jobGroupName, e);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ return false;
}
- return false;
- }
-
- /**
- * delete all jobs in job group
- *
- * @param jobGroupName job group name
- *
- * @return true if all of the Jobs were found and deleted, false if
- * one or more were not deleted.
- */
- public boolean deleteAllJobs(String jobGroupName) {
- lock.writeLock().lock();
- try {
- logger.info("try to delete all jobs in job group: {}", jobGroupName);
- List jobKeys = new ArrayList<>();
- jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEndsWith(jobGroupName)));
-
- return scheduler.deleteJobs(jobKeys);
- } catch (SchedulerException e) {
- logger.error("delete all jobs in job group: {} failed",jobGroupName, e);
- } finally {
- lock.writeLock().unlock();
+
+ /**
+ * build job name
+ *
+ * @param processId process id
+ * @return job name
+ */
+ public static String buildJobName(int processId) {
+ StringBuilder sb = new StringBuilder(30);
+ sb.append(QUARTZ_JOB_PRIFIX).append(UNDERLINE).append(processId);
+ return sb.toString();
+ }
+
+ /**
+ * build job group name
+ *
+ * @param projectId project id
+ * @return job group name
+ */
+ public static String buildJobGroupName(int projectId) {
+ StringBuilder sb = new StringBuilder(30);
+ sb.append(QUARTZ_JOB_GROUP_PRIFIX).append(UNDERLINE).append(projectId);
+ return sb.toString();
+ }
+
+ /**
+ * add params to map
+ *
+ * @param projectId project id
+ * @param scheduleId schedule id
+ * @param schedule schedule
+ * @return data map
+ */
+ public static Map buildDataMap(int projectId, int scheduleId, Schedule schedule) {
+ Map dataMap = new HashMap<>(3);
+ dataMap.put(PROJECT_ID, projectId);
+ dataMap.put(SCHEDULE_ID, scheduleId);
+ dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule));
+
+ return dataMap;
}
- return false;
- }
-
- /**
- * build job name
- * @param processId process id
- * @return job name
- */
- public static String buildJobName(int processId) {
- StringBuilder sb = new StringBuilder(30);
- sb.append(QUARTZ_JOB_PRIFIX).append(UNDERLINE).append(processId);
- return sb.toString();
- }
-
- /**
- * build job group name
- * @param projectId project id
- * @return job group name
- */
- public static String buildJobGroupName(int projectId) {
- StringBuilder sb = new StringBuilder(30);
- sb.append(QUARTZ_JOB_GROUP_PRIFIX).append(UNDERLINE).append(projectId);
- return sb.toString();
- }
-
- /**
- * add params to map
- *
- * @param projectId project id
- * @param scheduleId schedule id
- * @param schedule schedule
- * @return data map
- */
- public static Map buildDataMap(int projectId, int scheduleId, Schedule schedule) {
- Map dataMap = new HashMap<>(3);
- dataMap.put(PROJECT_ID, projectId);
- dataMap.put(SCHEDULE_ID, scheduleId);
- dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule));
-
- return dataMap;
- }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
index 0a2e31b610..60c862340b 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
@@ -14,159 +14,177 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.quartz.cron;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
+
import com.cronutils.model.Cron;
import com.cronutils.model.field.CronField;
import com.cronutils.model.field.CronFieldName;
-import com.cronutils.model.field.expression.*;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import com.cronutils.model.field.expression.Always;
+import com.cronutils.model.field.expression.And;
+import com.cronutils.model.field.expression.Between;
+import com.cronutils.model.field.expression.Every;
+import com.cronutils.model.field.expression.FieldExpression;
+import com.cronutils.model.field.expression.On;
/**
* Cycle
*/
public abstract class AbstractCycle {
- protected Cron cron;
-
- protected CronField minField;
- protected CronField hourField;
- protected CronField dayOfMonthField;
- protected CronField dayOfWeekField;
- protected CronField monthField;
- protected CronField yearField;
-
- public CycleLinks addCycle(AbstractCycle cycle) {
- return new CycleLinks(this.cron).addCycle(this).addCycle(cycle);
- }
-
- /**
- * cycle constructor
- * @param cron cron
- */
- public AbstractCycle(Cron cron) {
- if (cron == null) {
- throw new IllegalArgumentException("cron must not be null!");
+ protected Cron cron;
+
+ protected CronField minField;
+ protected CronField hourField;
+ protected CronField dayOfMonthField;
+ protected CronField dayOfWeekField;
+ protected CronField monthField;
+ protected CronField yearField;
+
+ public CycleLinks addCycle(AbstractCycle cycle) {
+ return new CycleLinks(this.cron).addCycle(this).addCycle(cycle);
+ }
+
+ /**
+ * cycle constructor
+ *
+ * @param cron cron
+ */
+ protected AbstractCycle(Cron cron) {
+ if (cron == null) {
+ throw new IllegalArgumentException("cron must not be null!");
+ }
+
+ this.cron = cron;
+ this.minField = cron.retrieve(CronFieldName.MINUTE);
+ this.hourField = cron.retrieve(CronFieldName.HOUR);
+ this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH);
+ this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK);
+ this.monthField = cron.retrieve(CronFieldName.MONTH);
+ this.yearField = cron.retrieve(CronFieldName.YEAR);
+ }
+
+ /**
+ * whether the minute field has a value
+ *
+ * @return if minute field has a value return true,else return false
+ */
+ protected boolean minFiledIsSetAll() {
+ FieldExpression minFieldExpression = minField.getExpression();
+ return (minFieldExpression instanceof Every || minFieldExpression instanceof Always
+ || minFieldExpression instanceof Between || minFieldExpression instanceof And
+ || minFieldExpression instanceof On);
+ }
+
+ /**
+ * whether the minute field has a value of every or always
+ *
+ * @return if minute field has a value of every or always return true,else return false
+ */
+ protected boolean minFiledIsEvery() {
+ FieldExpression minFieldExpression = minField.getExpression();
+ return (minFieldExpression instanceof Every || minFieldExpression instanceof Always);
+ }
+
+ /**
+ * whether the hour field has a value
+ *
+ * @return if hour field has a value return true,else return false
+ */
+ protected boolean hourFiledIsSetAll() {
+ FieldExpression hourFieldExpression = hourField.getExpression();
+ return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always
+ || hourFieldExpression instanceof Between || hourFieldExpression instanceof And
+ || hourFieldExpression instanceof On);
+ }
+
+ /**
+ * whether the hour field has a value of every or always
+ *
+ * @return if hour field has a value of every or always return true,else return false
+ */
+ protected boolean hourFiledIsEvery() {
+ FieldExpression hourFieldExpression = hourField.getExpression();
+ return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always);
+ }
+
+ /**
+ * whether the day Of month field has a value
+ *
+ * @return if day Of month field has a value return true,else return false
+ */
+ protected boolean dayOfMonthFieldIsSetAll() {
+ return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always
+ || dayOfMonthField.getExpression() instanceof Between || dayOfMonthField.getExpression() instanceof And
+ || dayOfMonthField.getExpression() instanceof On);
+ }
+
+ /**
+ * whether the day Of Month field has a value of every or always
+ *
+ * @return if day Of Month field has a value of every or always return true,else return false
+ */
+ protected boolean dayOfMonthFieldIsEvery() {
+ return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always);
+ }
+
+ /**
+ * whether month field has a value
+ *
+ * @return if month field has a value return true,else return false
+ */
+ protected boolean monthFieldIsSetAll() {
+ FieldExpression monthFieldExpression = monthField.getExpression();
+ return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always
+ || monthFieldExpression instanceof Between || monthFieldExpression instanceof And
+ || monthFieldExpression instanceof On);
+ }
+
+ /**
+ * whether the month field has a value of every or always
+ *
+ * @return if month field has a value of every or always return true,else return false
+ */
+ protected boolean monthFieldIsEvery() {
+ FieldExpression monthFieldExpression = monthField.getExpression();
+ return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always);
+ }
+
+ /**
+ * whether the day Of week field has a value
+ *
+ * @return if day Of week field has a value return true,else return false
+ */
+ protected boolean dayofWeekFieldIsSetAll() {
+ FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
+ return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always
+ || dayOfWeekFieldExpression instanceof Between || dayOfWeekFieldExpression instanceof And
+ || dayOfWeekFieldExpression instanceof On);
+ }
+
+ /**
+ * whether the day Of week field has a value of every or always
+ *
+ * @return if day Of week field has a value of every or always return true,else return false
+ */
+ protected boolean dayofWeekFieldIsEvery() {
+ FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
+ return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always);
}
- this.cron = cron;
- this.minField = cron.retrieve(CronFieldName.MINUTE);
- this.hourField = cron.retrieve(CronFieldName.HOUR);
- this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH);
- this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK);
- this.monthField = cron.retrieve(CronFieldName.MONTH);
- this.yearField = cron.retrieve(CronFieldName.YEAR);
- }
-
- /**
- * whether the minute field has a value
- * @return if minute field has a value return true,else return false
- */
- protected boolean minFiledIsSetAll(){
- FieldExpression minFieldExpression = minField.getExpression();
- return (minFieldExpression instanceof Every || minFieldExpression instanceof Always
- || minFieldExpression instanceof Between || minFieldExpression instanceof And
- || minFieldExpression instanceof On);
- }
-
-
- /**
- * whether the minute field has a value of every or always
- * @return if minute field has a value of every or always return true,else return false
- */
- protected boolean minFiledIsEvery(){
- FieldExpression minFieldExpression = minField.getExpression();
- return (minFieldExpression instanceof Every || minFieldExpression instanceof Always);
- }
-
- /**
- * whether the hour field has a value
- * @return if hour field has a value return true,else return false
- */
- protected boolean hourFiledIsSetAll(){
- FieldExpression hourFieldExpression = hourField.getExpression();
- return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always
- || hourFieldExpression instanceof Between || hourFieldExpression instanceof And
- || hourFieldExpression instanceof On);
- }
-
- /**
- * whether the hour field has a value of every or always
- * @return if hour field has a value of every or always return true,else return false
- */
- protected boolean hourFiledIsEvery(){
- FieldExpression hourFieldExpression = hourField.getExpression();
- return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always);
- }
-
- /**
- * whether the day Of month field has a value
- * @return if day Of month field has a value return true,else return false
- */
- protected boolean dayOfMonthFieldIsSetAll(){
- return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always
- || dayOfMonthField.getExpression() instanceof Between || dayOfMonthField.getExpression() instanceof And
- || dayOfMonthField.getExpression() instanceof On);
- }
-
-
- /**
- * whether the day Of Month field has a value of every or always
- * @return if day Of Month field has a value of every or always return true,else return false
- */
- protected boolean dayOfMonthFieldIsEvery(){
- return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always);
- }
-
- /**
- * whether month field has a value
- * @return if month field has a value return true,else return false
- */
- protected boolean monthFieldIsSetAll(){
- FieldExpression monthFieldExpression = monthField.getExpression();
- return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always
- || monthFieldExpression instanceof Between || monthFieldExpression instanceof And
- || monthFieldExpression instanceof On);
- }
-
- /**
- * whether the month field has a value of every or always
- * @return if month field has a value of every or always return true,else return false
- */
- protected boolean monthFieldIsEvery(){
- FieldExpression monthFieldExpression = monthField.getExpression();
- return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always);
- }
-
- /**
- * whether the day Of week field has a value
- * @return if day Of week field has a value return true,else return false
- */
- protected boolean dayofWeekFieldIsSetAll(){
- FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
- return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always
- || dayOfWeekFieldExpression instanceof Between || dayOfWeekFieldExpression instanceof And
- || dayOfWeekFieldExpression instanceof On);
- }
-
- /**
- * whether the day Of week field has a value of every or always
- * @return if day Of week field has a value of every or always return true,else return false
- */
- protected boolean dayofWeekFieldIsEvery(){
- FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
- return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always);
- }
-
- /**
- * get cycle enum
- * @return CycleEnum
- */
- protected abstract CycleEnum getCycle();
-
- /**
- * get mini level cycle enum
- * @return CycleEnum
- */
- protected abstract CycleEnum getMiniCycle();
+ /**
+ * get cycle enum
+ *
+ * @return CycleEnum
+ */
+ protected abstract CycleEnum getCycle();
+
+ /**
+ * get mini level cycle enum
+ *
+ * @return CycleEnum
+ */
+ protected abstract CycleEnum getMiniCycle();
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 8a7d891c2e..37d8f10c93 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -14,322 +14,329 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
/**
* abstract zookeeper client
*/
@Component
public abstract class AbstractZKClient extends ZookeeperCachedOperator {
- private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
-
-
- /**
- * remove dead server by host
- * @param host host
- * @param serverType serverType
- * @throws Exception
- */
- public void removeDeadServerByHost(String host, String serverType) throws Exception {
- List deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
- for(String serverPath : deadServers){
- if(serverPath.startsWith(serverType+UNDERLINE+host)){
- String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
- super.remove(server);
- logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
- }
- }
- }
-
-
- /**
- * opType(add): if find dead server , then add to zk deadServerPath
- * opType(delete): delete path from zk
- *
- * @param zNode node path
- * @param zkNodeType master or worker
- * @param opType delete or add
- * @throws Exception errors
- */
- public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
- String host = getHostByEventDataPath(zNode);
- String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
-
- //check server restart, if restart , dead server path in zk should be delete
- if(opType.equals(DELETE_ZK_OP)){
- removeDeadServerByHost(host, type);
-
- }else if(opType.equals(ADD_ZK_OP)){
- String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
- if(!super.isExisted(deadServerPath)){
- //add dead server info to zk dead server path : /dead-servers/
-
- super.persist(deadServerPath,(type + UNDERLINE + host));
-
- logger.info("{} server dead , and {} added to zk dead server path success" ,
- zkNodeType.toString(), zNode);
- }
- }
-
- }
-
- /**
- * get active master num
- * @return active master number
- */
- public int getActiveMasterNum(){
- List childrenList = new ArrayList<>();
- try {
- // read master node parent path from conf
- if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
- childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
- }
- } catch (Exception e) {
- logger.error("getActiveMasterNum error",e);
- }
- return childrenList.size();
- }
-
- /**
- *
- * @return zookeeper quorum
- */
- public String getZookeeperQuorum(){
- return getZookeeperConfig().getServerList();
- }
-
- /**
- * get server list.
- * @param zkNodeType zookeeper node type
- * @return server list
- */
- public List getServersList(ZKNodeType zkNodeType){
- Map masterMap = getServerMaps(zkNodeType);
- String parentPath = getZNodeParentPath(zkNodeType);
-
- List masterServers = new ArrayList<>();
- for (Map.Entry entry : masterMap.entrySet()) {
- Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
- if(masterServer == null){
- continue;
- }
- String key = entry.getKey();
- masterServer.setZkDirectory(parentPath + "/"+ key);
- //set host and port
- String[] hostAndPort=key.split(COLON);
- String[] hosts=hostAndPort[0].split(DIVISION_STRING);
- // fetch the last one
- masterServer.setHost(hosts[hosts.length-1]);
- masterServer.setPort(Integer.parseInt(hostAndPort[1]));
- masterServers.add(masterServer);
- }
- return masterServers;
- }
-
- /**
- * get master server list map.
- * @param zkNodeType zookeeper node type
- * @return result : {host : resource info}
- */
- public Map getServerMaps(ZKNodeType zkNodeType){
-
- Map masterMap = new HashMap<>();
- try {
- String path = getZNodeParentPath(zkNodeType);
- List serverList = super.getChildrenKeys(path);
- if(zkNodeType == ZKNodeType.WORKER){
- List workerList = new ArrayList<>();
- for(String group : serverList){
- List groupServers = super.getChildrenKeys(path + Constants.SLASH + group);
- for(String groupServer : groupServers){
- workerList.add(group + Constants.SLASH + groupServer);
- }
- }
- serverList = workerList;
- }
- for(String server : serverList){
- masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server));
- }
- } catch (Exception e) {
- logger.error("get server list failed", e);
- }
-
- return masterMap;
- }
-
- /**
- * check the zookeeper node already exists
- * @param host host
- * @param zkNodeType zookeeper node type
- * @return true if exists
- */
- public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
- String path = getZNodeParentPath(zkNodeType);
- if(StringUtils.isEmpty(path)){
- logger.error("check zk node exists error, host:{}, zk node type:{}",
- host, zkNodeType.toString());
- return false;
- }
- Map serverMaps = getServerMaps(zkNodeType);
- for(String hostKey : serverMaps.keySet()){
- if(hostKey.contains(host)){
- return true;
- }
- }
- return false;
- }
-
- /**
- *
- * @return get worker node parent path
- */
- protected String getWorkerZNodeParentPath(){
- return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
- }
-
- /**
- *
- * @return get master node parent path
- */
- protected String getMasterZNodeParentPath(){
- return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
- }
-
- /**
- *
- * @return get master lock path
- */
- public String getMasterLockPath(){
- return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
- }
-
- /**
- *
- * @param zkNodeType zookeeper node type
- * @return get zookeeper node parent path
- */
- public String getZNodeParentPath(ZKNodeType zkNodeType) {
- String path = "";
- switch (zkNodeType){
- case MASTER:
- return getMasterZNodeParentPath();
- case WORKER:
- return getWorkerZNodeParentPath();
- case DEAD_SERVER:
- return getDeadZNodeParentPath();
- default:
- break;
- }
- return path;
- }
-
- /**
- *
- * @return get dead server node parent path
- */
- protected String getDeadZNodeParentPath(){
- return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
- }
-
- /**
- *
- * @return get master start up lock path
- */
- public String getMasterStartUpLockPath(){
- return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
- }
-
- /**
- *
- * @return get master failover lock path
- */
- public String getMasterFailoverLockPath(){
- return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
- }
-
- /**
- *
- * @return get worker failover lock path
- */
- public String getWorkerFailoverLockPath(){
- return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
- }
-
- /**
- * release mutex
- * @param mutex mutex
- */
- public void releaseMutex(InterProcessMutex mutex) {
- if (mutex != null){
- try {
- mutex.release();
- } catch (Exception e) {
- if("instance must be started before calling this method".equals(e.getMessage())){
- logger.warn("lock release");
- }else{
- logger.error("lock release failed",e);
- }
-
- }
- }
- }
-
- /**
- * init system znode
- */
- protected void initSystemZNode(){
- try {
- persist(getMasterZNodeParentPath(), "");
- persist(getWorkerZNodeParentPath(), "");
- persist(getDeadZNodeParentPath(), "");
-
- logger.info("initialize server nodes success.");
- } catch (Exception e) {
- logger.error("init system znode failed",e);
- }
- }
-
- /**
- * get host ip, string format: masterParentPath/ip
- * @param path path
- * @return host ip, string format: masterParentPath/ip
- */
- protected String getHostByEventDataPath(String path) {
- if(StringUtils.isEmpty(path)){
- logger.error("empty path!");
- return "";
- }
- String[] pathArray = path.split(SINGLE_SLASH);
- if(pathArray.length < 1){
- logger.error("parse ip error: {}", path);
- return "";
- }
- return pathArray[pathArray.length - 1];
-
- }
-
- @Override
- public String toString() {
- return "AbstractZKClient{" +
- "zkClient=" + getZkClient() +
- ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
- ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
- ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
- '}';
- }
-}
\ No newline at end of file
+ private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
+
+ /**
+ * remove dead server by host
+ *
+ * @param host host
+ * @param serverType serverType
+ */
+ public void removeDeadServerByHost(String host, String serverType) {
+ List deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
+ for (String serverPath : deadServers) {
+ if (serverPath.startsWith(serverType + UNDERLINE + host)) {
+ String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
+ super.remove(server);
+ logger.info("{} server {} deleted from zk dead server path success", serverType, host);
+ }
+ }
+ }
+
+ /**
+ * opType(add): if find dead server , then add to zk deadServerPath
+ * opType(delete): delete path from zk
+ *
+ * @param zNode node path
+ * @param zkNodeType master or worker
+ * @param opType delete or add
+ */
+ public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) {
+ String host = getHostByEventDataPath(zNode);
+ String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
+
+ //check server restart, if restart , dead server path in zk should be delete
+ if (opType.equals(DELETE_ZK_OP)) {
+ removeDeadServerByHost(host, type);
+
+ } else if (opType.equals(ADD_ZK_OP)) {
+ String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
+ if (!super.isExisted(deadServerPath)) {
+ //add dead server info to zk dead server path : /dead-servers/
+
+ super.persist(deadServerPath, (type + UNDERLINE + host));
+
+ logger.info("{} server dead , and {} added to zk dead server path success",
+ zkNodeType, zNode);
+ }
+ }
+
+ }
+
+ /**
+ * get active master num
+ *
+ * @return active master number
+ */
+ public int getActiveMasterNum() {
+ List childrenList = new ArrayList<>();
+ try {
+ // read master node parent path from conf
+ if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) {
+ childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
+ }
+ } catch (Exception e) {
+ logger.error("getActiveMasterNum error", e);
+ }
+ return childrenList.size();
+ }
+
+ /**
+ * @return zookeeper quorum
+ */
+ public String getZookeeperQuorum() {
+ return getZookeeperConfig().getServerList();
+ }
+
+ /**
+ * get server list.
+ *
+ * @param zkNodeType zookeeper node type
+ * @return server list
+ */
+ public List getServersList(ZKNodeType zkNodeType) {
+ Map masterMap = getServerMaps(zkNodeType);
+ String parentPath = getZNodeParentPath(zkNodeType);
+
+ List masterServers = new ArrayList<>();
+ for (Map.Entry entry : masterMap.entrySet()) {
+ Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
+ if (masterServer == null) {
+ continue;
+ }
+ String key = entry.getKey();
+ masterServer.setZkDirectory(parentPath + "/" + key);
+ //set host and port
+ String[] hostAndPort = key.split(COLON);
+ String[] hosts = hostAndPort[0].split(DIVISION_STRING);
+ // fetch the last one
+ masterServer.setHost(hosts[hosts.length - 1]);
+ masterServer.setPort(Integer.parseInt(hostAndPort[1]));
+ masterServers.add(masterServer);
+ }
+ return masterServers;
+ }
+
+ /**
+ * get master server list map.
+ *
+ * @param zkNodeType zookeeper node type
+ * @return result : {host : resource info}
+ */
+ public Map getServerMaps(ZKNodeType zkNodeType) {
+
+ Map masterMap = new HashMap<>();
+ try {
+ String path = getZNodeParentPath(zkNodeType);
+ List serverList = super.getChildrenKeys(path);
+ if (zkNodeType == ZKNodeType.WORKER) {
+ List workerList = new ArrayList<>();
+ for (String group : serverList) {
+ List groupServers = super.getChildrenKeys(path + Constants.SLASH + group);
+ for (String groupServer : groupServers) {
+ workerList.add(group + Constants.SLASH + groupServer);
+ }
+ }
+ serverList = workerList;
+ }
+ for (String server : serverList) {
+ masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server));
+ }
+ } catch (Exception e) {
+ logger.error("get server list failed", e);
+ }
+
+ return masterMap;
+ }
+
+ /**
+ * check the zookeeper node already exists
+ *
+ * @param host host
+ * @param zkNodeType zookeeper node type
+ * @return true if exists
+ */
+ public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
+ String path = getZNodeParentPath(zkNodeType);
+ if (StringUtils.isEmpty(path)) {
+ logger.error("check zk node exists error, host:{}, zk node type:{}",
+ host, zkNodeType);
+ return false;
+ }
+ Map serverMaps = getServerMaps(zkNodeType);
+ for (String hostKey : serverMaps.keySet()) {
+ if (hostKey.contains(host)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @return get worker node parent path
+ */
+ protected String getWorkerZNodeParentPath() {
+ return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
+ }
+
+ /**
+ * @return get master node parent path
+ */
+ protected String getMasterZNodeParentPath() {
+ return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
+ }
+
+ /**
+ * @return get master lock path
+ */
+ public String getMasterLockPath() {
+ return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
+ }
+
+ /**
+ * @param zkNodeType zookeeper node type
+ * @return get zookeeper node parent path
+ */
+ public String getZNodeParentPath(ZKNodeType zkNodeType) {
+ String path = "";
+ switch (zkNodeType) {
+ case MASTER:
+ return getMasterZNodeParentPath();
+ case WORKER:
+ return getWorkerZNodeParentPath();
+ case DEAD_SERVER:
+ return getDeadZNodeParentPath();
+ default:
+ break;
+ }
+ return path;
+ }
+
+ /**
+ * @return get dead server node parent path
+ */
+ protected String getDeadZNodeParentPath() {
+ return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+ }
+
+ /**
+ * @return get master start up lock path
+ */
+ public String getMasterStartUpLockPath() {
+ return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
+ }
+
+ /**
+ * @return get master failover lock path
+ */
+ public String getMasterFailoverLockPath() {
+ return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+ }
+
+ /**
+ * @return get worker failover lock path
+ */
+ public String getWorkerFailoverLockPath() {
+ return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+ }
+
+ /**
+ * release mutex
+ *
+ * @param mutex mutex
+ */
+ public void releaseMutex(InterProcessMutex mutex) {
+ if (mutex != null) {
+ try {
+ mutex.release();
+ } catch (Exception e) {
+ if ("instance must be started before calling this method".equals(e.getMessage())) {
+ logger.warn("lock release");
+ } else {
+ logger.error("lock release failed", e);
+ }
+
+ }
+ }
+ }
+
+ /**
+ * init system znode
+ */
+ protected void initSystemZNode() {
+ try {
+ persist(getMasterZNodeParentPath(), "");
+ persist(getWorkerZNodeParentPath(), "");
+ persist(getDeadZNodeParentPath(), "");
+
+ logger.info("initialize server nodes success.");
+ } catch (Exception e) {
+ logger.error("init system znode failed", e);
+ }
+ }
+
+ /**
+ * get host ip, string format: masterParentPath/ip
+ *
+ * @param path path
+ * @return host ip, string format: masterParentPath/ip
+ */
+ protected String getHostByEventDataPath(String path) {
+ if (StringUtils.isEmpty(path)) {
+ logger.error("empty path!");
+ return "";
+ }
+ String[] pathArray = path.split(SINGLE_SLASH);
+ if (pathArray.length < 1) {
+ logger.error("parse ip error: {}", path);
+ return "";
+ }
+ return pathArray[pathArray.length - 1];
+
+ }
+
+ @Override
+ public String toString() {
+ return "AbstractZKClient{"
+ + "zkClient=" + getZkClient()
+ + ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\''
+ + ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\''
+ + ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\''
+ + '}';
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
index 5a04c5a23b..e25a22f031 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
@@ -14,9 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
-import org.apache.commons.lang.StringUtils;
+import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
@@ -25,18 +30,16 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
-
/**
* Shared Curator zookeeper client
*/
@@ -49,7 +52,6 @@ public class CuratorZookeeperClient implements InitializingBean {
private CuratorFramework zkClient;
-
@Override
public void afterPropertiesSet() throws Exception {
this.zkClient = buildClient();
@@ -91,7 +93,7 @@ public class CuratorZookeeperClient implements InitializingBean {
zkClient.blockUntilConnected(30, TimeUnit.SECONDS);
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ServiceException(ex);
}
return zkClient;
}
@@ -123,4 +125,4 @@ public class CuratorZookeeperClient implements InitializingBean {
public CuratorFramework getZkClient() {
return zkClient;
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
index c7a53ebdc0..7ac23a3c4d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
@@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* just speed experience version
@@ -51,10 +54,10 @@ public class ZKServer {
ZKServer zkServer;
if (args.length == 0) {
zkServer = new ZKServer();
- } else if (args.length == 1){
- zkServer = new ZKServer(Integer.valueOf(args[0]), "");
+ } else if (args.length == 1) {
+ zkServer = new ZKServer(Integer.parseInt(args[0]), "");
} else {
- zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]);
+ zkServer = new ZKServer(Integer.parseInt(args[0]), args[1]);
}
zkServer.registerHook();
zkServer.start();
@@ -73,7 +76,7 @@ public class ZKServer {
}
private void registerHook() {
- /**
+ /*
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
@@ -90,7 +93,7 @@ public class ZKServer {
}
}
- public boolean isStarted(){
+ public boolean isStarted() {
return isStarted.get();
}
@@ -119,19 +122,19 @@ public class ZKServer {
if (file.exists()) {
logger.warn("The path of zk server exists");
}
- logger.info("zk server starting, data dir path:{}" , zkDataDir);
- startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60");
+ logger.info("zk server starting, data dir path:{}", zkDataDir);
+ startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME, "60");
}
/**
* Starts a local Zk instance
*
- * @param port The port to listen on
+ * @param port The port to listen on
* @param dataDirPath The path for the Zk data directory
- * @param tickTime zk tick time
- * @param maxClientCnxns zk max client connections
+ * @param tickTime zk tick time
+ * @param maxClientCnxns zk max client connections
*/
- private void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) {
+ private void startLocalZkServer(final int port, final String dataDirPath, final int tickTime, String maxClientCnxns) {
if (isStarted.compareAndSet(false, true)) {
zooKeeperServerMain = new PublicZooKeeperServerMain();
logger.info("Zookeeper data path : {} ", dataDirPath);
@@ -144,8 +147,7 @@ public class ZKServer {
zooKeeperServerMain.initializeAndRun(args);
} catch (QuorumPeerConfig.ConfigException | IOException e) {
- logger.warn("Caught exception while starting ZK", e);
- throw new RuntimeException(e);
+ throw new ServiceException("Caught exception while starting ZK", e);
}
}
}
@@ -159,7 +161,7 @@ public class ZKServer {
logger.info("zk server stopped");
} catch (Exception e) {
- logger.error("Failed to stop ZK ",e);
+ logger.error("Failed to stop ZK ", e);
}
}
@@ -180,8 +182,7 @@ public class ZKServer {
org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
}
} catch (Exception e) {
- logger.warn("Caught exception while stopping ZK server", e);
- throw new RuntimeException(e);
+ throw new ServiceException("Caught exception while starting ZK", e);
}
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 6dfce79a3a..88c339b045 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -14,21 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+import java.nio.charset.StandardCharsets;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import java.nio.charset.StandardCharsets;
-
@Component
public class ZookeeperCachedOperator extends ZookeeperOperator {
@@ -36,6 +39,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
private TreeCache treeCache;
+
/**
* register a unified listener of /${dsRoot},
*/
@@ -59,14 +63,16 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
treeCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
- throw new RuntimeException(e);
+ throw new ServiceException(e);
}
}
//for sub class
- protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){}
+ protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
+ // Used by sub class
+ }
- public String getFromCache(final String cachePath, final String key) {
+ public String getFromCache(final String key) {
ChildData resultInCache = treeCache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8);
@@ -74,11 +80,11 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
return null;
}
- public TreeCache getTreeCache(final String cachePath) {
+ public TreeCache getTreeCache() {
return treeCache;
}
- public void addListener(TreeCacheListener listener){
+ public void addListener(TreeCacheListener listener) {
this.treeCache.getListenable().addListener(listener);
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index e7b049f8bf..8a219837b7 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -14,13 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
-import org.apache.commons.lang.StringUtils;
+import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
@@ -29,18 +33,16 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
-
/**
* zk base operator
*/
@@ -64,19 +66,23 @@ public class ZookeeperOperator implements InitializingBean {
/**
* this method is for sub class,
*/
- protected void registerListener(){}
+ protected void registerListener() {
+ // Used by sub class
+ }
- protected void treeCacheStart(){}
+ protected void treeCacheStart() {
+ // Used by sub class
+ }
public void initStateLister() {
checkNotNull(zkClient);
zkClient.getConnectionStateListenable().addListener((client, newState) -> {
- if(newState == ConnectionState.LOST){
+ if (newState == ConnectionState.LOST) {
logger.error("connection lost from zookeeper");
- } else if(newState == ConnectionState.RECONNECTED){
+ } else if (newState == ConnectionState.RECONNECTED) {
logger.info("reconnected to zookeeper");
- } else if(newState == ConnectionState.SUSPENDED){
+ } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("connection SUSPENDED to zookeeper");
}
});
@@ -85,7 +91,8 @@ public class ZookeeperOperator implements InitializingBean {
private CuratorFramework buildClient() {
logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList());
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null")))
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),
+ "zookeeper quorum can't be null")))
.retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
//these has default value
@@ -114,7 +121,7 @@ public class ZookeeperOperator implements InitializingBean {
try {
zkClient.blockUntilConnected();
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ServiceException(ex);
}
return zkClient;
}
@@ -138,12 +145,12 @@ public class ZookeeperOperator implements InitializingBean {
throw new IllegalStateException(ex);
} catch (Exception ex) {
logger.error("getChildrenKeys key : {}", key, ex);
- throw new RuntimeException(ex);
+ throw new ServiceException(ex);
}
}
- public boolean hasChildren(final String key){
- Stat stat ;
+ public boolean hasChildren(final String key) {
+ Stat stat;
try {
stat = zkClient.checkExists().forPath(key);
return stat.getNumChildren() >= 1;
@@ -241,4 +248,4 @@ public class ZookeeperOperator implements InitializingBean {
public void close() {
CloseableUtils.closeQuietly(zkClient);
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index db83e71725..551c9bb09c 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -25,9 +25,12 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
@@ -335,4 +338,108 @@ public class ProcessServiceTest {
processService.recurseFindSubProcessId(parentId, ids);
}
+
+ @Test
+ public void testChangeJson() {
+
+ ProcessData oldProcessData = new ProcessData();
+ ConditionsParameters conditionsParameters = new ConditionsParameters();
+ ArrayList tasks = new ArrayList<>();
+ TaskNode taskNode = new TaskNode();
+ TaskNode taskNode11 = new TaskNode();
+ TaskNode taskNode111 = new TaskNode();
+ ArrayList successNode = new ArrayList<>();
+ ArrayList faildNode = new ArrayList<>();
+
+ taskNode.setName("bbb");
+ taskNode.setType("SHELL");
+ taskNode.setId("222");
+
+ taskNode11.setName("vvv");
+ taskNode11.setType("CONDITIONS");
+ taskNode11.setId("444");
+ successNode.add("bbb");
+ faildNode.add("ccc");
+
+ taskNode111.setName("ccc");
+ taskNode111.setType("SHELL");
+ taskNode111.setId("333");
+
+ conditionsParameters.setSuccessNode(successNode);
+ conditionsParameters.setFailedNode(faildNode);
+ taskNode11.setConditionResult(conditionsParameters.getConditionResult());
+ tasks.add(taskNode);
+ tasks.add(taskNode11);
+ tasks.add(taskNode111);
+ oldProcessData.setTasks(tasks);
+
+ ProcessData newProcessData = new ProcessData();
+ ConditionsParameters conditionsParameters2 = new ConditionsParameters();
+ TaskNode taskNode2 = new TaskNode();
+ TaskNode taskNode22 = new TaskNode();
+ TaskNode taskNode222 = new TaskNode();
+ ArrayList tasks2 = new ArrayList<>();
+ ArrayList successNode2 = new ArrayList<>();
+ ArrayList faildNode2 = new ArrayList<>();
+
+ taskNode2.setName("bbbchange");
+ taskNode2.setType("SHELL");
+ taskNode2.setId("222");
+
+ taskNode22.setName("vv");
+ taskNode22.setType("CONDITIONS");
+ taskNode22.setId("444");
+ successNode2.add("bbb");
+ faildNode2.add("ccc");
+
+ taskNode222.setName("ccc");
+ taskNode222.setType("SHELL");
+ taskNode222.setId("333");
+
+ conditionsParameters2.setSuccessNode(successNode2);
+ conditionsParameters2.setFailedNode(faildNode2);
+ taskNode22.setConditionResult(conditionsParameters2.getConditionResult());
+ tasks2.add(taskNode2);
+ tasks2.add(taskNode22);
+ tasks2.add(taskNode222);
+
+ newProcessData.setTasks(tasks2);
+
+ ProcessData exceptProcessData = new ProcessData();
+ ConditionsParameters conditionsParameters3 = new ConditionsParameters();
+ TaskNode taskNode3 = new TaskNode();
+ TaskNode taskNode33 = new TaskNode();
+ TaskNode taskNode333 = new TaskNode();
+ ArrayList tasks3 = new ArrayList<>();
+ ArrayList successNode3 = new ArrayList<>();
+ ArrayList faildNode3 = new ArrayList<>();
+
+ taskNode3.setName("bbbchange");
+ taskNode3.setType("SHELL");
+ taskNode3.setId("222");
+
+ taskNode33.setName("vv");
+ taskNode33.setType("CONDITIONS");
+ taskNode33.setId("444");
+ successNode3.add("bbbchange");
+ faildNode3.add("ccc");
+
+ taskNode333.setName("ccc");
+ taskNode333.setType("SHELL");
+ taskNode333.setId("333");
+
+ conditionsParameters3.setSuccessNode(successNode3);
+ conditionsParameters3.setFailedNode(faildNode3);
+ taskNode33.setConditionResult(conditionsParameters3.getConditionResult());
+ tasks3.add(taskNode3);
+ tasks3.add(taskNode33);
+ tasks3.add(taskNode333);
+ exceptProcessData.setTasks(tasks3);
+
+ String expect = JSONUtils.toJsonString(exceptProcessData);
+ String oldJson = JSONUtils.toJsonString(oldProcessData);
+
+ Assert.assertEquals(expect, processService.changeJson(newProcessData,oldJson));
+
+ }
}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertInfo.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertInfo.java
index c91428ce12..d6e54561e7 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertInfo.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertInfo.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.spi.alert;
+import java.util.Map;
+
/**
* AlertInfo
*/
@@ -25,18 +27,18 @@ public class AlertInfo {
/**
* all params this plugin need is in alertProps
*/
- private String alertParams;
+ private Map alertParams;
/**
* the alert content
*/
private AlertData alertData;
- public String getAlertParams() {
+ public Map getAlertParams() {
return alertParams;
}
- public void setAlertParams(String alertParams) {
+ public void setAlertParams(Map alertParams) {
this.alertParams = alertParams;
}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java
index a327d09403..6ce5425f7f 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java
@@ -38,4 +38,12 @@ public class AlertResult {
public void setMessage(String message) {
this.message = message;
}
+
+ public AlertResult(String status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ public AlertResult() {
+ }
}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/base/PluginParams.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/base/PluginParams.java
index 1fdf0e0792..34d60a26c4 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/base/PluginParams.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/base/PluginParams.java
@@ -165,6 +165,10 @@ public class PluginParams {
public List getValidateList() {
return validateList;
}
+
+ public void setValue(Object value) {
+ this.value = value;
+ }
}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java
index 6542ef86a4..89d6e50ea5 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java
@@ -58,7 +58,7 @@ public class JSONUtils {
/**
* json representation of object
*
- * @param object object
+ * @param object object
* @param feature feature
* @return object to json string
*/
@@ -81,9 +81,9 @@ public class JSONUtils {
* the fields of the specified object are generics, just the object itself should not be a
* generic type.
*
- * @param json the string from which the object is to be deserialized
+ * @param json the string from which the object is to be deserialized
* @param clazz the class of T
- * @param T
+ * @param T
* @return an object of type T from the string
* classOfT
*/
@@ -103,9 +103,9 @@ public class JSONUtils {
/**
* json to list
*
- * @param json json string
+ * @param json json string
* @param clazz class
- * @param T
+ * @param T
* @return list
*/
public static List toList(String json, Class clazz) {
@@ -153,4 +153,4 @@ public class JSONUtils {
throw new RuntimeException("Json deserialization exception.", e);
}
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-ui/package.json b/dolphinscheduler-ui/package.json
index 74558c856f..b325561aad 100644
--- a/dolphinscheduler-ui/package.json
+++ b/dolphinscheduler-ui/package.json
@@ -3,6 +3,8 @@
"version": "1.0.0",
"description": "A vue.js project",
"author": "DolphinScheduler",
+ "repository": "https://github.com/apache/incubator-dolphinscheduler",
+ "license": "Apache-2.0",
"scripts": {
"build": "npm run clean && cross-env NODE_ENV=production webpack --config ./build/webpack.config.prod.js",
"dev": "cross-env NODE_ENV=development webpack-dev-server --config ./build/webpack.config.dev.js",
@@ -39,6 +41,7 @@
"vuex-router-sync": "^5.0.0"
},
"devDependencies": {
+ "acorn": "^7.4.1",
"autoprefixer": "^9.1.0",
"babel-core": "^6.25.0",
"babel-helper-vue-jsx-merge-props": "^2.0.2",
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
index a61d289d3d..c56d73f443 100755
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
@@ -34,30 +34,30 @@
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue
index abadc2c36b..fd8a2b8955 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue
@@ -16,26 +16,21 @@
*/
-
-
-
-
+
+
+
-
-
-
+
+
-
-
-
+
+
-
-
-
+
+
-
-
-
+
+
@@ -55,6 +50,7 @@
+
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/dependent.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/dependent.vue
index 978142891e..220ec99f46 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/dependent.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/dependent.vue
@@ -97,7 +97,6 @@
$('body').find('.tooltip.fade.top.in').remove()
},
_onDeleteAll (i) {
- this.dependTaskList[this.index].dependItemList.splice(i, 1)
this.dependTaskList.map((item, i) => {
if (item.dependItemList.length === 0) {
this.dependTaskList.splice(i, 1)
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
index 706edcda6a..5f58d29021 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
@@ -30,16 +30,7 @@
{{$t('SQL Type')}}
-
-
-
-
-
- {{$t('TableMode')}}
- {{$t('Attachment')}}
-
+
@@ -51,10 +42,17 @@
type="input"
size="small"
v-model="title"
+ :disabled="isDetails"
:placeholder="$t('Please enter the title of email')">
+
+ *{{$t('Alarm group')}}
+
+
+
+
{{$t('SQL Parameter')}}
@@ -142,6 +140,7 @@
import mDatasource from './_source/datasource'
import mLocalParams from './_source/localParams'
import mStatementList from './_source/statementList'
+ import mWarningGroups from './_source/warningGroups'
import disabledState from '@/module/mixin/disabledState'
import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror'
@@ -167,8 +166,6 @@
sqlType: '0',
// Email title
title: '',
- // Form/attachment
- showType: ['TABLE'],
// Sql parameter
connParams: '',
// Pre statements
@@ -176,7 +173,8 @@
// Post statements
postStatements: [],
item: '',
- scriptBoxDialog: false
+ scriptBoxDialog: false,
+ groupId: null
}
},
mixins: [disabledState],
@@ -197,9 +195,6 @@
*/
_onSqlType (a) {
this.sqlType = a
- if (a === 0) {
- this.showType = ['TABLE']
- }
},
/**
* return udfs
@@ -245,16 +240,12 @@
if (!this.$refs.refDs._verifDatasource()) {
return false
}
- if (this.sqlType === 0 && !this.showType.length) {
- this.$message.warning(`${i18n.$t('One form or attachment must be selected')}`)
- return false
- }
- if (this.sqlType === 0 && !this.title) {
+ if (this.sqlType === '0' && !this.title) {
this.$message.warning(`${i18n.$t('Mail subject required')}`)
return false
}
- if (this.sqlType === 0 && !this.receivers.length) {
- this.$message.warning(`${i18n.$t('Recipient required')}`)
+ if (this.sqlType === '0' && (this.groupId === '' || this.groupId === null)) {
+ this.$message.warning(`${i18n.$t('Alarm group required')}`)
return false
}
// udfs Subcomponent verification Verification only if the data type is HIVE
@@ -287,18 +278,7 @@
udfs: this.udfs,
sqlType: this.sqlType,
title: this.title,
- showType: (() => {
- /**
- * Special processing return order TABLE,ATTACHMENT
- * Handling checkout sequence
- */
- let showType = this.showType
- if (showType.length === 2 && showType[0] === 'ATTACHMENT') {
- return [showType[1], showType[0]].join(',')
- } else {
- return showType.join(',')
- }
- })(),
+ groupId: this.groupId,
localParams: this.localParams,
connParams: this.connParams,
preStatements: this.preStatements,
@@ -347,14 +327,7 @@
udfs: this.udfs,
sqlType: this.sqlType,
title: this.title,
- showType: (() => {
- let showType = this.showType
- if (showType.length === 2 && showType[0] === 'ATTACHMENT') {
- return [showType[1], showType[0]].join(',')
- } else {
- return showType.join(',')
- }
- })(),
+ groupId: this.groupId,
localParams: this.localParams,
connParams: this.connParams,
preStatements: this.preStatements,
@@ -372,11 +345,9 @@
watch: {
// Listening to sqlType
sqlType (val) {
- if (val === 0) {
- this.showType = []
- }
if (val !== 0) {
this.title = ''
+ this.groupId = null
}
},
// Listening data source
@@ -403,14 +374,10 @@
this.sqlType = o.params.sqlType
this.connParams = o.params.connParams || ''
this.localParams = o.params.localParams || []
- if (o.params.showType === '') {
- this.showType = []
- } else {
- this.showType = o.params.showType.split(',') || []
- }
this.preStatements = o.params.preStatements || []
this.postStatements = o.params.postStatements || []
this.title = o.params.title || ''
+ this.groupId = o.params.groupId
}
},
mounted () {
@@ -436,14 +403,7 @@
udfs: this.udfs,
sqlType: this.sqlType,
title: this.title,
- showType: (() => {
- let showType = this.showType
- if (showType.length === 2 && showType[0] === 'ATTACHMENT') {
- return [showType[1], showType[0]].join(',')
- } else {
- return showType.join(',')
- }
- })(),
+ groupId: this.groupId,
localParams: this.localParams,
connParams: this.connParams,
preStatements: this.preStatements,
@@ -451,6 +411,6 @@
}
}
},
- components: { mListBox, mDatasource, mLocalParams, mUdfs, mSqlType, mStatementList, mScriptBox }
+ components: { mListBox, mDatasource, mLocalParams, mUdfs, mSqlType, mStatementList, mScriptBox, mWarningGroups }
}
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js
index 0037af68b6..226324d256 100755
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js
@@ -573,7 +573,7 @@ JSP.prototype.copyNodes = function ($id) {
JSP.prototype.handleEventScreen = function ({ item, is }) {
let screenOpen = true
if (is) {
- item.icon = 'el-icon-minus'
+ item.icon = 'el-icon-aim'
screenOpen = true
} else {
item.icon = 'el-icon-full-screen'
@@ -627,6 +627,7 @@ JSP.prototype.saveStore = function () {
tasks.push(tasksParam)
}
})
+
if (store.state.dag.connects.length === this.JspInstance.getConnections().length) {
_.map(store.state.dag.connects, u => {
connects.push({
@@ -658,6 +659,14 @@ JSP.prototype.saveStore = function () {
label: v._jsPlumb.overlays.label.canvas.innerText
})
})
+ } else if (store.state.dag.connects.length > this.JspInstance.getConnections().length) {
+ _.map(this.JspInstance.getConnections(), v => {
+ connects.push({
+ endPointSourceId: v.sourceId,
+ endPointTargetId: v.targetId,
+ label: v._jsPlumb.overlays.label.canvas.innerText
+ })
+ })
}
_.map(tasksAll(), v => {
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue
index bcc91524ff..de42199a1a 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue
@@ -26,7 +26,7 @@
{{$t('Process priority')}}:{{startupParam.processInstancePriority}}
{{$t('Worker group')}}:{{startupParam.workerGroup}}
{{$t('Notification strategy')}}:{{_rtWarningType(startupParam.warningType)}}
- {{$t('Notification group')}}:{{_rtNotifyGroupName(startupParam.warningGroupId)}}
+ {{$t('Alarm group')}}:{{_rtNotifyGroupName(startupParam.warningGroupId)}}
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/definitionDetails.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/definitionDetails.vue
index d9acb38453..65510bf69b 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/definitionDetails.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/definitionDetails.vue
@@ -42,7 +42,7 @@
methods: {
...mapMutations('dag', ['resetParams', 'setIsDetails']),
...mapActions('dag', ['getProcessList', 'getProjectList', 'getResourcesList', 'getProcessDetails', 'getResourcesListJar']),
- ...mapActions('security', ['getTenantList', 'getWorkerGroupsAll']),
+ ...mapActions('security', ['getTenantList', 'getWorkerGroupsAll', 'getAlarmGroupsAll']),
/**
* init
*/
@@ -64,6 +64,8 @@
this.getResourcesListJar(),
// get worker group list
this.getWorkerGroupsAll(),
+ // get alarm group list
+ this.getAlarmGroupsAll(),
this.getTenantList()
]).then((data) => {
let item = data[0]
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/index.vue
index d232e2dd86..81d129a47e 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/index.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/index.vue
@@ -41,7 +41,7 @@
methods: {
...mapMutations('dag', ['resetParams']),
...mapActions('dag', ['getProcessList', 'getProjectList', 'getResourcesList', 'getResourcesListJar', 'getResourcesListJar']),
- ...mapActions('security', ['getTenantList', 'getWorkerGroupsAll']),
+ ...mapActions('security', ['getTenantList', 'getWorkerGroupsAll', 'getAlarmGroupsAll']),
/**
* init
*/
@@ -63,6 +63,8 @@
this.getResourcesListJar(),
// get worker group list
this.getWorkerGroupsAll(),
+ // get alarm group list
+ this.getAlarmGroupsAll(),
this.getTenantList()
]).then((data) => {
this.isLoading = false
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue
index 0727d12eed..f9561bd52c 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue
@@ -44,7 +44,7 @@
methods: {
...mapMutations('dag', ['setIsDetails', 'resetParams']),
...mapActions('dag', ['getProcessList', 'getProjectList', 'getResourcesList', 'getInstancedetail', 'getResourcesListJar']),
- ...mapActions('security', ['getTenantList', 'getWorkerGroupsAll']),
+ ...mapActions('security', ['getTenantList', 'getWorkerGroupsAll', 'getAlarmGroupsAll']),
/**
* init
*/
@@ -66,6 +66,8 @@
this.getResourcesListJar(),
// get worker group list
this.getWorkerGroupsAll(),
+ // get alarm group list
+ this.getAlarmGroupsAll(),
this.getTenantList()
]).then((data) => {
let item = data[0]
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue
index 04683ebe4e..545c6c0b70 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue
@@ -22,6 +22,7 @@
{{$t('Create Datasource')}}
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue
index ca4f6ad728..088063b01e 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue
@@ -15,13 +15,9 @@
* limitations under the License.
*/
-
-
- {{$t('zkDirectory')}}
-
-
+
-
+
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperList.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperList.vue
index df987908f9..46cde99551 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperList.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperList.vue
@@ -24,7 +24,7 @@
-
+
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
index 8c22adce8f..91bae2b41f 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
@@ -94,6 +94,7 @@
methods: {
...mapActions('monitor', ['getWorkerData']),
_showZkDirectories (item) {
+ this.zkDirectories = []
item.zkDirectories.forEach(zkDirectory => {
this.zkDirectories.push({
zkDirectory: zkDirectory
@@ -104,7 +105,6 @@
},
watch: {},
created () {
-
},
mounted () {
this.isLoading = true
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/processInstance.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/processInstance.vue
index 37566e517e..2cfc5afcbe 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/processInstance.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/processInstance.vue
@@ -18,7 +18,7 @@
-
+
{{ scope.row.name }}
-
- {{scope.row.name}}
+
+ {{scope.row.name}}
@@ -84,7 +84,6 @@
-
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue
index e2ab711528..d6aeaf9cd9 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue
@@ -78,15 +78,16 @@
- {{$t('Notification group')}}
+ {{$t('Alarm group')}}
-
+
@@ -119,8 +120,7 @@
ref="refLocalParams"
@on-local-params="_onLocalParams"
:udp-list="udpList"
- :hide="false"
- :isStartProcess="true">
+ :hide="false">
@@ -263,7 +263,6 @@
})
},
_getGlobalParams () {
- this.setIsDetails(true)
this.store.dispatch('dag/getProcessDetails', this.startData.id).then(res => {
this.definitionGlobalParams = _.cloneDeep(this.store.state.dag.globalParams)
this.udpList = _.cloneDeep(this.store.state.dag.globalParams)
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue
index 526abfb16e..8d19505482 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue
@@ -35,7 +35,7 @@
-
{{$t('Execute time')}}
+
{{$t('Execute time')}}
{{$t('Timing')}}
@@ -115,15 +115,16 @@
- {{$t('Notification group')}}
+ {{$t('Alarm group')}}
-
+
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/timing/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/timing/index.vue
index 0540a32851..d38434510f 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/timing/index.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/timing/index.vue
@@ -18,7 +18,9 @@
-
+
+
+
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue
index 9fd45418f9..9ebfcc6288 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/index/_source/taskStatusCount.vue
@@ -139,9 +139,3 @@
components: { mNoData }
}
-
-
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
index 980a755120..01b7574689 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
@@ -25,7 +25,7 @@
{{ scope.row.name }}
- {{scope.row.name}}
+ {{ scope.row.name }}
@@ -78,13 +78,12 @@
-
+
-
+
-
-
+
@@ -438,22 +437,9 @@
})
this._arrDelChange()
},
- // _arrDelChange (v) {
- // let arr = []
- // this.list.forEach((item)=>{
- // if (item.isCheck) {
- // arr.push(item.id)
- // }
- // })
- // this.strDelete = _.join(arr, ',')
- // if (v === false) {
- // this.checkAll = false
- // }
- // },
_arrDelChange (v) {
let arr = []
arr = _.map(v, 'id')
- console.log(arr)
this.strDelete = _.join(arr, ',')
},
_batchDelete () {
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue
index 162df69d7e..c07eacbae9 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue
@@ -18,7 +18,7 @@
-
+
@@ -95,6 +95,7 @@
* Query
*/
_onQuery (o) {
+ this.searchParams.pageNo = 1
this.searchParams = _.assign(this.searchParams, o)
setUrlParams(this.searchParams)
this._debounceGET()
@@ -222,4 +223,9 @@
}
}
}
+ @media screen and (max-width: 1246px) {
+ .searchNav {
+ margin-bottom: 30px;
+ }
+ }
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/createProject.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/createProject.vue
index db37d8b90d..8e9c5afa32 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/createProject.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/createProject.vue
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-