diff --git a/docker/build/conf/dolphinscheduler/master.properties.tpl b/docker/build/conf/dolphinscheduler/master.properties.tpl index 98ca3ddd26..5d130fa411 100644 --- a/docker/build/conf/dolphinscheduler/master.properties.tpl +++ b/docker/build/conf/dolphinscheduler/master.properties.tpl @@ -47,4 +47,6 @@ master.reserved.memory=${MASTER_RESERVED_MEMORY} # master failover interval minutes master.failover.interval=${MASTER_FAILOVER_INTERVAL} # master kill yarn job when handle failover -master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER} \ No newline at end of file +master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER} +# master.persist.event.state.threads +master.persist.event.state.threads=${MASTER_PERSIST_EVENT_STATE_THREADS} \ No newline at end of file diff --git a/docker/kubernetes/dolphinscheduler/values.yaml b/docker/kubernetes/dolphinscheduler/values.yaml index f0df9e7859..52431f9cad 100644 --- a/docker/kubernetes/dolphinscheduler/values.yaml +++ b/docker/kubernetes/dolphinscheduler/values.yaml @@ -166,6 +166,7 @@ master: ORG_QUARTZ_THREADPOOL_THREADCOUNT: "25" ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT: "1" SESSION_TIMEOUT_MS: 60000 + MASTER_PERSIST_EVENT_STATE_THREADS: 10 ## Periodic probe of container liveness. Container will be restarted if the probe fails. Cannot be updated. ## More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes livenessProbe: diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 9e52d60401..8e7401be88 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -532,7 +532,8 @@ CREATE TABLE `t_ds_process_task_relation` ( `condition_params` text COMMENT 'condition params(json)', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', - PRIMARY KEY (`id`) + PRIMARY KEY (`id`), + KEY `project_code_process_definition_code_index` (`project_code`,`process_definition_code`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; -- ---------------------------- diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 5c02006eac..7bb8a9bac9 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -440,6 +440,8 @@ CREATE TABLE t_ds_process_task_relation ( PRIMARY KEY (id) ) ; +create index project_code_process_definition_code_index on t_ds_process_task_relation (project_code,process_definition_code); + DROP TABLE IF EXISTS t_ds_process_task_relation_log; CREATE TABLE t_ds_process_task_relation_log ( id int NOT NULL , diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql index 2525b3ce8e..89d5c53c59 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql @@ -17,6 +17,7 @@ SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY','')); + -- uc_dolphin_T_t_ds_process_instance_A_restart_time drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_restart_time; delimiter d// @@ -35,4 +36,25 @@ d// delimiter ; CALL uc_dolphin_T_t_ds_process_instance_A_restart_time(); -DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time; \ No newline at end of file +DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time; + + +-- uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index +drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index; +delimiter d// +CREATE PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index() +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS + WHERE TABLE_NAME='t_ds_process_task_relation' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND INDEX_NAME ='project_code_process_definition_code_index') + THEN +ALTER TABLE `t_ds_process_task_relation` ADD KEY `project_code_process_definition_code_index`(`project_code`,`process_definition_code`) USING BTREE; +END IF; +END; + +d// + +delimiter ; +CALL uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index(); +DROP PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql index d26cf8e3b8..75be01f0c1 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -29,6 +29,9 @@ BEGIN v_schema =current_schema(); EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL'; + +EXECUTE 'CREATE INDEX IF NOT EXISTS project_code_process_definition_code_index ON ' || quote_ident(v_schema) ||'.t_ds_process_task_relation USING Btree("project_code","process_definition_code")'; + return 'Success!'; exception when others then ---Raise EXCEPTION '(%)',SQLERRM; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 13a68c40d8..b7e564296e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -66,9 +66,12 @@ public class MasterConfig { @Value("${master.failover.interval:10}") private int failoverInterval; - @Value("${master.kill.yarn.job.when.handle.fail.over:true}") + @Value("${master.kill.yarn.job.when.handle.failover:true}") private boolean masterKillYarnJobWhenHandleFailOver; + @Value("${master.persist.event.state.threads:10}") + private int masterPersistEventStateThreads; + public int getListenPort() { return listenPort; } @@ -183,4 +186,12 @@ public class MasterConfig { public void setMasterKillYarnJobWhenHandleFailOver(boolean masterKillYarnJobWhenHandleFailOver) { this.masterKillYarnJobWhenHandleFailOver = masterKillYarnJobWhenHandleFailOver; } + + public int getMasterPersistEventStateThreads() { + return masterPersistEventStateThreads; + } + + public void setMasterPersistEventStateThreads(int masterPersistEventStateThreads) { + this.masterPersistEventStateThreads = masterPersistEventStateThreads; + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java new file mode 100644 index 0000000000..621dd79c74 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor.queue; + +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; +import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; + +public class TaskResponsePersistThread implements Runnable { + + /** + * logger of TaskResponsePersistThread + */ + private static final Logger logger = LoggerFactory.getLogger(TaskResponsePersistThread.class); + + private final ConcurrentLinkedQueue events = new ConcurrentLinkedQueue<>(); + + private final Integer processInstanceId; + + /** + * process service + */ + private ProcessService processService; + + private ConcurrentHashMap processInstanceMapper; + + public TaskResponsePersistThread(ProcessService processService, + ConcurrentHashMap processInstanceMapper, + Integer processInstanceId) { + this.processService = processService; + this.processInstanceMapper = processInstanceMapper; + this.processInstanceId = processInstanceId; + } + + @Override + public void run() { + while (!this.events.isEmpty()) { + TaskResponseEvent event = this.events.peek(); + try { + boolean result = persist(event); + if (!result) { + logger.error("persist meta error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId()); + } + } catch (Exception e) { + logger.error("persist error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId(), e); + } finally { + this.events.remove(event); + } + } + } + + /** + * persist taskResponseEvent + * + * @param taskResponseEvent taskResponseEvent + */ + private boolean persist(TaskResponseEvent taskResponseEvent) { + Event event = taskResponseEvent.getEvent(); + Channel channel = taskResponseEvent.getChannel(); + + TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); + + boolean result = true; + + switch (event) { + case ACK: + try { + if (taskInstance != null) { + ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState(); + processService.changeTaskState(taskInstance, status, + taskResponseEvent.getStartTime(), + taskResponseEvent.getWorkerAddress(), + taskResponseEvent.getExecutePath(), + taskResponseEvent.getLogPath(), + taskResponseEvent.getTaskInstanceId()); + logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}", + result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost()); + } + // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskAckCommand.convert2Command()); + logger.debug("worker ack master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost()); + } catch (Exception e) { + result = false; + logger.error("worker ack master error", e); + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : taskInstance.getId()); + channel.writeAndFlush(taskAckCommand.convert2Command()); + } + break; + case RESULT: + try { + if (taskInstance != null) { + result = processService.changeTaskState(taskInstance, taskResponseEvent.getState(), + taskResponseEvent.getEndTime(), + taskResponseEvent.getProcessId(), + taskResponseEvent.getAppIds(), + taskResponseEvent.getTaskInstanceId(), + taskResponseEvent.getVarPool() + ); + logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}", + result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost()); + } + if (!result) { + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + logger.debug("worker response master failure, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost()); + } else { + // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + logger.debug("worker response master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost()); + } + } catch (Exception e) { + result = false; + logger.error("worker response master error", e); + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + } + break; + default: + throw new IllegalArgumentException("invalid event type : " + event); + } + + WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId()); + if (workflowExecuteThread != null) { + StateEvent stateEvent = new StateEvent(); + stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId()); + stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId()); + stateEvent.setExecutionStatus(taskResponseEvent.getState()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + workflowExecuteThread.addStateEvent(stateEvent); + } + return result; + } + + public boolean addEvent(TaskResponseEvent event) { + if (event.getProcessInstanceId() != this.processInstanceId) { + logger.info("event would be abounded, task instance id:{}, process instance id:{}, this.processInstanceId:{}", + event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId); + return false; + } + return this.events.add(event); + } + + public int eventSize() { + return this.events.size(); + } + + public boolean isEmpty() { + return this.events.isEmpty(); + } + + public Integer getProcessInstanceId() { + return processInstanceId; + } + + public String getKey() { + return String.valueOf(processInstanceId); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index a320a70ec4..5ef235089e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -17,22 +17,18 @@ package org.apache.dolphinscheduler.server.master.processor.queue; -import org.apache.dolphinscheduler.common.enums.Event; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.StateEvent; -import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.process.ProcessService; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -42,7 +38,11 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import io.netty.channel.Channel; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * task manager @@ -66,37 +66,56 @@ public class TaskResponseService { @Autowired private ProcessService processService; + @Autowired + private MasterConfig masterConfig; + /** * task response worker */ private Thread taskResponseWorker; - private ConcurrentHashMap processInstanceMapper; + /** + * event handler + */ + private Thread taskResponseEventHandler; + + private ConcurrentHashMap processInstanceMap; + + private final ConcurrentHashMap taskResponseEventHandlerMap = new ConcurrentHashMap<>(); + + private ListeningExecutorService listeningExecutorService; + + private ExecutorService eventExecService; + + /** + * task response mapper + */ + private final ConcurrentHashMap processTaskResponseMap = new ConcurrentHashMap<>(); - public void init(ConcurrentHashMap processInstanceMapper) { - if (this.processInstanceMapper == null) { - this.processInstanceMapper = processInstanceMapper; + public void init(ConcurrentHashMap processInstanceMap) { + if (this.processInstanceMap == null) { + this.processInstanceMap = processInstanceMap; } } @PostConstruct public void start() { + eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("PersistEventState", masterConfig.getMasterPersistEventStateThreads()); + this.listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService); this.taskResponseWorker = new TaskResponseWorker(); - this.taskResponseWorker.setName("StateEventResponseWorker"); + this.taskResponseWorker.setName("TaskResponseWorker"); this.taskResponseWorker.start(); + this.taskResponseEventHandler = new TaskResponseEventHandler(); + this.taskResponseEventHandler.setName("TaskResponseEventHandler"); + this.taskResponseEventHandler.start(); } @PreDestroy public void stop() { try { this.taskResponseWorker.interrupt(); - if (!eventQueue.isEmpty()) { - List remainEvents = new ArrayList<>(eventQueue.size()); - eventQueue.drainTo(remainEvents); - for (TaskResponseEvent event : remainEvents) { - this.persist(event); - } - } + this.taskResponseEventHandler.interrupt(); + this.eventExecService.shutdown(); } catch (Exception e) { logger.error("stop error:", e); } @@ -124,12 +143,26 @@ public class TaskResponseService { @Override public void run() { - while (Stopper.isRunning()) { try { // if not task , blocking here TaskResponseEvent taskResponseEvent = eventQueue.take(); - persist(taskResponseEvent); + if (processInstanceMap.containsKey(taskResponseEvent.getProcessInstanceId()) + && !processTaskResponseMap.containsKey(taskResponseEvent.getProcessInstanceId())) { + TaskResponsePersistThread taskResponsePersistThread = new TaskResponsePersistThread( + processService, processInstanceMap, taskResponseEvent.getProcessInstanceId()); + processTaskResponseMap.put(taskResponseEvent.getProcessInstanceId(), taskResponsePersistThread); + } + TaskResponsePersistThread taskResponsePersistThread = processTaskResponseMap.get(taskResponseEvent.getProcessInstanceId()); + if (null != taskResponsePersistThread) { + if (taskResponsePersistThread.addEvent(taskResponseEvent)) { + logger.debug("submit task response persist queue success, task instance id:{},process instance id:{}, state:{} ", + taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getState()); + } else { + logger.error("submit task response persist queue error, task instance id:{},process instance id:{} ", + taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId()); + } + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -142,84 +175,72 @@ public class TaskResponseService { } /** - * persist taskResponseEvent - * - * @param taskResponseEvent taskResponseEvent + * event handler thread */ - private void persist(TaskResponseEvent taskResponseEvent) { - Event event = taskResponseEvent.getEvent(); - Channel channel = taskResponseEvent.getChannel(); + class TaskResponseEventHandler extends Thread { - TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); - switch (event) { - case ACK: + @Override + public void run() { + logger.info("event handler thread started"); + while (Stopper.isRunning()) { try { - if (taskInstance != null) { - ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState(); - boolean result = processService.changeTaskState(taskInstance, status, - taskResponseEvent.getStartTime(), - taskResponseEvent.getWorkerAddress(), - taskResponseEvent.getExecutePath(), - taskResponseEvent.getLogPath(), - taskResponseEvent.getTaskInstanceId()); - logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}", - result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost()); - } - // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success - DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); - channel.writeAndFlush(taskAckCommand.convert2Command()); - logger.debug("worker ack master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost()); + eventHandler(); + + TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; } catch (Exception e) { - logger.error("worker ack master error", e); - DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : taskInstance.getId()); - channel.writeAndFlush(taskAckCommand.convert2Command()); + logger.error("event handler thread error", e); } - break; - case RESULT: - try { - boolean result = true; - if (taskInstance != null) { - result = processService.changeTaskState(taskInstance, taskResponseEvent.getState(), - taskResponseEvent.getEndTime(), - taskResponseEvent.getProcessId(), - taskResponseEvent.getAppIds(), - taskResponseEvent.getTaskInstanceId(), - taskResponseEvent.getVarPool() - ); - logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}", - result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost()); - } - if (!result) { - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId()); - channel.writeAndFlush(taskResponseCommand.convert2Command()); - logger.debug("worker response master failure, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost()); - } else { - // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); - channel.writeAndFlush(taskResponseCommand.convert2Command()); - logger.debug("worker response master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost()); + } + } + + private void eventHandler() { + + for (TaskResponsePersistThread taskResponsePersistThread: processTaskResponseMap.values()) { + + if (taskResponseEventHandlerMap.containsKey(taskResponsePersistThread.getKey())) { + continue; + } + if (taskResponsePersistThread.eventSize() == 0) { + if (!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) { + processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId()); + logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId()); } - } catch (Exception e) { - logger.error("worker response master error", e); - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1); - channel.writeAndFlush(taskResponseCommand.convert2Command()); + continue; } - break; - default: - throw new IllegalArgumentException("invalid event type : " + event); - } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId()); - if (workflowExecuteThread != null) { - StateEvent stateEvent = new StateEvent(); - stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId()); - stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId()); - stateEvent.setExecutionStatus(taskResponseEvent.getState()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - workflowExecuteThread.addStateEvent(stateEvent); + logger.info("already exists handler process size:{}", taskResponseEventHandlerMap.size()); + taskResponseEventHandlerMap.put(taskResponsePersistThread.getKey(), taskResponsePersistThread); + + ListenableFuture future = listeningExecutorService.submit(taskResponsePersistThread); + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(Object o) { + logger.info("persist events {} succeeded.", taskResponsePersistThread.getProcessInstanceId()); + if (!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) { + processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId()); + logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId()); + } + taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey()); + } + + @Override + public void onFailure(Throwable throwable) { + logger.error("persist events failed: {}", throwable); + if (!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) { + processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId()); + logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId()); + } + taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey()); + } + }; + Futures.addCallback(future, futureCallback, listeningExecutorService); + } } } public BlockingQueue getEventQueue() { return eventQueue; } -} +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index aaf1d90908..425d6e97ee 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -436,13 +436,6 @@ public class MasterRegistryClient { continue; } - if (serverStartupTime != null && processInstance.getRestartTime() != null - && processInstance.getRestartTime().after(serverStartupTime)) { - continue; - } - - logger.info("failover process instance id: {}", processInstance.getId()); - List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); for (TaskInstance taskInstance : validTaskInstanceList) { if (Constants.NULL.equals(taskInstance.getHost())) { @@ -457,6 +450,13 @@ public class MasterRegistryClient { logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); failoverTaskInstance(processInstance, taskInstance); } + + if (serverStartupTime != null && processInstance.getRestartTime() != null + && processInstance.getRestartTime().after(serverStartupTime)) { + continue; + } + + logger.info("failover process instance id: {}", processInstance.getId()); //updateProcessInstance host is null and insert into command processService.processNeedFailoverProcessInstances(processInstance); } @@ -576,8 +576,8 @@ public class MasterRegistryClient { /** * get local address */ - private String getLocalAddress() { + public String getLocalAddress() { return NetUtils.getAddr(masterConfig.getListenPort()); } -} +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java index 81d02a9050..770062f3f1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.collections4.CollectionUtils; +import java.util.Iterator; import java.util.List; import org.slf4j.Logger; @@ -66,10 +67,12 @@ public class FailoverExecuteThread extends Thread { while (Stopper.isRunning()) { logger.info("failover execute started"); try { - List hosts = processService.queryNeedFailoverProcessInstanceHost(); + List hosts = getNeedFailoverMasterServers(); if (CollectionUtils.isEmpty(hosts)) { continue; } + logger.info("need failover hosts:{}", hosts); + for (String host : hosts) { String failoverPath = masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host); try { @@ -88,4 +91,20 @@ public class FailoverExecuteThread extends Thread { } } } + + private List getNeedFailoverMasterServers() { + // failover myself && failover dead masters + List hosts = processService.queryNeedFailoverProcessInstanceHost(); + + Iterator iterator = hosts.iterator(); + while (iterator.hasNext()) { + String host = iterator.next(); + if (registryClient.checkNodeExists(host, NodeType.MASTER)) { + if (!host.equals(masterRegistryClient.getLocalAddress())) { + iterator.remove(); + } + } + } + return hosts; + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 878446c30c..d787d0c2dc 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Date; @@ -34,12 +35,15 @@ import org.mockito.junit.MockitoJUnitRunner; import io.netty.channel.Channel; -@RunWith(MockitoJUnitRunner.class) +@RunWith(MockitoJUnitRunner.Silent.class) public class TaskResponseServiceTest { @Mock(name = "processService") private ProcessService processService; + @Mock + private MasterConfig masterConfig; + @InjectMocks TaskResponseService taskRspService; @@ -54,6 +58,7 @@ public class TaskResponseServiceTest { @Before public void before() { + Mockito.when(masterConfig.getMasterPersistEventStateThreads()).thenReturn(10); taskRspService.start(); ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION,