Browse Source

[Bug][Registry] Optimizing waiting strategy (#15223)

* [Improvement][Registry] Optimizing waiting strategy

Signed-off-by: Gallardot <gallardot@apache.org>
augit-log
Gallardot 5 months ago committed by GitHub
parent
commit
575b89e2f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
  2. 15
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
  3. 9
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java

@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.rpc.MasterRpcServer;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import java.time.Duration;
@ -51,8 +50,6 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
@Autowired
private RegistryClient registryClient;
@Autowired
private MasterRpcServer masterRPCServer;
@Autowired
private WorkflowEventQueue workflowEventQueue;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@ -97,7 +94,6 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
} else {
try {
ServerLifeCycleManager.recoverFromWaiting();
reStartMasterResource();
log.info("Recover from waiting success, the current server status is {}",
ServerLifeCycleManager.getServerStatus());
} catch (Exception e) {
@ -117,9 +113,6 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
}
private void clearMasterResource() {
// close the worker resource, if close failed should stop the worker server
masterRPCServer.close();
log.warn("Master closed RPC server due to lost registry connection");
workflowEventQueue.clearWorkflowEventQueue();
log.warn("Master clear workflow event queue due to lost registry connection");
processInstanceExecCacheManager.clearCache();
@ -129,9 +122,4 @@ public class MasterWaitingStrategy implements MasterConnectStrategy {
}
private void reStartMasterResource() {
// reopen the resource, if reopen failed should stop the worker server
masterRPCServer.start();
log.warn("Master restarted RPC server due to reconnect to registry");
}
}

15
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java

@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
@ -47,9 +46,6 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
@Autowired
private RegistryClient registryClient;
@Autowired
private WorkerRpcServer workerRpcServer;
@Autowired
private MessageRetryRunner messageRetryRunner;
@ -70,6 +66,7 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
throw new ServerLifeCycleException(
String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex);
}
} catch (ServerLifeCycleException e) {
String errorMessage = String.format(
"Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
@ -94,7 +91,6 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
} else {
try {
ServerLifeCycleManager.recoverFromWaiting();
reStartWorkerResource();
log.info("Recover from waiting success, the current server status is {}",
ServerLifeCycleManager.getServerStatus());
} catch (Exception e) {
@ -114,20 +110,11 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
}
private void clearWorkerResource() {
// close the worker resource, if close failed should stop the worker server
workerRpcServer.close();
log.warn("Worker server close the RPC server due to lost connection from registry");
workerManagerThread.clearTask();
WorkerTaskExecutorHolder.clear();
log.warn("Worker server clear the tasks due to lost connection from registry");
messageRetryRunner.clearMessage();
log.warn("Worker server clear the retry message due to lost connection from registry");
}
private void reStartWorkerResource() {
// reopen the resource, if reopen failed should stop the worker server
workerRpcServer.start();
log.warn("Worker server restart PRC server due to reconnect to registry");
}
}

9
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner.operator;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -57,6 +58,14 @@ public class TaskInstanceDispatchOperationFunction
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
// check server status, if server is not running, return failed to reject this task
if (!ServerLifeCycleManager.isRunning()) {
log.error("server is not running. reject task: {}", taskExecutionContext.getProcessInstanceId());
return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
"server is not running");
}
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder

Loading…
Cancel
Save