From 362aa2e8e7d29c9a6549d86458bca09a0619e768 Mon Sep 17 00:00:00 2001
From: Zed
Date: Thu, 18 Apr 2019 16:04:21 +0800
Subject: [PATCH] =?UTF-8?q?DEC-7376=20=E5=AE=9A=E6=97=B6=E8=B0=83=E5=BA=A6?=
=?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=A1=A8=E4=B8=AD=E5=AD=98=E5=9C=A8=E8=84=8F?=
=?UTF-8?q?=E6=95=B0=E6=8D=AE=E7=9A=84=E6=97=B6=E5=80=99=EF=BC=8C=E4=B8=8D?=
=?UTF-8?q?=E8=83=BD=E5=BD=B1=E5=93=8D=E6=95=B4=E4=B8=AAscheduler?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../impl/jdbcjobstore/JobStoreSupport.java | 1611 ++++++++---------
1 file changed, 805 insertions(+), 806 deletions(-)
diff --git a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java
index 7bbef3c4d..ab378483a 100644
--- a/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java
+++ b/fine-quartz/src/com/fr/third/v2/org/quartz/impl/jdbcjobstore/JobStoreSupport.java
@@ -1,54 +1,37 @@
-/*
- * Copyright 2001-2009 Terracotta, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy
- * of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
+/*
+ * Copyright 2001-2009 Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
* under the License.
- *
+ *
*/
package com.fr.third.v2.org.quartz.impl.jdbcjobstore;
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Proxy;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import com.fr.third.v2.org.quartz.Calendar;
import com.fr.third.v2.org.quartz.Job;
+import com.fr.third.v2.org.quartz.JobDataMap;
import com.fr.third.v2.org.quartz.JobDetail;
+import com.fr.third.v2.org.quartz.JobKey;
+import com.fr.third.v2.org.quartz.JobPersistenceException;
import com.fr.third.v2.org.quartz.ObjectAlreadyExistsException;
import com.fr.third.v2.org.quartz.Scheduler;
import com.fr.third.v2.org.quartz.SchedulerConfigException;
-import com.fr.third.v2.org.quartz.TriggerKey;
-import com.fr.third.v2.org.quartz.spi.OperableTrigger;
-import com.fr.third.v2.org.quartz.spi.ThreadExecutor;
-import com.fr.third.v2.org.quartz.spi.TriggerFiredResult;
-import com.fr.third.v2.org.quartz.JobDataMap;
-import com.fr.third.v2.org.quartz.JobKey;
-import com.fr.third.v2.org.quartz.JobPersistenceException;
import com.fr.third.v2.org.quartz.SchedulerException;
import com.fr.third.v2.org.quartz.SimpleTrigger;
import com.fr.third.v2.org.quartz.Trigger;
import com.fr.third.v2.org.quartz.Trigger.CompletedExecutionInstruction;
import com.fr.third.v2.org.quartz.Trigger.TriggerState;
+import com.fr.third.v2.org.quartz.TriggerKey;
import com.fr.third.v2.org.quartz.impl.DefaultThreadExecutor;
import com.fr.third.v2.org.quartz.impl.matchers.GroupMatcher;
import com.fr.third.v2.org.quartz.impl.matchers.StringMatcher;
@@ -56,18 +39,35 @@ import com.fr.third.v2.org.quartz.impl.matchers.StringMatcher.StringOperatorName
import com.fr.third.v2.org.quartz.impl.triggers.SimpleTriggerImpl;
import com.fr.third.v2.org.quartz.spi.ClassLoadHelper;
import com.fr.third.v2.org.quartz.spi.JobStore;
+import com.fr.third.v2.org.quartz.spi.OperableTrigger;
import com.fr.third.v2.org.quartz.spi.SchedulerSignaler;
+import com.fr.third.v2.org.quartz.spi.ThreadExecutor;
import com.fr.third.v2.org.quartz.spi.TriggerFiredBundle;
+import com.fr.third.v2.org.quartz.spi.TriggerFiredResult;
import com.fr.third.v2.org.quartz.utils.DBConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
*
* Contains base functionality for JDBC-based JobStore implementations.
*
- *
+ *
* @author Jeffrey Wescott
* @author James House
*/
@@ -75,9 +75,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- *
+ *
* Constants.
- *
+ *
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
@@ -87,9 +87,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- *
+ *
* Data members.
- *
+ *
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
@@ -102,11 +102,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
protected String instanceId;
protected String instanceName;
-
+
protected String delegateClassName;
protected String delegateInitString;
-
+
protected Class extends DriverDelegate> delegateClass = StdJDBCDelegate.class;
protected HashMap calendarCache = new HashMap();
@@ -120,7 +120,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
private boolean isClustered = false;
private boolean useDBLocks = false;
-
+
private boolean lockOnInsert = true;
private Semaphore lockHandler = null; // set in initialize() method...
@@ -138,32 +138,32 @@ public abstract class JobStoreSupport implements JobStore, Constants {
private SchedulerSignaler schedSignaler;
protected int maxToRecoverAtATime = 20;
-
+
private boolean setTxIsolationLevelSequential = false;
-
+
private boolean acquireTriggersWithinLock = false;
-
+
private long dbRetryInterval = 15000L; // 15 secs
-
+
private boolean makeThreadsDaemons = false;
private boolean threadsInheritInitializersClassLoadContext = false;
private ClassLoader initializersLoader = null;
-
+
private boolean doubleCheckLockMisfireHandler = true;
-
+
private final Logger log = LoggerFactory.getLogger(getClass());
-
+
private ThreadExecutor threadExecutor = new DefaultThreadExecutor();
-
+
private volatile boolean schedulerRunning = false;
private volatile boolean shutdown = false;
-
+
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- *
+ *
* Interface.
- *
+ *
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
@@ -261,15 +261,15 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public void setThreadPoolSize(final int poolSize) {
//
}
-
+
public void setThreadExecutor(ThreadExecutor threadExecutor) {
this.threadExecutor = threadExecutor;
}
-
+
public ThreadExecutor getThreadExecutor() {
return threadExecutor;
}
-
+
/**
* Get the instance name of the Scheduler (must be unique within this server instance).
@@ -354,13 +354,14 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public long getDbRetryInterval() {
return dbRetryInterval;
}
+
/**
* @param dbRetryInterval The dbRetryInterval to set.
*/
public void setDbRetryInterval(long dbRetryInterval) {
this.dbRetryInterval = dbRetryInterval;
}
-
+
/**
*
* Set whether this instance should use database-based thread
@@ -384,28 +385,28 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public boolean isLockOnInsert() {
return lockOnInsert;
}
-
+
/**
- * Whether or not to obtain locks when inserting new jobs/triggers.
+ * Whether or not to obtain locks when inserting new jobs/triggers.
*
- * Defaults to true
, which is safest. Some databases (such as
+ * Defaults to true
, which is safest. Some databases (such as
* MS SQLServer) seem to require this to avoid deadlocks under high load,
* while others seem to do fine without. Settings this to false means
* isolation guarantees between job scheduling and trigger acquisition are
* entirely enforced by the database. Depending on the database and it's
* configuration this may cause unusual scheduling behaviors.
- *
- *
Setting this property to false
will provide a
- * significant performance increase during the addition of new jobs
+ *
+ *
Setting this property to false
will provide a
+ * significant performance increase during the addition of new jobs
* and triggers.
- *
+ *
* @param lockOnInsert whether locking should be used when inserting new jobs/triggers
*/
@SuppressWarnings("UnusedDeclaration") /* called reflectively */
public void setLockOnInsert(boolean lockOnInsert) {
this.lockOnInsert = lockOnInsert;
}
-
+
public long getMisfireThreshold() {
return misfireThreshold;
}
@@ -414,7 +415,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* The the number of milliseconds by which a trigger must have missed its
* next-fire-time, in order for it to be considered "misfired" and thus
* have its misfire instruction applied.
- *
+ *
* @param misfireThreshold the misfire threshold to use, in millis
*/
@SuppressWarnings("UnusedDeclaration") /* called reflectively */
@@ -434,7 +435,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Don't call set autocommit(false) on connections obtained from the
* DataSource. This can be helpful in a few situations, such as if you
* have a driver that complains if it is called when it is already off.
- *
+ *
* @param b whether or not autocommit should be set to false on db connections
*/
@SuppressWarnings("UnusedDeclaration") /* called reflectively */
@@ -448,7 +449,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/**
* Set the transaction isolation level of DB connections to sequential.
- *
+ *
* @param b whether isolation level should be set to sequential.
*/
@SuppressWarnings("UnusedDeclaration") /* called reflectively */
@@ -458,11 +459,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/**
* Whether or not the query and update to acquire a Trigger for firing
- * should be performed after obtaining an explicit DB lock (to avoid
+ * should be performed after obtaining an explicit DB lock (to avoid
* possible race conditions on the trigger's db row). This is the
* behavior prior to Quartz 1.6.3, but is considered unnecessary for most
- * databases (due to the nature of the SQL update that is performed),
- * and therefore a superfluous performance hit.
+ * databases (due to the nature of the SQL update that is performed),
+ * and therefore a superfluous performance hit.
*/
public boolean isAcquireTriggersWithinLock() {
return acquireTriggersWithinLock;
@@ -472,8 +473,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Whether or not the query and update to acquire a Trigger for firing
* should be performed after obtaining an explicit DB lock. This is the
* behavior prior to Quartz 1.6.3, but is considered unnecessary for most
- * databases, and therefore a superfluous performance hit.
- *
+ * databases, and therefore a superfluous performance hit.
+ *
* However, if batch acquisition is used, it is important for this behavior
* to be used for all dbs.
*/
@@ -482,19 +483,18 @@ public abstract class JobStoreSupport implements JobStore, Constants {
this.acquireTriggersWithinLock = acquireTriggersWithinLock;
}
-
+
/**
*
* Set the JDBC driver delegate class.
*
- *
- * @param delegateClassName
- * the delegate class name
+ *
+ * @param delegateClassName the delegate class name
*/
@SuppressWarnings("UnusedDeclaration") /* called reflectively */
public void setDriverDelegateClass(String delegateClassName)
- throws InvalidConfigurationException {
- synchronized(this) {
+ throws InvalidConfigurationException {
+ synchronized (this) {
this.delegateClassName = delegateClassName;
}
}
@@ -503,7 +503,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Get the JDBC driver delegate class name.
*
- *
+ *
* @return the delegate class name
*/
public String getDriverDelegateClass() {
@@ -514,13 +514,12 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Set the JDBC driver delegate's initialization string.
*
- *
- * @param delegateInitString
- * the delegate init string
+ *
+ * @param delegateInitString the delegate init string
*/
@SuppressWarnings("UnusedDeclaration") /* called reflectively */
public void setDriverDelegateInitString(String delegateInitString)
- throws InvalidConfigurationException {
+ throws InvalidConfigurationException {
this.delegateInitString = delegateInitString;
}
@@ -528,7 +527,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Get the JDBC driver delegate's initialization string.
*
- *
+ *
* @return the delegate init string
*/
public String getDriverDelegateInitString() {
@@ -544,7 +543,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* set the SQL statement to use to select and lock a row in the "locks"
* table.
*
- *
+ *
* @see StdRowLockSemaphore
*/
public void setSelectWithLockSQL(String string) {
@@ -557,9 +556,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/**
* Get whether the threads spawned by this JobStore should be
- * marked as daemon. Possible threads include the MisfireHandler
+ * marked as daemon. Possible threads include the MisfireHandler
* and the ClusterManager
.
- *
+ *
* @see Thread#setDaemon(boolean)
*/
public boolean getMakeThreadsDaemons() {
@@ -568,7 +567,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/**
* Set whether the threads spawned by this JobStore should be
- * marked as daemon. Possible threads include the MisfireHandler
+ * marked as daemon. Possible threads include the MisfireHandler
* and the ClusterManager
.
*
* @see Thread#setDaemon(boolean)
@@ -577,7 +576,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public void setMakeThreadsDaemons(boolean makeThreadsDaemons) {
this.makeThreadsDaemons = makeThreadsDaemons;
}
-
+
/**
* Get whether to set the class load context of spawned threads to that
* of the initializing thread.
@@ -597,7 +596,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/**
* Get whether to check to see if there are Triggers that have misfired
- * before actually acquiring the lock to recover them. This should be
+ * before actually acquiring the lock to recover them. This should be
* set to false if the majority of the time, there are are misfired
* Triggers.
*/
@@ -607,7 +606,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/**
* Set whether to check to see if there are Triggers that have misfired
- * before actually acquiring the lock to recover them. This should be
+ * before actually acquiring the lock to recover them. This should be
* set to false if the majority of the time, there are are misfired
* Triggers.
*/
@@ -616,7 +615,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
boolean doubleCheckLockMisfireHandler) {
this.doubleCheckLockMisfireHandler = doubleCheckLockMisfireHandler;
}
-
+
//---------------------------------------------------------------------------
// interface methods
//---------------------------------------------------------------------------
@@ -632,33 +631,33 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
*/
public void initialize(ClassLoadHelper loadHelper,
- SchedulerSignaler signaler) throws SchedulerConfigException {
+ SchedulerSignaler signaler) throws SchedulerConfigException {
- if (dsName == null) {
- throw new SchedulerConfigException("DataSource name not set.");
+ if (dsName == null) {
+ throw new SchedulerConfigException("DataSource name not set.");
}
classLoadHelper = loadHelper;
- if(isThreadsInheritInitializersClassLoadContext()) {
+ if (isThreadsInheritInitializersClassLoadContext()) {
log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName());
initializersLoader = Thread.currentThread().getContextClassLoader();
}
-
+
this.schedSignaler = signaler;
// If the user hasn't specified an explicit lock handler, then
// choose one based on CMT/Clustered/UseDBLocks.
if (getLockHandler() == null) {
-
+
// If the user hasn't specified an explicit lock handler,
// then we *must* use DB locks with clustering
if (isClustered()) {
setUseDBLocks(true);
}
-
+
if (getUseDBLocks()) {
- if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
- if(getSelectWithLockSQL() == null) {
+ if (getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
+ if (getSelectWithLockSQL() == null) {
String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
setSelectWithLockSQL(msSqlDflt);
@@ -668,13 +667,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
} else {
getLog().info(
- "Using thread monitor-based data access locking (synchronization).");
+ "Using thread monitor-based data access locking (synchronization).");
setLockHandler(new SimpleSemaphore());
}
}
}
-
+
/**
* @see JobStore#schedulerStarted()
*/
@@ -682,7 +681,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (isClustered()) {
clusterManagementThread = new ClusterManager();
- if(initializersLoader != null)
+ if (initializersLoader != null)
clusterManagementThread.setContextClassLoader(initializersLoader);
clusterManagementThread.initialize();
} else {
@@ -695,22 +694,22 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
misfireHandler = new MisfireHandler();
- if(initializersLoader != null)
+ if (initializersLoader != null)
misfireHandler.setContextClassLoader(initializersLoader);
misfireHandler.initialize();
schedulerRunning = true;
-
+
getLog().debug("JobStore background threads started (as scheduler was started).");
}
-
+
public void schedulerPaused() {
schedulerRunning = false;
}
-
+
public void schedulerResumed() {
schedulerRunning = true;
}
-
+
/**
*
* Called by the QuartzScheduler to inform the JobStore
that
@@ -720,7 +719,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*/
public void shutdown() {
shutdown = true;
-
+
if (misfireHandler != null) {
misfireHandler.shutdown();
try {
@@ -741,8 +740,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
DBConnectionManager.getInstance().shutdown(getDataSource());
} catch (SQLException sqle) {
getLog().warn("Database connection shutdown unsuccessful.", sqle);
- }
-
+ }
+
getLog().debug("JobStore background threads shutdown.");
}
@@ -755,20 +754,20 @@ public abstract class JobStoreSupport implements JobStore, Constants {
//---------------------------------------------------------------------------
protected abstract Connection getNonManagedTXConnection()
- throws JobPersistenceException;
+ throws JobPersistenceException;
/**
- * Wrap the given Connection
in a Proxy such that attributes
- * that might be set will be restored before the connection is closed
+ * Wrap the given Connection
in a Proxy such that attributes
+ * that might be set will be restored before the connection is closed
* (and potentially restored to a pool).
*/
protected Connection getAttributeRestoringConnection(Connection conn) {
- return (Connection)Proxy.newProxyInstance(
+ return (Connection) Proxy.newProxyInstance(
Thread.currentThread().getContextClassLoader(),
- new Class[] { Connection.class },
+ new Class[]{Connection.class},
new AttributeRestoringConnectionInvocationHandler(conn));
}
-
+
protected Connection getConnection() throws JobPersistenceException {
Connection conn;
try {
@@ -777,17 +776,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
} catch (SQLException sqle) {
throw new JobPersistenceException(
"Failed to obtain DB connection from data source '"
- + getDataSource() + "': " + sqle.toString(), sqle);
+ + getDataSource() + "': " + sqle.toString(), sqle);
} catch (Throwable e) {
throw new JobPersistenceException(
"Failed to obtain DB connection from data source '"
- + getDataSource() + "': " + e.toString(), e);
+ + getDataSource() + "': " + e.toString(), e);
}
- if (conn == null) {
+ if (conn == null) {
throw new JobPersistenceException(
- "Could not get connection from DataSource '"
- + getDataSource() + "'");
+ "Could not get connection from DataSource '"
+ + getDataSource() + "'");
}
// Protect connection attributes we might change.
@@ -799,18 +798,21 @@ public abstract class JobStoreSupport implements JobStore, Constants {
conn.setAutoCommit(false);
}
- if(isTxIsolationLevelSerializable()) {
+ if (isTxIsolationLevelSerializable()) {
conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
}
} catch (SQLException sqle) {
getLog().warn("Failed to override connection auto commit/transaction isolation.", sqle);
} catch (Throwable e) {
- try { conn.close(); } catch(Throwable ignored) {}
-
+ try {
+ conn.close();
+ } catch (Throwable ignored) {
+ }
+
throw new JobPersistenceException(
- "Failure setting up connection.", e);
+ "Failure setting up connection.", e);
}
-
+
return conn;
}
@@ -827,27 +829,26 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/**
* Recover any failed or misfired jobs and clean up the data store as
* appropriate.
- *
+ *
* @throws JobPersistenceException if jobs could not be recovered
*/
protected void recoverJobs() throws JobPersistenceException {
executeInNonManagedTXLock(
- LOCK_TRIGGER_ACCESS,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- recoverJobs(conn);
- }
- }, null);
+ LOCK_TRIGGER_ACCESS,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ recoverJobs(conn);
+ }
+ }, null);
}
-
+
/**
*
* Will recover any failed or misfired jobs and clean up the data store as
* appropriate.
*
- *
- * @throws JobPersistenceException
- * if jobs could not be recovered
+ *
+ * @throws JobPersistenceException if jobs could not be recovered
*/
protected void recoverJobs(Connection conn) throws JobPersistenceException {
try {
@@ -856,15 +857,15 @@ public abstract class JobStoreSupport implements JobStore, Constants {
STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);
rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
- STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);
-
+ STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);
+
getLog().info(
"Freed " + rows
+ " triggers from 'acquired' / 'blocked' state.");
// clean up misfired jobs
recoverMisfiredJobs(conn, true);
-
+
// recover jobs marked for recovery that were not fully executed
List recoveringJobTriggers = getDelegate()
.selectTriggersForRecoveringJobs(conn);
@@ -874,23 +875,27 @@ public abstract class JobStoreSupport implements JobStore, Constants {
+ recoveringJobTriggers.size()
+ " jobs that were in-progress at the time of the last shut-down.");
- for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) {
- if (jobExists(conn, recoveringJobTrigger.getJobKey())) {
- recoveringJobTrigger.computeFirstFireTime(null);
- storeTrigger(conn, recoveringJobTrigger, null, false,
- STATE_WAITING, false, true);
+ for (OperableTrigger recoveringJobTrigger : recoveringJobTriggers) {
+ try {
+ if (jobExists(conn, recoveringJobTrigger.getJobKey())) {
+ recoveringJobTrigger.computeFirstFireTime(null);
+ storeTrigger(conn, recoveringJobTrigger, null, false,
+ STATE_WAITING, false, true);
+ }
+ } catch (JobPersistenceException e) {
+ getLog().error(e.getMessage());
}
}
getLog().info("Recovery complete.");
// remove lingering 'complete' triggers...
List cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
- for(TriggerKey ct: cts) {
+ for (TriggerKey ct : cts) {
removeTrigger(conn, ct);
}
getLog().info(
- "Removed " + cts.size() + " 'complete' triggers.");
-
+ "Removed " + cts.size() + " 'complete' triggers.");
+
// clean up any fired trigger entries
int n = getDelegate().deleteFiredTriggers(conn);
getLog().info("Removed " + n + " stale fired job entries.");
@@ -917,67 +922,69 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*/
protected static class RecoverMisfiredJobsResult {
public static final RecoverMisfiredJobsResult NO_OP =
- new RecoverMisfiredJobsResult(false, 0, Long.MAX_VALUE);
-
+ new RecoverMisfiredJobsResult(false, 0, Long.MAX_VALUE);
+
private boolean _hasMoreMisfiredTriggers;
private int _processedMisfiredTriggerCount;
private long _earliestNewTime;
-
+
public RecoverMisfiredJobsResult(
- boolean hasMoreMisfiredTriggers, int processedMisfiredTriggerCount, long earliestNewTime) {
+ boolean hasMoreMisfiredTriggers, int processedMisfiredTriggerCount, long earliestNewTime) {
_hasMoreMisfiredTriggers = hasMoreMisfiredTriggers;
_processedMisfiredTriggerCount = processedMisfiredTriggerCount;
_earliestNewTime = earliestNewTime;
}
-
+
public boolean hasMoreMisfiredTriggers() {
return _hasMoreMisfiredTriggers;
}
+
public int getProcessedMisfiredTriggerCount() {
return _processedMisfiredTriggerCount;
- }
+ }
+
public long getEarliestNewTime() {
return _earliestNewTime;
- }
+ }
}
-
+
protected RecoverMisfiredJobsResult recoverMisfiredJobs(
- Connection conn, boolean recovering)
- throws JobPersistenceException, SQLException {
+ Connection conn, boolean recovering)
+ throws JobPersistenceException, SQLException {
// If recovering, we want to handle all of the misfired
// triggers right away.
- int maxMisfiresToHandleAtATime =
- (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
-
+ int maxMisfiresToHandleAtATime =
+ (recovering) ? -1 : getMaxMisfiresToHandleAtATime();
+
List misfiredTriggers = new LinkedList();
long earliestNewTime = Long.MAX_VALUE;
// We must still look for the MISFIRED state in case triggers were left
// in this state when upgrading to this version that does not support it.
boolean hasMoreMisfiredTriggers =
- getDelegate().hasMisfiredTriggersInState(
- conn, STATE_WAITING, getMisfireTime(),
- maxMisfiresToHandleAtATime, misfiredTriggers);
+ getDelegate().hasMisfiredTriggersInState(
+ conn, STATE_WAITING, getMisfireTime(),
+ maxMisfiresToHandleAtATime, misfiredTriggers);
if (hasMoreMisfiredTriggers) {
getLog().info(
- "Handling the first " + misfiredTriggers.size() +
- " triggers that missed their scheduled fire-time. " +
- "More misfired triggers remain to be processed.");
- } else if (misfiredTriggers.size() > 0) {
+ "Handling the first " + misfiredTriggers.size() +
+ " triggers that missed their scheduled fire-time. " +
+ "More misfired triggers remain to be processed.");
+ } else if (misfiredTriggers.size() > 0) {
getLog().info(
- "Handling " + misfiredTriggers.size() +
- " trigger(s) that missed their scheduled fire-time.");
+ "Handling " + misfiredTriggers.size() +
+ " trigger(s) that missed their scheduled fire-time.");
} else {
getLog().debug(
- "Found 0 triggers that missed their scheduled fire-time.");
- return RecoverMisfiredJobsResult.NO_OP;
+ "Found 0 triggers that missed their scheduled fire-time.");
+ return RecoverMisfiredJobsResult.NO_OP;
}
- for (TriggerKey triggerKey: misfiredTriggers) {
-
- OperableTrigger trig =
- retrieveTrigger(conn, triggerKey);
+ for (TriggerKey triggerKey : misfiredTriggers) {
+
+ OperableTrigger trig =
+ retrieveTrigger(conn, triggerKey);
if (trig == null) {
continue;
@@ -985,7 +992,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering);
- if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
+ if (trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime)
earliestNewTime = trig.getNextFireTime().getTime();
}
@@ -994,8 +1001,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
protected boolean updateMisfiredTrigger(Connection conn,
- TriggerKey triggerKey, String newStateIfNotComplete, boolean forceState)
- throws JobPersistenceException {
+ TriggerKey triggerKey, String newStateIfNotComplete, boolean forceState)
+ throws JobPersistenceException {
try {
OperableTrigger trig = retrieveTrigger(conn, triggerKey);
@@ -1031,7 +1038,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (trig.getNextFireTime() == null) {
storeTrigger(conn, trig,
- null, true, STATE_COMPLETE, forceState, recovering);
+ null, true, STATE_COMPLETE, forceState, recovering);
schedSignaler.notifySchedulerListenersFinalized(trig);
} else {
storeTrigger(conn, trig, null, true, newStateIfNotComplete,
@@ -1043,55 +1050,49 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Store the given {@link JobDetail}
and {@link Trigger}
.
*
- *
- * @param newJob
- * The JobDetail
to be stored.
- * @param newTrigger
- * The Trigger
to be stored.
- * @throws ObjectAlreadyExistsException
- * if a Job
with the same name/group already
- * exists.
+ *
+ * @param newJob The JobDetail
to be stored.
+ * @param newTrigger The Trigger
to be stored.
+ * @throws ObjectAlreadyExistsException if a Job
with the same name/group already
+ * exists.
*/
public void storeJobAndTrigger(final JobDetail newJob,
- final OperableTrigger newTrigger)
- throws JobPersistenceException {
+ final OperableTrigger newTrigger)
+ throws JobPersistenceException {
executeInLock(
- (isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- storeJob(conn, newJob, false);
- storeTrigger(conn, newTrigger, newJob, false,
- STATE_WAITING, false, false);
- }
- });
+ (isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ storeJob(conn, newJob, false);
+ storeTrigger(conn, newTrigger, newJob, false,
+ STATE_WAITING, false, false);
+ }
+ });
}
-
+
/**
*
* Store the given {@link JobDetail}
.
*
- *
- * @param newJob
- * The JobDetail
to be stored.
- * @param replaceExisting
- * If true
, any Job
existing in the
- * JobStore
with the same name & group should be
- * over-written.
- * @throws ObjectAlreadyExistsException
- * if a Job
with the same name/group already
- * exists, and replaceExisting is set to false.
+ *
+ * @param newJob The JobDetail
to be stored.
+ * @param replaceExisting If true
, any Job
existing in the
+ * JobStore
with the same name & group should be
+ * over-written.
+ * @throws ObjectAlreadyExistsException if a Job
with the same name/group already
+ * exists, and replaceExisting is set to false.
*/
public void storeJob(final JobDetail newJob,
- final boolean replaceExisting) throws JobPersistenceException {
+ final boolean replaceExisting) throws JobPersistenceException {
executeInLock(
- (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS : null,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- storeJob(conn, newJob, replaceExisting);
- }
- });
+ (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS : null,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ storeJob(conn, newJob, replaceExisting);
+ }
+ });
}
-
+
/**
*
* Insert or update a job.
@@ -1099,12 +1100,12 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*/
protected void storeJob(Connection conn,
JobDetail newJob, boolean replaceExisting)
- throws JobPersistenceException {
+ throws JobPersistenceException {
boolean existingJob = jobExists(conn, newJob.getKey());
try {
if (existingJob) {
- if (!replaceExisting) {
+ if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
getDelegate().updateJobDetail(conn, newJob);
@@ -1139,29 +1140,26 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Store the given {@link Trigger}
.
*
- *
- * @param newTrigger
- * The Trigger
to be stored.
- * @param replaceExisting
- * If true
, any Trigger
existing in
- * the JobStore
with the same name & group should
- * be over-written.
- * @throws ObjectAlreadyExistsException
- * if a Trigger
with the same name/group already
- * exists, and replaceExisting is set to false.
+ *
+ * @param newTrigger The Trigger
to be stored.
+ * @param replaceExisting If true
, any Trigger
existing in
+ * the JobStore
with the same name & group should
+ * be over-written.
+ * @throws ObjectAlreadyExistsException if a Trigger
with the same name/group already
+ * exists, and replaceExisting is set to false.
*/
public void storeTrigger(final OperableTrigger newTrigger,
- final boolean replaceExisting) throws JobPersistenceException {
+ final boolean replaceExisting) throws JobPersistenceException {
executeInLock(
- (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS : null,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- storeTrigger(conn, newTrigger, null, replaceExisting,
- STATE_WAITING, false, false);
- }
- });
+ (isLockOnInsert() || replaceExisting) ? LOCK_TRIGGER_ACCESS : null,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ storeTrigger(conn, newTrigger, null, replaceExisting,
+ STATE_WAITING, false, false);
+ }
+ });
}
-
+
/**
*
* Insert or update a trigger.
@@ -1171,14 +1169,14 @@ public abstract class JobStoreSupport implements JobStore, Constants {
protected void storeTrigger(Connection conn,
OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
boolean forceState, boolean recovering)
- throws JobPersistenceException {
+ throws JobPersistenceException {
boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
- if ((existingTrigger) && (!replaceExisting)) {
+ if ((existingTrigger) && (!replaceExisting)) {
throw new ObjectAlreadyExistsException(newTrigger);
}
-
+
try {
boolean shouldBepaused;
@@ -1187,7 +1185,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
shouldBepaused = getDelegate().isTriggerGroupPaused(
conn, newTrigger.getKey().getGroup());
- if(!shouldBepaused) {
+ if (!shouldBepaused) {
shouldBepaused = getDelegate().isTriggerGroupPaused(conn,
ALL_GROUPS_PAUSED);
@@ -1201,7 +1199,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
}
- if(job == null) {
+ if (job == null) {
job = retrieveJob(conn, newTrigger.getJobKey());
}
if (job == null) {
@@ -1210,17 +1208,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
+ ") referenced by the trigger does not exist.");
}
- if (job.isConcurrentExectionDisallowed() && !recovering) {
+ if (job.isConcurrentExectionDisallowed() && !recovering) {
state = checkBlockedState(conn, job.getKey(), state);
}
-
+
if (existingTrigger) {
getDelegate().updateTrigger(conn, newTrigger, state, job);
} else {
getDelegate().insertTrigger(conn, newTrigger, state, job);
}
} catch (Exception e) {
- throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '"
+ throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '"
+ newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
}
}
@@ -1245,15 +1243,15 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* name, and any {@link Trigger}
s that reference
* it.
*
- *
+ *
*
* If removal of the Job
results in an empty group, the
* group should be removed from the JobStore
's list of
* known group names.
*
- *
+ *
* @return true
if a Job
with the given name &
- * group was found and removed from the store.
+ * group was found and removed from the store.
*/
public boolean removeJob(final JobKey jobKey) throws JobPersistenceException {
return (Boolean) executeInLock(
@@ -1265,13 +1263,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
+
protected boolean removeJob(Connection conn, final JobKey jobKey)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
List jobTriggers = getDelegate().selectTriggerKeysForJob(conn, jobKey);
- for (TriggerKey jobTrigger: jobTriggers) {
+ for (TriggerKey jobTrigger : jobTriggers) {
deleteTriggerAndChildren(conn, jobTrigger);
}
@@ -1298,7 +1296,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
+
public boolean removeTriggers(final List triggerKeys)
throws JobPersistenceException {
return (Boolean) executeInLock(
@@ -1315,7 +1313,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
+
public void storeJobsAndTriggers(
final Map> triggersAndJobs, final boolean replace)
throws JobPersistenceException {
@@ -1324,61 +1322,61 @@ public abstract class JobStoreSupport implements JobStore, Constants {
(isLockOnInsert() || replace) ? LOCK_TRIGGER_ACCESS : null,
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
-
+
// FUTURE_TODO: make this more efficient with a true bulk operation...
- for(JobDetail job: triggersAndJobs.keySet()) {
+ for (JobDetail job : triggersAndJobs.keySet()) {
storeJob(conn, job, replace);
- for(Trigger trigger: triggersAndJobs.get(job)) {
+ for (Trigger trigger : triggersAndJobs.get(job)) {
storeTrigger(conn, (OperableTrigger) trigger, job, replace,
STATE_WAITING, false, false);
}
}
}
});
- }
-
+ }
+
/**
* Delete a job and its listeners.
- *
+ *
* @see #removeJob(java.sql.Connection, JobKey)
* @see #removeTrigger(Connection, TriggerKey)
*/
private boolean deleteJobAndChildren(Connection conn, JobKey key)
- throws NoSuchDelegateException, SQLException {
+ throws NoSuchDelegateException, SQLException {
return (getDelegate().deleteJobDetail(conn, key) > 0);
}
-
+
/**
* Delete a trigger, its listeners, and its Simple/Cron/BLOB sub-table entry.
- *
+ *
* @see #removeJob(java.sql.Connection, JobKey)
* @see #removeTrigger(Connection, TriggerKey)
* @see #replaceTrigger(Connection, TriggerKey, OperableTrigger)
*/
private boolean deleteTriggerAndChildren(Connection conn, TriggerKey key)
- throws SQLException, NoSuchDelegateException {
+ throws SQLException, NoSuchDelegateException {
return (getDelegate().deleteTrigger(conn, key) > 0);
}
-
+
/**
*
* Retrieve the {@link JobDetail}
for the given
* {@link Job}
.
*
- *
+ *
* @return The desired Job
, or null if there is no match.
*/
public JobDetail retrieveJob(final JobKey jobKey) throws JobPersistenceException {
- return (JobDetail)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return retrieveJob(conn, jobKey);
- }
- });
+ return (JobDetail) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return retrieveJob(conn, jobKey);
+ }
+ });
}
-
+
protected JobDetail retrieveJob(Connection conn, JobKey key) throws JobPersistenceException {
try {
@@ -1403,21 +1401,21 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Remove (delete) the {@link Trigger}
with the
* given name.
*
- *
+ *
*
* If removal of the Trigger
results in an empty group, the
* group should be removed from the JobStore
's list of
* known group names.
*
- *
+ *
*
* If removal of the Trigger
results in an 'orphaned' Job
* that is not 'durable', then the Job
should be deleted
* also.
*
- *
+ *
* @return true
if a Trigger
with the given
- * name & group was found and removed from the store.
+ * name & group was found and removed from the store.
*/
public boolean removeTrigger(final TriggerKey triggerKey) throws JobPersistenceException {
return (Boolean) executeInLock(
@@ -1429,17 +1427,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
+
protected boolean removeTrigger(Connection conn, TriggerKey key)
- throws JobPersistenceException {
+ throws JobPersistenceException {
boolean removedTrigger;
try {
// this must be called before we delete the trigger, obviously
JobDetail job = getDelegate().selectJobForTrigger(conn,
getClassLoadHelper(), key, false);
- removedTrigger =
- deleteTriggerAndChildren(conn, key);
+ removedTrigger =
+ deleteTriggerAndChildren(conn, key);
if (null != job && !job.isDurable()) {
int numTriggers = getDelegate().selectNumTriggersForJob(conn,
@@ -1461,11 +1459,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
return removedTrigger;
}
- /**
+ /**
* @see JobStore#replaceTrigger(TriggerKey, OperableTrigger)
*/
- public boolean replaceTrigger(final TriggerKey triggerKey,
- final OperableTrigger newTrigger) throws JobPersistenceException {
+ public boolean replaceTrigger(final TriggerKey triggerKey,
+ final OperableTrigger newTrigger) throws JobPersistenceException {
return (Boolean) executeInLock(
LOCK_TRIGGER_ACCESS,
new TransactionCallback() {
@@ -1475,10 +1473,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
- protected boolean replaceTrigger(Connection conn,
- TriggerKey key, OperableTrigger newTrigger)
- throws JobPersistenceException {
+
+ protected boolean replaceTrigger(Connection conn,
+ TriggerKey key, OperableTrigger newTrigger)
+ throws JobPersistenceException {
try {
// this must be called before we delete the trigger, obviously
JobDetail job = getDelegate().selectJobForTrigger(conn,
@@ -1487,14 +1485,14 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (job == null) {
return false;
}
-
+
if (!newTrigger.getJobKey().equals(job.getKey())) {
throw new JobPersistenceException("New trigger is not related to the same job as the old trigger.");
}
-
- boolean removedTrigger =
- deleteTriggerAndChildren(conn, key);
-
+
+ boolean removedTrigger =
+ deleteTriggerAndChildren(conn, key);
+
storeTrigger(conn, newTrigger, job, false, STATE_WAITING, false, false);
return removedTrigger;
@@ -1511,21 +1509,21 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Retrieve the given {@link Trigger}
.
*
- *
+ *
* @return The desired Trigger
, or null if there is no
- * match.
+ * match.
*/
public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throws JobPersistenceException {
- return (OperableTrigger)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return retrieveTrigger(conn, triggerKey);
- }
- });
+ return (OperableTrigger) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return retrieveTrigger(conn, triggerKey);
+ }
+ });
}
-
+
protected OperableTrigger retrieveTrigger(Connection conn, TriggerKey key)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
return getDelegate().selectTrigger(conn, key);
@@ -1539,7 +1537,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Get the current state of the identified {@link Trigger}
.
*
- *
+ *
* @see TriggerState#NORMAL
* @see TriggerState#PAUSED
* @see TriggerState#COMPLETE
@@ -1547,16 +1545,16 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* @see TriggerState#NONE
*/
public TriggerState getTriggerState(final TriggerKey triggerKey) throws JobPersistenceException {
- return (TriggerState)executeWithoutLock( // no locks necessary for read...
+ return (TriggerState) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return getTriggerState(conn, triggerKey);
}
});
}
-
+
public TriggerState getTriggerState(Connection conn, TriggerKey key)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
String ts = getDelegate().selectTriggerState(conn, key);
@@ -1600,59 +1598,55 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Store the given {@link Calendar}
.
*
- *
- * @param calName
- * The name of the calendar.
- * @param calendar
- * The Calendar
to be stored.
- * @param replaceExisting
- * If true
, any Calendar
existing
- * in the JobStore
with the same name & group
- * should be over-written.
- * @throws ObjectAlreadyExistsException
- * if a Calendar
with the same name already
- * exists, and replaceExisting is set to false.
+ *
+ * @param calName The name of the calendar.
+ * @param calendar The Calendar
to be stored.
+ * @param replaceExisting If true
, any Calendar
existing
+ * in the JobStore
with the same name & group
+ * should be over-written.
+ * @throws ObjectAlreadyExistsException if a Calendar
with the same name already
+ * exists, and replaceExisting is set to false.
*/
public void storeCalendar(final String calName,
final Calendar calendar, final boolean replaceExisting, final boolean updateTriggers)
- throws JobPersistenceException {
+ throws JobPersistenceException {
executeInLock(
- (isLockOnInsert() || updateTriggers) ? LOCK_TRIGGER_ACCESS : null,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- storeCalendar(conn, calName, calendar, replaceExisting, updateTriggers);
- }
- });
+ (isLockOnInsert() || updateTriggers) ? LOCK_TRIGGER_ACCESS : null,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ storeCalendar(conn, calName, calendar, replaceExisting, updateTriggers);
+ }
+ });
}
-
+
protected void storeCalendar(Connection conn,
String calName, Calendar calendar, boolean replaceExisting, boolean updateTriggers)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
boolean existingCal = calendarExists(conn, calName);
- if (existingCal && !replaceExisting) {
+ if (existingCal && !replaceExisting) {
throw new ObjectAlreadyExistsException(
- "Calendar with name '" + calName + "' already exists.");
+ "Calendar with name '" + calName + "' already exists.");
}
if (existingCal) {
- if (getDelegate().updateCalendar(conn, calName, calendar) < 1) {
+ if (getDelegate().updateCalendar(conn, calName, calendar) < 1) {
throw new JobPersistenceException(
- "Couldn't store calendar. Update failed.");
+ "Couldn't store calendar. Update failed.");
}
-
- if(updateTriggers) {
+
+ if (updateTriggers) {
List trigs = getDelegate().selectTriggersForCalendar(conn, calName);
-
- for(OperableTrigger trigger: trigs) {
+
+ for (OperableTrigger trigger : trigs) {
trigger.updateWithNewCalendar(calendar, getMisfireThreshold());
storeTrigger(conn, trigger, null, true, STATE_WAITING, false, false);
}
}
} else {
- if (getDelegate().insertCalendar(conn, calName, calendar) < 1) {
+ if (getDelegate().insertCalendar(conn, calName, calendar) < 1) {
throw new JobPersistenceException(
- "Couldn't store calendar. Insert failed.");
+ "Couldn't store calendar. Insert failed.");
}
}
@@ -1667,14 +1661,14 @@ public abstract class JobStoreSupport implements JobStore, Constants {
} catch (ClassNotFoundException e) {
throw new JobPersistenceException("Couldn't store calendar: "
+ e.getMessage(), e);
- }catch (SQLException e) {
+ } catch (SQLException e) {
throw new JobPersistenceException("Couldn't store calendar: "
+ e.getMessage(), e);
}
}
-
+
protected boolean calendarExists(Connection conn, String calName)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
return getDelegate().calendarExists(conn, calName);
} catch (SQLException e) {
@@ -1689,18 +1683,19 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Remove (delete) the {@link Calendar}
with the
* given name.
*
- *
+ *
*
* If removal of the Calendar
would result in
* Trigger
s pointing to non-existent calendars, then a
* JobPersistenceException
will be thrown.
- * *
+ * *
+ *
* @param calName The name of the Calendar
to be removed.
* @return true
if a Calendar
with the given name
* was found and removed from the store.
*/
public boolean removeCalendar(final String calName)
- throws JobPersistenceException {
+ throws JobPersistenceException {
return (Boolean) executeInLock(
LOCK_TRIGGER_ACCESS,
new TransactionCallback() {
@@ -1710,13 +1705,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
- protected boolean removeCalendar(Connection conn,
- String calName) throws JobPersistenceException {
+
+ protected boolean removeCalendar(Connection conn,
+ String calName) throws JobPersistenceException {
try {
- if (getDelegate().calendarIsReferenced(conn, calName)) {
+ if (getDelegate().calendarIsReferenced(conn, calName)) {
throw new JobPersistenceException(
- "Calender cannot be removed if it referenced by a trigger!");
+ "Calender cannot be removed if it referenced by a trigger!");
}
if (!isClustered) {
@@ -1734,25 +1729,24 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Retrieve the given {@link Trigger}
.
*
- *
- * @param calName
- * The name of the Calendar
to be retrieved.
+ *
+ * @param calName The name of the Calendar
to be retrieved.
* @return The desired Calendar
, or null if there is no
- * match.
+ * match.
*/
public Calendar retrieveCalendar(final String calName)
- throws JobPersistenceException {
- return (Calendar)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return retrieveCalendar(conn, calName);
- }
- });
+ throws JobPersistenceException {
+ return (Calendar) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return retrieveCalendar(conn, calName);
+ }
+ });
}
-
+
protected Calendar retrieveCalendar(Connection conn,
String calName)
- throws JobPersistenceException {
+ throws JobPersistenceException {
// all calendars are persistent, but we can lazy-cache them during run
// time as long as we aren't running clustered.
Calendar cal = (isClustered) ? null : calendarCache.get(calName);
@@ -1787,7 +1781,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
*/
public int getNumberOfJobs()
- throws JobPersistenceException {
+ throws JobPersistenceException {
return (Integer) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
@@ -1795,9 +1789,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
+
protected int getNumberOfJobs(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
return getDelegate().selectNumJobs(conn);
} catch (SQLException e) {
@@ -1813,7 +1807,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
*/
public int getNumberOfTriggers()
- throws JobPersistenceException {
+ throws JobPersistenceException {
return (Integer) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
@@ -1821,9 +1815,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
+
protected int getNumberOfTriggers(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
return getDelegate().selectNumTriggers(conn);
} catch (SQLException e) {
@@ -1839,7 +1833,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
*/
public int getNumberOfCalendars()
- throws JobPersistenceException {
+ throws JobPersistenceException {
return (Integer) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
@@ -1847,9 +1841,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
+
protected int getNumberOfCalendars(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
return getDelegate().selectNumCalendars(conn);
} catch (SQLException e) {
@@ -1863,24 +1857,24 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Get the names of all of the {@link Job}
s that
* matcher the given groupMatcher.
*
- *
+ *
*
* If there are no jobs in the given group name, the result should be an empty Set
*
*/
@SuppressWarnings("unchecked")
public Set getJobKeys(final GroupMatcher matcher)
- throws JobPersistenceException {
- return (Set)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return getJobNames(conn, matcher);
- }
- });
+ throws JobPersistenceException {
+ return (Set) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return getJobNames(conn, matcher);
+ }
+ });
}
-
+
protected Set getJobNames(Connection conn,
- GroupMatcher matcher) throws JobPersistenceException {
+ GroupMatcher matcher) throws JobPersistenceException {
Set jobNames;
try {
@@ -1892,25 +1886,25 @@ public abstract class JobStoreSupport implements JobStore, Constants {
return jobNames;
}
-
-
+
+
/**
* Determine whether a {@link Job} with the given identifier already
* exists within the scheduler.
- *
+ *
* @param jobKey the identifier to check for
* @return true if a Job exists with the given identifier
* @throws JobPersistenceException
*/
public boolean checkExists(final JobKey jobKey) throws JobPersistenceException {
- return (Boolean)executeWithoutLock( // no locks necessary for read...
+ return (Boolean) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return checkExists(conn, jobKey);
}
});
}
-
+
protected boolean checkExists(Connection conn, JobKey jobKey) throws JobPersistenceException {
try {
return getDelegate().jobExists(conn, jobKey);
@@ -1919,24 +1913,24 @@ public abstract class JobStoreSupport implements JobStore, Constants {
+ e.getMessage(), e);
}
}
-
+
/**
- * Determine whether a {@link Trigger} with the given identifier already
+ * Determine whether a {@link Trigger} with the given identifier already
* exists within the scheduler.
- *
+ *
* @param triggerKey the identifier to check for
* @return true if a Trigger exists with the given identifier
* @throws JobPersistenceException
*/
public boolean checkExists(final TriggerKey triggerKey) throws JobPersistenceException {
- return (Boolean)executeWithoutLock( // no locks necessary for read...
+ return (Boolean) executeWithoutLock( // no locks necessary for read...
new TransactionCallback() {
public Object execute(Connection conn) throws JobPersistenceException {
return checkExists(conn, triggerKey);
}
});
}
-
+
protected boolean checkExists(Connection conn, TriggerKey triggerKey) throws JobPersistenceException {
try {
return getDelegate().triggerExists(conn, triggerKey);
@@ -1949,7 +1943,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
/**
* Clear (delete!) all scheduling data - all {@link Job}s, {@link Trigger}s
* {@link Calendar}s.
- *
+ *
* @throws JobPersistenceException
*/
public void clearAllSchedulingData() throws JobPersistenceException {
@@ -1961,7 +1955,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
+
protected void clearAllSchedulingData(Connection conn) throws JobPersistenceException {
try {
getDelegate().clearData(conn);
@@ -1969,13 +1963,13 @@ public abstract class JobStoreSupport implements JobStore, Constants {
throw new JobPersistenceException("Error clearing scheduling data: " + e.getMessage(), e);
}
}
-
+
/**
*
* Get the names of all of the {@link Trigger}
s
* that match the given group Matcher.
*
- *
+ *
*
* If there are no triggers in the given group name, the result should be a
* an empty Set (not null
).
@@ -1983,19 +1977,19 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*/
@SuppressWarnings("unchecked")
public Set getTriggerKeys(final GroupMatcher matcher)
- throws JobPersistenceException {
- return (Set)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return getTriggerNames(conn, matcher);
- }
- });
- }
-
- protected Set getTriggerNames(Connection conn,
- GroupMatcher matcher) throws JobPersistenceException {
-
- Set trigNames;
+ throws JobPersistenceException {
+ return (Set) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return getTriggerNames(conn, matcher);
+ }
+ });
+ }
+
+ protected Set getTriggerNames(Connection conn,
+ GroupMatcher matcher) throws JobPersistenceException {
+
+ Set trigNames;
try {
trigNames = getDelegate().selectTriggersInGroup(conn, matcher);
@@ -2013,7 +2007,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Get the names of all of the {@link Job}
* groups.
*
- *
+ *
*
* If there are no known group names, the result should be a zero-length
* array (not null
).
@@ -2021,17 +2015,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*/
@SuppressWarnings("unchecked")
public List getJobGroupNames()
- throws JobPersistenceException {
- return (List)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return getJobGroupNames(conn);
- }
- });
+ throws JobPersistenceException {
+ return (List) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return getJobGroupNames(conn);
+ }
+ });
}
-
+
protected List getJobGroupNames(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
List groupNames;
@@ -2050,7 +2044,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Get the names of all of the {@link Trigger}
* groups.
*
- *
+ *
*
* If there are no known group names, the result should be a zero-length
* array (not null
).
@@ -2058,15 +2052,15 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*/
@SuppressWarnings("unchecked")
public List getTriggerGroupNames()
- throws JobPersistenceException {
- return (List)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return getTriggerGroupNames(conn);
- }
- });
+ throws JobPersistenceException {
+ return (List) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return getTriggerGroupNames(conn);
+ }
+ });
}
-
+
protected List getTriggerGroupNames(Connection conn) throws JobPersistenceException {
List groupNames;
@@ -2086,7 +2080,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Get the names of all of the {@link Calendar}
s
* in the JobStore
.
*
- *
+ *
*
* If there are no Calendars in the given group name, the result should be
* a zero-length array (not null
).
@@ -2094,17 +2088,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*/
@SuppressWarnings("unchecked")
public List getCalendarNames()
- throws JobPersistenceException {
- return (List)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return getCalendarNames(conn);
- }
- });
+ throws JobPersistenceException {
+ return (List) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return getCalendarNames(conn);
+ }
+ });
}
-
+
protected List getCalendarNames(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
return getDelegate().selectCalendars(conn);
} catch (SQLException e) {
@@ -2117,24 +2111,24 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Get all of the Triggers that are associated to the given Job.
*
- *
+ *
*
* If there are no matches, a zero-length array should be returned.
*
*/
@SuppressWarnings("unchecked")
public List getTriggersForJob(final JobKey jobKey) throws JobPersistenceException {
- return (List)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return getTriggersForJob(conn, jobKey);
- }
- });
+ return (List) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return getTriggersForJob(conn, jobKey);
+ }
+ });
}
-
+
protected List getTriggersForJob(Connection conn,
- JobKey key)
- throws JobPersistenceException {
+ JobKey key)
+ throws JobPersistenceException {
List list;
try {
@@ -2152,29 +2146,29 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
* Pause the {@link Trigger}
with the given name.
*
- *
+ *
* @see #resumeTrigger(TriggerKey)
*/
public void pauseTrigger(final TriggerKey triggerKey) throws JobPersistenceException {
executeInLock(
- LOCK_TRIGGER_ACCESS,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- pauseTrigger(conn, triggerKey);
- }
- });
+ LOCK_TRIGGER_ACCESS,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ pauseTrigger(conn, triggerKey);
+ }
+ });
}
-
+
/**
*
* Pause the {@link Trigger}
with the given name.
*
- *
+ *
* @see #resumeTrigger(Connection, TriggerKey)
*/
- public void pauseTrigger(Connection conn,
- TriggerKey triggerKey)
- throws JobPersistenceException {
+ public void pauseTrigger(Connection conn,
+ TriggerKey triggerKey)
+ throws JobPersistenceException {
try {
String oldState = getDelegate().selectTriggerState(conn,
@@ -2200,71 +2194,71 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Pause the {@link Job}
with the given name - by
* pausing all of its current Trigger
s.
*
- *
+ *
* @see #resumeJob(JobKey)
*/
public void pauseJob(final JobKey jobKey) throws JobPersistenceException {
executeInLock(
- LOCK_TRIGGER_ACCESS,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- List triggers = getTriggersForJob(conn, jobKey);
- for (OperableTrigger trigger: triggers) {
- pauseTrigger(conn, trigger.getKey());
+ LOCK_TRIGGER_ACCESS,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ List triggers = getTriggersForJob(conn, jobKey);
+ for (OperableTrigger trigger : triggers) {
+ pauseTrigger(conn, trigger.getKey());
+ }
}
- }
- });
+ });
}
-
+
/**
*
* Pause all of the {@link Job}s
matching the given
* groupMatcher - by pausing all of their Trigger
s.
*
- *
+ *
* @see #resumeJobs(GroupMatcher)
*/
@SuppressWarnings("unchecked")
public Set pauseJobs(final GroupMatcher matcher)
- throws JobPersistenceException {
+ throws JobPersistenceException {
return (Set) executeInLock(
- LOCK_TRIGGER_ACCESS,
- new TransactionCallback() {
- public Set execute(final Connection conn) throws JobPersistenceException {
- Set groupNames = new HashSet();
- Set jobNames = getJobNames(conn, matcher);
-
- for (JobKey jobKey : jobNames) {
- List triggers = getTriggersForJob(conn, jobKey);
- for (OperableTrigger trigger : triggers) {
- pauseTrigger(conn, trigger.getKey());
+ LOCK_TRIGGER_ACCESS,
+ new TransactionCallback() {
+ public Set execute(final Connection conn) throws JobPersistenceException {
+ Set groupNames = new HashSet();
+ Set jobNames = getJobNames(conn, matcher);
+
+ for (JobKey jobKey : jobNames) {
+ List triggers = getTriggersForJob(conn, jobKey);
+ for (OperableTrigger trigger : triggers) {
+ pauseTrigger(conn, trigger.getKey());
+ }
+ groupNames.add(jobKey.getGroup());
}
- groupNames.add(jobKey.getGroup());
- }
- return groupNames;
+ return groupNames;
+ }
}
- }
- );
+ );
}
-
+
/**
- * Determines if a Trigger for the given job should be blocked.
- * State can only transition to STATE_PAUSED_BLOCKED/BLOCKED from
+ * Determines if a Trigger for the given job should be blocked.
+ * State can only transition to STATE_PAUSED_BLOCKED/BLOCKED from
* PAUSED/STATE_WAITING respectively.
- *
- * @return STATE_PAUSED_BLOCKED, BLOCKED, or the currentState.
+ *
+ * @return STATE_PAUSED_BLOCKED, BLOCKED, or the currentState.
*/
protected String checkBlockedState(
Connection conn, JobKey jobKey, String currentState)
- throws JobPersistenceException {
+ throws JobPersistenceException {
// State can only transition to BLOCKED from PAUSED or WAITING.
if ((!currentState.equals(STATE_WAITING)) &&
- (!currentState.equals(STATE_PAUSED))) {
+ (!currentState.equals(STATE_PAUSED))) {
return currentState;
}
-
+
try {
List lst = getDelegate().selectFiredTriggerRecordsByJob(conn,
jobKey.getName(), jobKey.getGroup());
@@ -2279,9 +2273,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
return currentState;
} catch (SQLException e) {
throw new JobPersistenceException(
- "Couldn't determine if trigger should be in a blocked state '"
- + jobKey + "': "
- + e.getMessage(), e);
+ "Couldn't determine if trigger should be in a blocked state '"
+ + jobKey + "': "
+ + e.getMessage(), e);
}
}
@@ -2291,40 +2285,40 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Resume (un-pause) the {@link Trigger}
with the
* given name.
*
- *
+ *
*
* If the Trigger
missed one or more fire-times, then the
* Trigger
's misfire instruction will be applied.
*
- *
+ *
* @see #pauseTrigger(TriggerKey)
*/
public void resumeTrigger(final TriggerKey triggerKey) throws JobPersistenceException {
executeInLock(
- LOCK_TRIGGER_ACCESS,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- resumeTrigger(conn, triggerKey);
- }
- });
+ LOCK_TRIGGER_ACCESS,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ resumeTrigger(conn, triggerKey);
+ }
+ });
}
-
+
/**
*
* Resume (un-pause) the {@link Trigger}
with the
* given name.
*
- *
+ *
*
* If the Trigger
missed one or more fire-times, then the
* Trigger
's misfire instruction will be applied.
*
- *
+ *
* @see #pauseTrigger(Connection, TriggerKey)
*/
- public void resumeTrigger(Connection conn,
- TriggerKey key)
- throws JobPersistenceException {
+ public void resumeTrigger(Connection conn,
+ TriggerKey key)
+ throws JobPersistenceException {
try {
TriggerStatus status = getDelegate().selectTriggerStatus(conn,
@@ -2335,7 +2329,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
boolean blocked = false;
- if(STATE_PAUSED_BLOCKED.equals(status.getStatus())) {
+ if (STATE_PAUSED_BLOCKED.equals(status.getStatus())) {
blocked = true;
}
@@ -2345,18 +2339,18 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (schedulerRunning && status.getNextFireTime().before(new Date())) {
misfired = updateMisfiredTrigger(conn, key,
- newState, true);
+ newState, true);
}
- if(!misfired) {
- if(blocked) {
+ if (!misfired) {
+ if (blocked) {
getDelegate().updateTriggerStateFromOtherState(conn,
key, newState, STATE_PAUSED_BLOCKED);
} else {
getDelegate().updateTriggerStateFromOtherState(conn,
key, newState, STATE_PAUSED);
}
- }
+ }
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't resume trigger '"
@@ -2369,94 +2363,94 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Resume (un-pause) the {@link Job}
with the
* given name.
*
- *
+ *
*
* If any of the Job
'sTrigger
s missed one
* or more fire-times, then the Trigger
's misfire
* instruction will be applied.
*
- *
+ *
* @see #pauseJob(JobKey)
*/
public void resumeJob(final JobKey jobKey) throws JobPersistenceException {
executeInLock(
- LOCK_TRIGGER_ACCESS,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- List triggers = getTriggersForJob(conn, jobKey);
- for (OperableTrigger trigger: triggers) {
- resumeTrigger(conn, trigger.getKey());
+ LOCK_TRIGGER_ACCESS,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ List triggers = getTriggersForJob(conn, jobKey);
+ for (OperableTrigger trigger : triggers) {
+ resumeTrigger(conn, trigger.getKey());
+ }
}
- }
- });
+ });
}
-
+
/**
*
* Resume (un-pause) all of the {@link Job}s
in
* the given group.
*
- *
+ *
*
* If any of the Job
s had Trigger
s that
* missed one or more fire-times, then the Trigger
's
* misfire instruction will be applied.
*
- *
+ *
* @see #pauseJobs(GroupMatcher)
*/
@SuppressWarnings("unchecked")
public Set resumeJobs(final GroupMatcher matcher)
- throws JobPersistenceException {
+ throws JobPersistenceException {
return (Set) executeInLock(
- LOCK_TRIGGER_ACCESS,
- new TransactionCallback() {
- public Set execute(Connection conn) throws JobPersistenceException {
- Set jobKeys = getJobNames(conn, matcher);
- Set groupNames = new HashSet();
-
- for (JobKey jobKey: jobKeys) {
- List triggers = getTriggersForJob(conn, jobKey);
- for (OperableTrigger trigger: triggers) {
- resumeTrigger(conn, trigger.getKey());
+ LOCK_TRIGGER_ACCESS,
+ new TransactionCallback() {
+ public Set execute(Connection conn) throws JobPersistenceException {
+ Set jobKeys = getJobNames(conn, matcher);
+ Set groupNames = new HashSet();
+
+ for (JobKey jobKey : jobKeys) {
+ List triggers = getTriggersForJob(conn, jobKey);
+ for (OperableTrigger trigger : triggers) {
+ resumeTrigger(conn, trigger.getKey());
+ }
+ groupNames.add(jobKey.getGroup());
}
- groupNames.add(jobKey.getGroup());
+ return groupNames;
}
- return groupNames;
- }
- });
+ });
}
-
+
/**
*
* Pause all of the {@link Trigger}s
matching the
* given groupMatcher.
*
- *
+ *
* @see #resumeTriggerGroup(java.sql.Connection, GroupMatcher)
*/
@SuppressWarnings("unchecked")
public Set pauseTriggers(final GroupMatcher matcher)
- throws JobPersistenceException {
+ throws JobPersistenceException {
return (Set) executeInLock(
- LOCK_TRIGGER_ACCESS,
- new TransactionCallback() {
- public Set execute(Connection conn) throws JobPersistenceException {
- return pauseTriggerGroup(conn, matcher);
- }
- });
+ LOCK_TRIGGER_ACCESS,
+ new TransactionCallback() {
+ public Set execute(Connection conn) throws JobPersistenceException {
+ return pauseTriggerGroup(conn, matcher);
+ }
+ });
}
-
+
/**
*
* Pause all of the {@link Trigger}s
matching the
* given groupMatcher.
*
- *
+ *
* @see #resumeTriggerGroup(java.sql.Connection, GroupMatcher)
*/
public Set pauseTriggerGroup(Connection conn,
- GroupMatcher matcher) throws JobPersistenceException {
+ GroupMatcher matcher) throws JobPersistenceException {
try {
@@ -2468,11 +2462,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
conn, matcher, STATE_PAUSED_BLOCKED, STATE_BLOCKED);
List groups = getDelegate().selectTriggerGroups(conn, matcher);
-
+
// make sure to account for an exact group match for a group that doesn't yet exist
StringMatcher.StringOperatorName operator = matcher.getCompareWithOperator();
if (operator.equals(StringOperatorName.EQUALS) && !groups.contains(matcher.getCompareToValue())) {
- groups.add(matcher.getCompareToValue());
+ groups.add(matcher.getCompareToValue());
}
for (String group : groups) {
@@ -2490,26 +2484,26 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
@SuppressWarnings("unchecked")
- public Set getPausedTriggerGroups()
- throws JobPersistenceException {
- return (Set)executeWithoutLock( // no locks necessary for read...
- new TransactionCallback() {
- public Object execute(Connection conn) throws JobPersistenceException {
- return getPausedTriggerGroups(conn);
- }
- });
- }
-
+ public Set getPausedTriggerGroups()
+ throws JobPersistenceException {
+ return (Set) executeWithoutLock( // no locks necessary for read...
+ new TransactionCallback() {
+ public Object execute(Connection conn) throws JobPersistenceException {
+ return getPausedTriggerGroups(conn);
+ }
+ });
+ }
+
/**
*
* Pause all of the {@link Trigger}s
in the
* given group.
*
- *
+ *
* @see #resumeTriggers(GroupMatcher)
*/
- public Set getPausedTriggerGroups(Connection conn)
- throws JobPersistenceException {
+ public Set getPausedTriggerGroups(Connection conn)
+ throws JobPersistenceException {
try {
return getDelegate().selectPausedTriggerGroups(conn);
@@ -2518,48 +2512,48 @@ public abstract class JobStoreSupport implements JobStore, Constants {
"Couldn't determine paused trigger groups: " + e.getMessage(), e);
}
}
-
+
/**
*
* Resume (un-pause) all of the {@link Trigger}s
* matching the given groupMatcher.
*
- *
+ *
*
* If any Trigger
missed one or more fire-times, then the
* Trigger
's misfire instruction will be applied.
*
- *
+ *
* @see #pauseTriggers(GroupMatcher)
*/
@SuppressWarnings("unchecked")
public Set resumeTriggers(final GroupMatcher matcher)
- throws JobPersistenceException {
+ throws JobPersistenceException {
return (Set) executeInLock(
- LOCK_TRIGGER_ACCESS,
- new TransactionCallback() {
- public Set execute(Connection conn) throws JobPersistenceException {
- return resumeTriggerGroup(conn, matcher);
- }
- });
+ LOCK_TRIGGER_ACCESS,
+ new TransactionCallback() {
+ public Set execute(Connection conn) throws JobPersistenceException {
+ return resumeTriggerGroup(conn, matcher);
+ }
+ });
}
-
+
/**
*
* Resume (un-pause) all of the {@link Trigger}s
* matching the given groupMatcher.
*
- *
+ *
*
* If any Trigger
missed one or more fire-times, then the
* Trigger
's misfire instruction will be applied.
*
- *
+ *
* @see #pauseTriggers(GroupMatcher)
*/
public Set resumeTriggerGroup(Connection conn,
- GroupMatcher matcher) throws JobPersistenceException {
+ GroupMatcher matcher) throws JobPersistenceException {
try {
@@ -2569,7 +2563,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
Set keys = getDelegate().selectTriggersInGroup(conn,
matcher);
- for (TriggerKey key: keys) {
+ for (TriggerKey key : keys) {
resumeTrigger(conn, key);
groups.add(key.getGroup());
}
@@ -2583,25 +2577,25 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* int res =
* getDelegate().updateTriggerGroupStateFromOtherState(conn,
* groupName, STATE_WAITING, PAUSED);
- *
+ *
* if(res > 0) {
- *
+ *
* long misfireTime = System.currentTimeMillis();
* if(getMisfireThreshold() > 0) misfireTime -=
* getMisfireThreshold();
- *
+ *
* Key[] misfires =
* getDelegate().selectMisfiredTriggersInGroupInState(conn,
* groupName, STATE_WAITING, misfireTime);
- *
+ *
* List blockedTriggers = findTriggersToBeBlocked(conn,
* groupName);
- *
+ *
* Iterator itr = blockedTriggers.iterator(); while(itr.hasNext()) {
* Key key = (Key)itr.next();
* getDelegate().updateTriggerState(conn, key.getName(),
* key.getGroup(), BLOCKED); }
- *
+ *
* for(int i=0; i < misfires.length; i++) { String
* newState = STATE_WAITING;
* if(blockedTriggers.contains(misfires[i])) newState =
@@ -2620,45 +2614,45 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Pause all triggers - equivalent of calling pauseTriggerGroup(group)
* on every group.
*
- *
+ *
*
* When resumeAll()
is called (to un-pause), trigger misfire
* instructions WILL be applied.
*
- *
+ *
* @see #resumeAll()
* @see #pauseTriggerGroup(java.sql.Connection, GroupMatcher)
*/
public void pauseAll() throws JobPersistenceException {
executeInLock(
- LOCK_TRIGGER_ACCESS,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- pauseAll(conn);
- }
- });
+ LOCK_TRIGGER_ACCESS,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ pauseAll(conn);
+ }
+ });
}
-
+
/**
*
* Pause all triggers - equivalent of calling pauseTriggerGroup(group)
* on every group.
*
- *
+ *
*
* When resumeAll()
is called (to un-pause), trigger misfire
* instructions WILL be applied.
*
- *
+ *
* @see #resumeAll(Connection)
* @see #pauseTriggerGroup(java.sql.Connection, GroupMatcher)
*/
public void pauseAll(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
List names = getTriggerGroupNames(conn);
- for (String name: names) {
+ for (String name : names) {
pauseTriggerGroup(conn, GroupMatcher.triggerGroupEquals(name));
}
@@ -2679,45 +2673,45 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Resume (un-pause) all triggers - equivalent of calling resumeTriggerGroup(group)
* on every group.
*
- *
+ *
*
* If any Trigger
missed one or more fire-times, then the
* Trigger
's misfire instruction will be applied.
*
- *
+ *
* @see #pauseAll()
*/
public void resumeAll()
- throws JobPersistenceException {
+ throws JobPersistenceException {
executeInLock(
- LOCK_TRIGGER_ACCESS,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- resumeAll(conn);
- }
- });
+ LOCK_TRIGGER_ACCESS,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ resumeAll(conn);
+ }
+ });
}
-
+
/**
* protected
*
* Resume (un-pause) all triggers - equivalent of calling resumeTriggerGroup(group)
* on every group.
*
- *
+ *
*
* If any Trigger
missed one or more fire-times, then the
* Trigger
's misfire instruction will be applied.
*
- *
+ *
* @see #pauseAll(Connection)
*/
public void resumeAll(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
List names = getTriggerGroupNames(conn);
- for (String name: names) {
+ for (String name : names) {
resumeTriggerGroup(conn, GroupMatcher.triggerGroupEquals(name));
}
@@ -2740,20 +2734,20 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* Get a handle to the next N triggers to be fired, and mark them as 'reserved'
* by the calling scheduler.
*
- *
+ *
* @see #releaseAcquiredTrigger(OperableTrigger)
*/
@SuppressWarnings("unchecked")
public List acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
- throws JobPersistenceException {
-
+ throws JobPersistenceException {
+
String lockName;
- if(isAcquireTriggersWithinLock() || maxCount > 1) {
+ if (isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
- return executeInNonManagedTXLock(lockName,
+ return executeInNonManagedTXLock(lockName,
new TransactionCallback>() {
public List execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
@@ -2779,37 +2773,37 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
});
}
-
+
// FUTURE_TODO: this really ought to return something like a FiredTriggerBundle,
// so that the fireInstanceId doesn't have to be on the trigger...
protected List acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
- throws JobPersistenceException {
+ throws JobPersistenceException {
if (timeWindow < 0) {
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException();
}
-
+
List acquiredTriggers = new ArrayList();
Set acquiredJobKeysForNoConcurrentExec = new HashSet();
final int MAX_DO_LOOP_RETRY = 3;
int currentLoopCount = 0;
do {
- currentLoopCount ++;
+ currentLoopCount++;
try {
List keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
-
+
// No trigger is ready to fire yet.
if (keys == null || keys.size() == 0)
return acquiredTriggers;
long batchEnd = noLaterThan;
- for(TriggerKey triggerKey: keys) {
+ for (TriggerKey triggerKey : keys) {
// If our trigger is no longer available, try a new one.
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
- if(nextTrigger == null) {
+ if (nextTrigger == null) {
continue; // next trigger
}
-
+
// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
// put it back into the timeTriggers set and continue to search for next trigger.
JobKey jobKey = nextTrigger.getJobKey();
@@ -2825,7 +2819,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
continue;
}
-
+
if (job.isConcurrentExectionDisallowed()) {
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
continue; // next trigger
@@ -2833,9 +2827,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}
-
+
if (nextTrigger.getNextFireTime().getTime() > batchEnd) {
- break;
+ break;
}
// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
@@ -2846,7 +2840,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
- if(acquiredTriggers.isEmpty()) {
+ if (acquiredTriggers.isEmpty()) {
batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
}
acquiredTriggers.add(nextTrigger);
@@ -2854,22 +2848,22 @@ public abstract class JobStoreSupport implements JobStore, Constants {
// if we didn't end up with any trigger to fire from that first
// batch, try again for another batch. We allow with a max retry count.
- if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
+ if (acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
continue;
}
-
+
// We are done with the while loop.
break;
} catch (Exception e) {
throw new JobPersistenceException(
- "Couldn't acquire next trigger: " + e.getMessage(), e);
+ "Couldn't acquire next trigger: " + e.getMessage(), e);
}
} while (true);
-
+
// Return the acquired trigger list
return acquiredTriggers;
}
-
+
/**
*
* Inform the JobStore
that the scheduler no longer plans to
@@ -2879,17 +2873,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*/
public void releaseAcquiredTrigger(final OperableTrigger trigger) {
retryExecuteInNonManagedTXLock(
- LOCK_TRIGGER_ACCESS,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- releaseAcquiredTrigger(conn, trigger);
- }
- });
+ LOCK_TRIGGER_ACCESS,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ releaseAcquiredTrigger(conn, trigger);
+ }
+ });
}
-
+
protected void releaseAcquiredTrigger(Connection conn,
- OperableTrigger trigger)
- throws JobPersistenceException {
+ OperableTrigger trigger)
+ throws JobPersistenceException {
try {
getDelegate().updateTriggerStateFromOtherState(conn,
trigger.getKey(), STATE_WAITING, STATE_ACQUIRED);
@@ -2906,10 +2900,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* given Trigger
(executing its associated Job
),
* that it had previously acquired (reserved).
*
- *
+ *
* @return null if the trigger or its job or calendar no longer exist, or
- * if the trigger was not successfully put into the 'executing'
- * state.
+ * if the trigger was not successfully put into the 'executing'
+ * state.
*/
@SuppressWarnings("unchecked")
public List triggersFired(final List triggers) throws JobPersistenceException {
@@ -2921,11 +2915,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
TriggerFiredResult result;
for (OperableTrigger trigger : triggers) {
try {
- TriggerFiredBundle bundle = triggerFired(conn, trigger);
- result = new TriggerFiredResult(bundle);
+ TriggerFiredBundle bundle = triggerFired(conn, trigger);
+ result = new TriggerFiredResult(bundle);
} catch (JobPersistenceException jpe) {
result = new TriggerFiredResult(jpe);
- } catch(RuntimeException re) {
+ } catch (RuntimeException re) {
result = new TriggerFiredResult(re);
}
results.add(result);
@@ -2959,8 +2953,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
protected TriggerFiredBundle triggerFired(Connection conn,
- OperableTrigger trigger)
- throws JobPersistenceException {
+ OperableTrigger trigger)
+ throws JobPersistenceException {
JobDetail job;
Calendar cal = null;
@@ -2978,7 +2972,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
try {
job = retrieveJob(conn, trigger.getJobKey());
- if (job == null) { return null; }
+ if (job == null) {
+ return null;
+ }
} catch (JobPersistenceException jpe) {
try {
getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
@@ -2992,7 +2988,9 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (trigger.getCalendarName() != null) {
cal = retrieveCalendar(conn, trigger.getCalendarName());
- if (cal == null) { return null; }
+ if (cal == null) {
+ return null;
+ }
}
try {
@@ -3009,7 +3007,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
String state = STATE_WAITING;
boolean force = true;
-
+
if (job.isConcurrentExectionDisallowed()) {
state = STATE_BLOCKED;
force = false;
@@ -3025,8 +3023,8 @@ public abstract class JobStoreSupport implements JobStore, Constants {
"Couldn't update states of blocked triggers: "
+ e.getMessage(), e);
}
- }
-
+ }
+
if (trigger.getNextFireTime() == null) {
state = STATE_COMPLETE;
force = true;
@@ -3053,28 +3051,28 @@ public abstract class JobStoreSupport implements JobStore, Constants {
public void triggeredJobComplete(final OperableTrigger trigger,
final JobDetail jobDetail, final CompletedExecutionInstruction triggerInstCode) {
retryExecuteInNonManagedTXLock(
- LOCK_TRIGGER_ACCESS,
- new VoidTransactionCallback() {
- public void executeVoid(Connection conn) throws JobPersistenceException {
- triggeredJobComplete(conn, trigger, jobDetail,triggerInstCode);
- }
- });
+ LOCK_TRIGGER_ACCESS,
+ new VoidTransactionCallback() {
+ public void executeVoid(Connection conn) throws JobPersistenceException {
+ triggeredJobComplete(conn, trigger, jobDetail, triggerInstCode);
+ }
+ });
}
-
+
protected void triggeredJobComplete(Connection conn,
- OperableTrigger trigger, JobDetail jobDetail,
- CompletedExecutionInstruction triggerInstCode) throws JobPersistenceException {
+ OperableTrigger trigger, JobDetail jobDetail,
+ CompletedExecutionInstruction triggerInstCode) throws JobPersistenceException {
try {
if (triggerInstCode == CompletedExecutionInstruction.DELETE_TRIGGER) {
- if(trigger.getNextFireTime() == null) {
+ if (trigger.getNextFireTime() == null) {
// double check for possible reschedule within job
// execution, which would cancel the need to delete...
TriggerStatus stat = getDelegate().selectTriggerStatus(
conn, trigger.getKey());
- if(stat != null && stat.getNextFireTime() == null) {
+ if (stat != null && stat.getNextFireTime() == null) {
removeTrigger(conn, trigger.getKey());
}
- } else{
+ } else {
removeTrigger(conn, trigger.getKey());
signalSchedulingChangeOnTxCompletion(0L);
}
@@ -3092,7 +3090,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
trigger.getJobKey(), STATE_COMPLETE);
signalSchedulingChangeOnTxCompletion(0L);
} else if (triggerInstCode == CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR) {
- getLog().info("All triggers of Job " +
+ getLog().info("All triggers of Job " +
trigger.getKey() + " set to ERROR state.");
getDelegate().updateTriggerStatesForJob(conn,
trigger.getJobKey(), STATE_ERROR);
@@ -3142,17 +3140,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
*
*/
protected DriverDelegate getDelegate() throws NoSuchDelegateException {
- synchronized(this) {
- if(null == delegate) {
+ synchronized (this) {
+ if (null == delegate) {
try {
- if(delegateClassName != null) {
+ if (delegateClassName != null) {
delegateClass = getClassLoadHelper().loadClass(delegateClassName, DriverDelegate.class);
}
delegate = delegateClass.newInstance();
-
+
delegate.initialize(getLog(), tablePrefix, instanceName, instanceId, getClassLoadHelper(), canUseProperties(), getDriverDelegateInitString());
-
+
} catch (InstantiationException e) {
throw new NoSuchDelegateException("Couldn't create delegate: "
+ e.getMessage(), e);
@@ -3185,24 +3183,24 @@ public abstract class JobStoreSupport implements JobStore, Constants {
Connection conn = getNonManagedTXConnection();
try {
RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
-
+
// Before we make the potentially expensive call to acquire the
// trigger lock, peek ahead to see if it is likely we would find
// misfired triggers requiring recovery.
int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
- getDelegate().countMisfiredTriggersInState(
- conn, STATE_WAITING, getMisfireTime()) :
- Integer.MAX_VALUE;
-
+ getDelegate().countMisfiredTriggersInState(
+ conn, STATE_WAITING, getMisfireTime()) :
+ Integer.MAX_VALUE;
+
if (misfireCount == 0) {
getLog().debug(
- "Found 0 triggers that missed their scheduled fire-time.");
+ "Found 0 triggers that missed their scheduled fire-time.");
} else {
transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
-
+
result = recoverMisfiredJobs(conn, false);
}
-
+
commitConnection(conn);
return result;
} catch (JobPersistenceException e) {
@@ -3225,16 +3223,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
protected ThreadLocal sigChangeForTxCompletion = new ThreadLocal();
+
protected void signalSchedulingChangeOnTxCompletion(long candidateNewNextFireTime) {
Long sigTime = sigChangeForTxCompletion.get();
- if(sigTime == null && candidateNewNextFireTime >= 0L)
+ if (sigTime == null && candidateNewNextFireTime >= 0L)
sigChangeForTxCompletion.set(candidateNewNextFireTime);
else {
- if(sigTime == null || candidateNewNextFireTime < sigTime)
+ if (sigTime == null || candidateNewNextFireTime < sigTime)
sigChangeForTxCompletion.set(candidateNewNextFireTime);
}
}
-
+
protected Long clearAndGetSignalSchedulingChangeOnTxCompletion() {
Long t = sigChangeForTxCompletion.get();
sigChangeForTxCompletion.set(null);
@@ -3252,7 +3251,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
protected boolean firstCheckIn = true;
protected long lastCheckin = System.currentTimeMillis();
-
+
protected boolean doCheckin() throws JobPersistenceException {
boolean transOwner = false;
boolean transStateOwner = false;
@@ -3269,25 +3268,25 @@ public abstract class JobStoreSupport implements JobStore, Constants {
failedRecords = clusterCheckIn(conn);
commitConnection(conn);
}
-
+
if (firstCheckIn || (failedRecords.size() > 0)) {
getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
transStateOwner = true;
-
+
// Now that we own the lock, make sure we still have work to do.
// The first time through, we also need to make sure we update/create our state record
failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);
-
+
if (failedRecords.size() > 0) {
getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
//getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
transOwner = true;
-
+
clusterRecover(conn, failedRecords);
recovered = true;
}
}
-
+
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
@@ -3314,16 +3313,16 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* This includes this scheduler if it is checking in for the first time.
*/
protected List findFailedInstances(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
try {
List failedInstances = new LinkedList();
boolean foundThisScheduler = false;
long timeNow = System.currentTimeMillis();
-
+
List states = getDelegate().selectSchedulerStateRecords(conn, null);
- for(SchedulerStateRecord rec: states) {
-
+ for (SchedulerStateRecord rec : states) {
+
// find own record...
if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
foundThisScheduler = true;
@@ -3337,22 +3336,22 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
}
}
-
+
// The first time through, also check for orphaned fired triggers.
if (firstCheckIn) {
failedInstances.addAll(findOrphanedFailedInstances(conn, states));
}
-
+
// If not the first time but we didn't find our own instance, then
// Someone must have done recovery for us.
if ((!foundThisScheduler) && (!firstCheckIn)) {
// FUTURE_TODO: revisit when handle self-failed-out impl'ed (see FUTURE_TODO in clusterCheckIn() below)
getLog().warn(
- "This scheduler instance (" + getInstanceId() + ") is still " +
- "active but was recovered by another instance in the cluster. " +
- "This may cause inconsistent behavior.");
+ "This scheduler instance (" + getInstanceId() + ") is still " +
+ "active but was recovered by another instance in the cluster. " +
+ "This may cause inconsistent behavior.");
}
-
+
return failedInstances;
} catch (Exception e) {
lastCheckin = System.currentTimeMillis();
@@ -3360,64 +3359,64 @@ public abstract class JobStoreSupport implements JobStore, Constants {
+ e.getMessage(), e);
}
}
-
+
/**
* Create dummy SchedulerStateRecord
objects for fired triggers
* that have no scheduler state record. Checkin timestamp and interval are
* left as zero on these dummy SchedulerStateRecord
objects.
- *
+ *
* @param schedulerStateRecords List of all current SchedulerStateRecords
*/
private List findOrphanedFailedInstances(
- Connection conn,
+ Connection conn,
List schedulerStateRecords)
- throws SQLException, NoSuchDelegateException {
+ throws SQLException, NoSuchDelegateException {
List orphanedInstances = new ArrayList();
-
+
Set allFiredTriggerInstanceNames = getDelegate().selectFiredTriggerInstanceNames(conn);
if (!allFiredTriggerInstanceNames.isEmpty()) {
- for (SchedulerStateRecord rec: schedulerStateRecords) {
-
+ for (SchedulerStateRecord rec : schedulerStateRecords) {
+
allFiredTriggerInstanceNames.remove(rec.getSchedulerInstanceId());
}
-
- for (String inst: allFiredTriggerInstanceNames) {
-
+
+ for (String inst : allFiredTriggerInstanceNames) {
+
SchedulerStateRecord orphanedInstance = new SchedulerStateRecord();
orphanedInstance.setSchedulerInstanceId(inst);
-
+
orphanedInstances.add(orphanedInstance);
-
+
getLog().warn(
- "Found orphaned fired triggers for instance: " + orphanedInstance.getSchedulerInstanceId());
+ "Found orphaned fired triggers for instance: " + orphanedInstance.getSchedulerInstanceId());
}
}
-
+
return orphanedInstances;
}
-
+
protected long calcFailedIfAfter(SchedulerStateRecord rec) {
return rec.getCheckinTimestamp() +
- Math.max(rec.getCheckinInterval(),
- (System.currentTimeMillis() - lastCheckin)) +
- 7500L;
+ Math.max(rec.getCheckinInterval(),
+ (System.currentTimeMillis() - lastCheckin)) +
+ 7500L;
}
-
+
protected List clusterCheckIn(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
List failedInstances = findFailedInstances(conn);
-
+
try {
// FUTURE_TODO: handle self-failed-out
// check in...
lastCheckin = System.currentTimeMillis();
- if(getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
+ if (getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
getDelegate().insertSchedulerState(conn, getInstanceId(),
lastCheckin, getClusterCheckinInterval());
}
-
+
} catch (Exception e) {
throw new JobPersistenceException("Failure updating scheduler state when checking-in: "
+ e.getMessage(), e);
@@ -3428,7 +3427,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
@SuppressWarnings("ConstantConditions")
protected void clusterRecover(Connection conn, List failedInstances)
- throws JobPersistenceException {
+ throws JobPersistenceException {
if (failedInstances.size() > 0) {
@@ -3592,47 +3591,47 @@ public abstract class JobStoreSupport implements JobStore, Constants {
* any modified auto commit or transaction isolation connection
* attributes, and then closing the underlying connection.
*
- *
+ *
*
- * This is separate from closeConnection() because the Spring
+ * This is separate from closeConnection() because the Spring
* integration relies on being able to overload closeConnection() and
* expects the same connection back that it originally returned
- * from the datasource.
+ * from the datasource.
*
- *
+ *
* @see #closeConnection(Connection)
*/
protected void cleanupConnection(Connection conn) {
if (conn != null) {
if (conn instanceof Proxy) {
- Proxy connProxy = (Proxy)conn;
-
- InvocationHandler invocationHandler =
- Proxy.getInvocationHandler(connProxy);
+ Proxy connProxy = (Proxy) conn;
+
+ InvocationHandler invocationHandler =
+ Proxy.getInvocationHandler(connProxy);
if (invocationHandler instanceof AttributeRestoringConnectionInvocationHandler) {
AttributeRestoringConnectionInvocationHandler connHandler =
- (AttributeRestoringConnectionInvocationHandler)invocationHandler;
-
+ (AttributeRestoringConnectionInvocationHandler) invocationHandler;
+
connHandler.restoreOriginalAtributes();
closeConnection(connHandler.getWrappedConnection());
return;
}
}
-
+
// Wan't a Proxy, or was a Proxy, but wasn't ours.
closeConnection(conn);
}
}
-
-
+
+
/**
* Closes the supplied Connection
.
*
- * Ignores a null Connection
.
+ * Ignores a null Connection
.
* Any exception thrown trying to close the Connection
is
- * logged and ignored.
+ * logged and ignored.
*
- *
+ *
* @param conn The Connection
to close (Optional).
*/
protected void closeConnection(Connection conn) {
@@ -3643,18 +3642,18 @@ public abstract class JobStoreSupport implements JobStore, Constants {
getLog().error("Failed to close Connection", e);
} catch (Throwable e) {
getLog().error(
- "Unexpected exception closing Connection." +
- " This is often due to a Connection being returned after or during shutdown.", e);
+ "Unexpected exception closing Connection." +
+ " This is often due to a Connection being returned after or during shutdown.", e);
}
}
}
/**
* Rollback the supplied connection.
- *
- *
+ *
+ *
* Logs any SQLException it gets trying to rollback, but will not propogate
- * the exception lest it mask the exception that caused the caller to
+ * the exception lest it mask the exception that caused the caller to
* need to rollback in the first place.
*
*
@@ -3666,36 +3665,36 @@ public abstract class JobStoreSupport implements JobStore, Constants {
conn.rollback();
} catch (SQLException e) {
getLog().error(
- "Couldn't rollback jdbc connection. "+e.getMessage(), e);
+ "Couldn't rollback jdbc connection. " + e.getMessage(), e);
}
}
}
-
+
/**
* Commit the supplied connection
*
* @param conn (Optional)
* @throws JobPersistenceException thrown if a SQLException occurs when the
- * connection is committed
+ * connection is committed
*/
protected void commitConnection(Connection conn)
- throws JobPersistenceException {
+ throws JobPersistenceException {
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
throw new JobPersistenceException(
- "Couldn't commit jdbc connection. "+e.getMessage(), e);
+ "Couldn't commit jdbc connection. " + e.getMessage(), e);
}
}
}
-
+
/**
* Implement this interface to provide the code to execute within
* the a transaction template. If no return value is required, execute
* should just return null.
- *
+ *
* @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
* @see JobStoreSupport#executeInLock(String, TransactionCallback)
* @see JobStoreSupport#executeWithoutLock(TransactionCallback)
@@ -3707,11 +3706,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
protected interface TransactionValidator {
Boolean validate(Connection conn, T result) throws JobPersistenceException;
}
-
+
/**
* Implement this interface to provide the code to execute within
* the a transaction template that has no return value.
- *
+ *
* @see JobStoreSupport#executeInNonManagedTXLock(String, TransactionCallback)
*/
protected abstract class VoidTransactionCallback implements TransactionCallback {
@@ -3719,45 +3718,45 @@ public abstract class JobStoreSupport implements JobStore, Constants {
executeVoid(conn);
return null;
}
-
+
abstract void executeVoid(Connection conn) throws JobPersistenceException;
}
/**
- * Execute the given callback in a transaction. Depending on the JobStore,
- * the surrounding transaction may be assumed to be already present
- * (managed).
- *
+ * Execute the given callback in a transaction. Depending on the JobStore,
+ * the surrounding transaction may be assumed to be already present
+ * (managed).
+ *
*
* This method just forwards to executeInLock() with a null lockName.
*
- *
+ *
* @see #executeInLock(String, TransactionCallback)
*/
public T executeWithoutLock(
- TransactionCallback txCallback) throws JobPersistenceException {
+ TransactionCallback txCallback) throws JobPersistenceException {
return executeInLock(null, txCallback);
}
/**
* Execute the given callback having acquired the given lock.
- * Depending on the JobStore, the surrounding transaction may be
+ * Depending on the JobStore, the surrounding transaction may be
* assumed to be already present (managed).
- *
+ *
* @param lockName The name of the lock to acquire, for example
- * "TRIGGER_ACCESS". If null, then no lock is acquired, but the
- * lockCallback is still executed in a transaction.
+ * "TRIGGER_ACCESS". If null, then no lock is acquired, but the
+ * lockCallback is still executed in a transaction.
*/
protected abstract T executeInLock(
- String lockName,
- TransactionCallback txCallback) throws JobPersistenceException;
-
+ String lockName,
+ TransactionCallback txCallback) throws JobPersistenceException;
+
protected T retryExecuteInNonManagedTXLock(String lockName, TransactionCallback txCallback) {
for (int retry = 1; !shutdown; retry++) {
try {
return executeInNonManagedTXLock(lockName, txCallback, null);
} catch (JobPersistenceException jpe) {
- if(retry % 4 == 0) {
+ if (retry % 4 == 0) {
schedSignaler.notifySchedulerListenersError("An error occurred while " + txCallback, jpe);
}
} catch (RuntimeException e) {
@@ -3771,17 +3770,17 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
throw new IllegalStateException("JobStore is shutdown - aborting retry");
}
-
+
/**
* Execute the given callback having optionally acquired the given lock.
* This uses the non-managed transaction connection.
- *
+ *
* @param lockName The name of the lock to acquire, for example
- * "TRIGGER_ACCESS". If null, then no lock is acquired, but the
- * lockCallback is still executed in a non-managed transaction.
+ * "TRIGGER_ACCESS". If null, then no lock is acquired, but the
+ * lockCallback is still executed in a non-managed transaction.
*/
protected T executeInNonManagedTXLock(
- String lockName,
+ String lockName,
TransactionCallback txCallback, final TransactionValidator txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
@@ -3792,14 +3791,14 @@ public abstract class JobStoreSupport implements JobStore, Constants {
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
-
+
transOwner = getLockHandler().obtainLock(conn, lockName);
}
-
+
if (conn == null) {
conn = getNonManagedTXConnection();
}
-
+
final T result = txCallback.execute(conn);
try {
commitConnection(conn);
@@ -3816,10 +3815,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
- if(sigTime != null && sigTime >= 0) {
+ if (sigTime != null && sigTime >= 0) {
signalSchedulingChangeImmediately(sigTime);
}
-
+
return result;
} catch (JobPersistenceException e) {
rollbackConnection(conn);
@@ -3836,7 +3835,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
}
}
}
-
+
/////////////////////////////////////////////////////////////////////////////
//
// ClusterManager Thread
@@ -3848,7 +3847,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
private volatile boolean shutdown = false;
private int numFails = 0;
-
+
ClusterManager() {
this.setPriority(Thread.NORM_PRIORITY + 2);
this.setName("QuartzScheduler_" + instanceName + "-" + instanceId + "_ClusterManager");
@@ -3876,10 +3875,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
numFails = 0;
getLog().debug("ClusterManager: Check-in complete.");
} catch (Exception e) {
- if(numFails % 4 == 0) {
+ if (numFails % 4 == 0) {
getLog().error(
- "ClusterManager: Error managing cluster: "
- + e.getMessage(), e);
+ "ClusterManager: Error managing cluster: "
+ + e.getMessage(), e);
}
numFails++;
}
@@ -3898,10 +3897,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
timeToSleep = 100L;
}
- if(numFails > 0) {
+ if (numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
-
+
try {
Thread.sleep(timeToSleep);
} catch (Exception ignore) {
@@ -3927,7 +3926,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
private volatile boolean shutdown = false;
private int numFails = 0;
-
+
MisfireHandler() {
this.setName("QuartzScheduler_" + instanceName + "-" + instanceId + "_MisfireHandler");
@@ -3952,10 +3951,10 @@ public abstract class JobStoreSupport implements JobStore, Constants {
numFails = 0;
return res;
} catch (Exception e) {
- if(numFails % 4 == 0) {
+ if (numFails % 4 == 0) {
getLog().error(
- "MisfireHandler: Error handling misfires: "
- + e.getMessage(), e);
+ "MisfireHandler: Error handling misfires: "
+ + e.getMessage(), e);
}
numFails++;
}
@@ -3964,7 +3963,7 @@ public abstract class JobStoreSupport implements JobStore, Constants {
@Override
public void run() {
-
+
while (!shutdown) {
long sTime = System.currentTimeMillis();
@@ -3983,11 +3982,11 @@ public abstract class JobStoreSupport implements JobStore, Constants {
timeToSleep = 50l;
}
- if(numFails > 0) {
+ if (numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
}
-
+
try {
Thread.sleep(timeToSleep);
} catch (Exception ignore) {