You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
133 lines
4.2 KiB
133 lines
4.2 KiB
/** |
|
* Copyright (c) 2012-2019 Nikita Koksharov |
|
* |
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
* you may not use this file except in compliance with the License. |
|
* You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
*/ |
|
/** |
|
* Modified version of HashedWheelScheduler specially for timeouts handling. |
|
* Difference: |
|
* - handling old timeout with same key after adding new one |
|
* fixes multithreaded problem that appears in highly concurrent non-atomic sequence cancel() -> schedule() |
|
* |
|
* (c) Alim Akbashev, 2015-02-11 |
|
*/ |
|
|
|
package com.fr.third.socketio.scheduler; |
|
|
|
import io.netty.channel.ChannelHandlerContext; |
|
import io.netty.util.HashedWheelTimer; |
|
import io.netty.util.Timeout; |
|
import io.netty.util.TimerTask; |
|
import io.netty.util.internal.PlatformDependent; |
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
import java.util.concurrent.ThreadFactory; |
|
import java.util.concurrent.TimeUnit; |
|
|
|
public class HashedWheelTimeoutScheduler implements CancelableScheduler { |
|
|
|
private final ConcurrentMap<SchedulerKey, Timeout> scheduledFutures = PlatformDependent.newConcurrentHashMap(); |
|
private final HashedWheelTimer executorService; |
|
|
|
private volatile ChannelHandlerContext ctx; |
|
|
|
public HashedWheelTimeoutScheduler() { |
|
executorService = new HashedWheelTimer(); |
|
} |
|
|
|
public HashedWheelTimeoutScheduler(ThreadFactory threadFactory) { |
|
executorService = new HashedWheelTimer(threadFactory); |
|
} |
|
|
|
@Override |
|
public void update(ChannelHandlerContext ctx) { |
|
this.ctx = ctx; |
|
} |
|
|
|
@Override |
|
public void cancel(SchedulerKey key) { |
|
Timeout timeout = scheduledFutures.remove(key); |
|
if (timeout != null) { |
|
timeout.cancel(); |
|
} |
|
} |
|
|
|
@Override |
|
public void schedule(final Runnable runnable, long delay, TimeUnit unit) { |
|
executorService.newTimeout(new TimerTask() { |
|
@Override |
|
public void run(Timeout timeout) throws Exception { |
|
runnable.run(); |
|
} |
|
}, delay, unit); |
|
} |
|
|
|
@Override |
|
public void scheduleCallback(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) { |
|
Timeout timeout = executorService.newTimeout(new TimerTask() { |
|
@Override |
|
public void run(Timeout timeout) throws Exception { |
|
ctx.executor().execute(new Runnable() { |
|
@Override |
|
public void run() { |
|
try { |
|
runnable.run(); |
|
} finally { |
|
scheduledFutures.remove(key); |
|
} |
|
} |
|
}); |
|
} |
|
}, delay, unit); |
|
|
|
replaceScheduledFuture(key, timeout); |
|
} |
|
|
|
@Override |
|
public void schedule(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) { |
|
Timeout timeout = executorService.newTimeout(new TimerTask() { |
|
@Override |
|
public void run(Timeout timeout) throws Exception { |
|
try { |
|
runnable.run(); |
|
} finally { |
|
scheduledFutures.remove(key); |
|
} |
|
} |
|
}, delay, unit); |
|
|
|
replaceScheduledFuture(key, timeout); |
|
} |
|
|
|
@Override |
|
public void shutdown() { |
|
executorService.stop(); |
|
} |
|
|
|
private void replaceScheduledFuture(final SchedulerKey key, final Timeout newTimeout) { |
|
final Timeout oldTimeout; |
|
|
|
if (newTimeout.isExpired()) { |
|
// no need to put already expired timeout to scheduledFutures map. |
|
// simply remove old timeout |
|
oldTimeout = scheduledFutures.remove(key); |
|
} else { |
|
oldTimeout = scheduledFutures.put(key, newTimeout); |
|
} |
|
|
|
// if there was old timeout, cancel it |
|
if (oldTimeout != null) { |
|
oldTimeout.cancel(); |
|
} |
|
} |
|
}
|
|
|