Browse Source

remove unnecessary lock & fix syncWorkerHostWeight variable reference bug

pull/2/head
gabry 5 years ago
parent
commit
1864f87c84
  1. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -36,8 +36,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dolphinscheduler.common.Constants.COMMA; import static org.apache.dolphinscheduler.common.Constants.COMMA;
@ -70,11 +68,6 @@ public class LowerWeightHostManager extends CommonHostManager {
*/ */
private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeights; private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeights;
/**
* worker group host lock
*/
private Lock lock;
/** /**
* executor service * executor service
*/ */
@ -84,7 +77,6 @@ public class LowerWeightHostManager extends CommonHostManager {
public void init(){ public void init(){
this.selector = new LowerWeightRoundRobin(); this.selector = new LowerWeightRoundRobin();
this.workerHostWeights = new ConcurrentHashMap<>(); this.workerHostWeights = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS); this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS);
this.roundRobinHostManager = new RoundRobinHostManager(); this.roundRobinHostManager = new RoundRobinHostManager();
@ -116,23 +108,13 @@ public class LowerWeightHostManager extends CommonHostManager {
throw new UnsupportedOperationException("not support"); throw new UnsupportedOperationException("not support");
} }
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){ private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
lock.lock(); this.workerHostWeights.clear();
try { this.workerHostWeights.putAll(workerHostWeights);
workerHostWeights.clear();
workerHostWeights.putAll(workerHostWeights);
} finally {
lock.unlock();
}
} }
private Set<HostWeight> getWorkerHostWeights(String workerGroup){ private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
lock.lock();
try {
return workerHostWeights.get(workerGroup); return workerHostWeights.get(workerGroup);
} finally {
lock.unlock();
}
} }
class RefreshResourceTask implements Runnable{ class RefreshResourceTask implements Runnable{

Loading…
Cancel
Save