From 8808c0a700d367a1d408c1ec07c2a6fbeb675d33 Mon Sep 17 00:00:00 2001 From: wind Date: Tue, 28 Dec 2021 19:32:10 +0800 Subject: [PATCH] [Bug-7474][MasterServer] fix failover when node host is null (#7475) * fix failover when node host is null * add failover execute thread * worker handle dead server * fix task instance failover time check * fix upgrade sql * failover logic update Co-authored-by: caishunfeng <534328519@qq.com> --- .../dolphinscheduler/common/Constants.java | 5 + .../dao/entity/ProcessInstance.java | 18 ++ .../dao/mapper/ProcessInstanceMapper.java | 7 + .../dao/mapper/ProcessInstanceMapper.xml | 14 +- .../resources/sql/dolphinscheduler_h2.sql | 1 + .../resources/sql/dolphinscheduler_mysql.sql | 1 + .../sql/dolphinscheduler_postgresql.sql | 1 + .../mysql/dolphinscheduler_ddl.sql | 38 +++ .../mysql/dolphinscheduler_dml.sql | 16 ++ .../postgresql/dolphinscheduler_ddl.sql | 41 ++++ .../postgresql/dolphinscheduler_dml.sql | 16 ++ .../server/master/MasterServer.java | 5 + .../server/master/config/MasterConfig.java | 18 ++ .../master/registry/MasterRegistryClient.java | 220 ++++++++++++------ .../registry/MasterRegistryDataListener.java | 4 +- .../master/runner/FailoverExecuteThread.java | 110 +++++++++ .../master/runner/WorkflowExecuteThread.java | 1 + .../src/main/resources/application.yaml | 4 + .../registry/MasterRegistryClientTest.java | 8 +- .../registry/api/Registry.java | 3 + .../registry/zookeeper/ZookeeperRegistry.java | 6 + .../service/process/ProcessService.java | 7 + .../service/registry/RegistryClient.java | 5 + .../src/main/resources/application.yaml | 4 + 24 files changed, 479 insertions(+), 74 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql create mode 100644 
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index aba6011832..af4938f4dc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -209,6 +209,11 @@ public final class Constants { */ public static final int SOCKET_TIMEOUT = 60 * 1000; + /** + * registry session timeout + */ + public static final int REGISTRY_SESSION_TIMEOUT = 10 * 1000; + /** * http header */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 265eb34bb5..71c466bc3b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -249,6 +249,12 @@ public class ProcessInstance { */ private int dryRun; + /** + * re-start time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date restartTime; + public ProcessInstance() { } @@ -521,6 +527,14 @@ public class ProcessInstance { this.dryRun = dryRun; } + public Date getRestartTime() { + return restartTime; + } + + public void setRestartTime(Date restartTime) { + this.restartTime = restartTime; + } + /** * add command to history * @@ -689,6 +703,10 @@ public class ProcessInstance { + ", dryRun='" 
+ dryRun + '\'' + + '}' + + ", restartTime='" + + restartTime + + '\'' + '}'; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 6dbbdea8a8..6bfb4cfd37 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -53,6 +53,13 @@ public interface ProcessInstanceMapper extends BaseMapper { List queryByHostAndStatus(@Param("host") String host, @Param("states") int[] stateArray); + /** + * query process instance host by stateArray + * @param stateArray + * @return + */ + List queryNeedFailoverProcessInstanceHost(@Param("states") int[] stateArray); + /** * query process instance by tenantId and stateArray * diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 5d718d3bf8..8910bf0532 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -23,7 +23,8 @@ command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type, warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id, history_cmd, - process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run,next_process_instance_id + process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, + dry_run, next_process_instance_id, restart_time + select instance.id, instance.command_type, 
instance.executor_id, instance.process_definition_version, instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time, - instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id + instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id, + restart_time from t_ds_process_instance instance join t_ds_process_definition define ON instance.process_definition_code = define.code where instance.is_sub_process=0 diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index a4faab6f45..40a21ea92a 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -605,6 +605,7 @@ CREATE TABLE t_ds_process_instance tenant_id int(11) NOT NULL DEFAULT '-1', var_pool longtext, dry_run int NULL DEFAULT 0, + restart_time datetime DEFAULT NULL, PRIMARY KEY (id) ); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index d67e013479..21556187f6 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -603,6 +603,7 @@ CREATE TABLE `t_ds_process_instance` ( `var_pool` longtext COMMENT 'var_pool', `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag：0 normal, 1 dry run', `next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId', + `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time', PRIMARY KEY (`id`), KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE, KEY `start_time_index` (`start_time`,`end_time`) USING BTREE diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 557c664c12..f6c5df1ac4 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -529,6 +529,7 @@ CREATE TABLE t_ds_process_instance ( var_pool text , dry_run int DEFAULT '0' , next_process_instance_id int DEFAULT '0', + restart_time timestamp DEFAULT NULL , PRIMARY KEY (id) ) ; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..e492693b8a --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY','')); + +-- uc_dolphin_T_t_ds_process_instance_A_restart_time +drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_restart_time; +delimiter d// +CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_process_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='restart_time') + THEN + ALTER TABLE t_ds_process_instance ADD COLUMN `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time'; + END IF; + END; + +d// + +delimiter ; +CALL uc_dolphin_T_t_ds_process_instance_A_restart_time(); +DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..38964cc551 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..79be113825 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +delimiter d// +CREATE OR REPLACE FUNCTION public.dolphin_update_metadata( + ) + RETURNS character varying + LANGUAGE 'plpgsql' + COST 100 + VOLATILE PARALLEL UNSAFE +AS $BODY$ +DECLARE + v_schema varchar; +BEGIN + ---get schema name + v_schema =current_schema(); + + EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL'; + return 'Success!'; + exception when others then + ---Raise EXCEPTION '(%)',SQLERRM; + return SQLERRM; +END; +$BODY$; + +select dolphin_update_metadata(); + +d// diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..38964cc551 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 11b0b799a6..65b03dc502 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; +import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -90,6 +91,9 @@ public class MasterServer implements IStoppable { @Autowired private EventExecuteService eventExecuteService; + @Autowired + private FailoverExecuteThread failoverExecuteThread; + public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER); SpringApplication.run(MasterServer.class); @@ -122,6 +126,7 @@ public class MasterServer implements IStoppable { this.masterSchedulerService.start(); this.eventExecuteService.start(); + this.failoverExecuteThread.start(); this.scheduler.start(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index a76dd5e066..e1366bcfb2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -40,6 +40,8 @@ public class MasterConfig { private double maxCpuLoadAvg; private double reservedMemory; private boolean taskLogger; + private int failoverInterval; + private boolean killYarnJobWhenTaskFailover; public int getListenPort() { return listenPort; @@ -144,4 +146,20 @@ public class MasterConfig { public void setTaskLogger(boolean taskLogger) { this.taskLogger = taskLogger; } + + public int getFailoverInterval() { + return failoverInterval; + } + + public void setFailoverInterval(int failoverInterval) { + this.failoverInterval = failoverInterval; + } + + public boolean isKillYarnJobWhenTaskFailover() { + return killYarnJobWhenTaskFailover; + } + + public void setKillYarnJobWhenTaskFailover(boolean killYarnJobWhenTaskFailover) { + this.killYarnJobWhenTaskFailover = killYarnJobWhenTaskFailover; + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 58a0b6a3fe..d97c8c241e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -43,8 +43,10 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.StringUtils; +import java.time.Duration; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -113,7 +115,6 @@ public class MasterRegistryClient { String nodeLock = 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters - registryClient.getLock(nodeLock); // master registry registry(); @@ -126,11 +127,6 @@ public class MasterRegistryClient { ThreadUtils.sleep(SLEEP_TIME_MILLIS); } - // self tolerant - if (registryClient.getActiveMasterNum() == 1) { - removeNodePath(null, NodeType.MASTER, true); - removeNodePath(null, NodeType.WORKER, true); - } registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener()); } catch (Exception e) { logger.error("master start up exception", e); @@ -149,18 +145,57 @@ public class MasterRegistryClient { } /** - * remove zookeeper node path + * remove master node path * - * @param path zookeeper node path - * @param nodeType zookeeper node type + * @param path node path + * @param nodeType node type * @param failover is failover */ - public void removeNodePath(String path, NodeType nodeType, boolean failover) { + public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) { logger.info("{} node deleted : {}", nodeType, path); - String failoverPath = getFailoverLockPath(nodeType); + + if (StringUtils.isEmpty(path)) { + logger.error("server down error: empty path: {}, nodeType:{}", path, nodeType); + return; + } + + String serverHost = registryClient.getHostByEventDataPath(path); + if (StringUtils.isEmpty(serverHost)) { + logger.error("server down error: unknown path: {}, nodeType:{}", path, nodeType); + return; + } + + String failoverPath = getFailoverLockPath(nodeType, serverHost); try { registryClient.getLock(failoverPath); + if (!registryClient.exists(path)) { + logger.info("path: {} not exists", path); + // handle dead server + registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP); + } + + //failover server + if (failover) { + failoverServerWhenDown(serverHost, nodeType); + } 
+ } catch (Exception e) { + logger.error("{} server failover failed, host:{}", nodeType, serverHost, e); + } finally { + registryClient.releaseLock(failoverPath); + } + } + + /** + * remove worker node path + * + * @param path node path + * @param nodeType node type + * @param failover is failover + */ + public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) { + logger.info("{} node deleted : {}", nodeType, path); + try { String serverHost = null; if (!StringUtils.isEmpty(path)) { serverHost = registryClient.getHostByEventDataPath(path); @@ -168,21 +203,37 @@ public class MasterRegistryClient { logger.error("server down error: unknown path: {}", path); return; } - // handle dead server - registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP); + if (!registryClient.exists(path)) { + logger.info("path: {} not exists", path); + // handle dead server + registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP); + } } //failover server if (failover) { failoverServerWhenDown(serverHost, nodeType); } } catch (Exception e) { - logger.error("{} server failover failed.", nodeType); - logger.error("failover exception ", e); - } finally { - registryClient.releaseLock(failoverPath); + logger.error("{} server failover failed", nodeType, e); } } + private boolean isNeedToHandleDeadServer(String host, NodeType nodeType, Duration sessionTimeout) { + long sessionTimeoutMillis = Math.max(Constants.REGISTRY_SESSION_TIMEOUT, sessionTimeout.toMillis()); + List serverList = registryClient.getServerList(nodeType); + if (CollectionUtils.isEmpty(serverList)) { + return true; + } + Date startupTime = getServerStartupTime(serverList, host); + if (startupTime == null) { + return true; + } + if (System.currentTimeMillis() - startupTime.getTime() > sessionTimeoutMillis) { + return true; + } + return false; + } + /** * failover server when server down * @@ -208,12 +259,12 @@ public class 
MasterRegistryClient { * @param nodeType zookeeper node type * @return fail over lock path */ - private String getFailoverLockPath(NodeType nodeType) { + public String getFailoverLockPath(NodeType nodeType, String host) { switch (nodeType) { case MASTER: - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; + return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host; case WORKER: - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; + return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host; default: return ""; } @@ -222,10 +273,11 @@ public class MasterRegistryClient { /** * task needs failover if task start before worker starts * + * @param workerServers worker servers * @param taskInstance task instance * @return true if task instance need fail over */ - private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) { + private boolean checkTaskInstanceNeedFailover(List workerServers, TaskInstance taskInstance) { boolean taskNeedFailover = true; @@ -234,13 +286,11 @@ public class MasterRegistryClient { return false; } - // if the worker node exists in zookeeper, we must check the task starts after the worker - if (registryClient.checkNodeExists(taskInstance.getHost(), NodeType.WORKER)) { - //if task start after worker starts, there is no need to failover the task. - if (checkTaskAfterWorkerStart(taskInstance)) { - taskNeedFailover = false; - } + //if task start after worker starts, there is no need to failover the task. 
+ if (checkTaskAfterWorkerStart(workerServers, taskInstance)) { + taskNeedFailover = false; } + return taskNeedFailover; } @@ -250,22 +300,47 @@ public class MasterRegistryClient { * @param taskInstance task instance * @return true if task instance start time after worker server start date */ - private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { + private boolean checkTaskAfterWorkerStart(List workerServers, TaskInstance taskInstance) { if (StringUtils.isEmpty(taskInstance.getHost())) { return false; } - Date workerServerStartDate = null; - List workerServers = registryClient.getServerList(NodeType.WORKER); - for (Server workerServer : workerServers) { - if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) { - workerServerStartDate = workerServer.getCreateTime(); + Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost()); + if (workerServerStartDate != null) { + if (taskInstance.getStartTime() == null) { + return taskInstance.getSubmitTime().after(workerServerStartDate); + } else { + return taskInstance.getStartTime().after(workerServerStartDate); + } + } + return false; + } + + /** + * get server startup time + */ + private Date getServerStartupTime(List servers, String host) { + if (CollectionUtils.isEmpty(servers)) { + return null; + } + Date serverStartupTime = null; + for (Server server : servers) { + if (host.equals(server.getHost() + Constants.COLON + server.getPort())) { + serverStartupTime = server.getCreateTime(); break; } } - if (workerServerStartDate != null) { - return taskInstance.getStartTime().after(workerServerStartDate); + return serverStartupTime; + } + + /** + * get server startup time + */ + private Date getServerStartupTime(NodeType nodeType, String host) { + if (StringUtils.isEmpty(host)) { + return null; } - return false; + List servers = registryClient.getServerList(nodeType); + return getServerStartupTime(servers, host); } /** @@ -278,10 
+353,13 @@ public class MasterRegistryClient { * @param workerHost worker host */ private void failoverWorker(String workerHost) { + if (StringUtils.isEmpty(workerHost)) { return; } + List workerServers = registryClient.getServerList(NodeType.WORKER); + long startTime = System.currentTimeMillis(); List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); Map processInstanceCacheMap = new HashMap<>(); @@ -297,31 +375,19 @@ public class MasterRegistryClient { continue; } processInstanceCacheMap.put(processInstance.getId(), processInstance); - taskInstance.setProcessInstance(processInstance); - - TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskExecutionContext); - - taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); - processService.saveTaskInstance(taskInstance); - - StateEvent stateEvent = new StateEvent(); - stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - stateEvent.setProcessInstanceId(processInstance.getId()); - stateEvent.setExecutionStatus(taskInstance.getState()); - workflowExecuteThreadPool.submitStateEvent(stateEvent); + } + + if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) { + continue; } // only failover the task owned myself if worker down. 
- if (processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { - logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); - failoverTaskInstance(processInstance, taskInstance); + if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { + continue; } + + logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); + failoverTaskInstance(processInstance, taskInstance); } logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime); } @@ -333,11 +399,15 @@ public class MasterRegistryClient { * * @param masterHost master host */ - private void failoverMaster(String masterHost) { + public void failoverMaster(String masterHost) { + if (StringUtils.isEmpty(masterHost)) { return; } + Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost); + List workerServers = registryClient.getServerList(NodeType.WORKER); + long startTime = System.currentTimeMillis(); List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); logger.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size()); @@ -347,16 +417,27 @@ public class MasterRegistryClient { continue; } - logger.info("failover process instance id: {}", processInstance.getId()); - List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); for (TaskInstance taskInstance : validTaskInstanceList) { if (Constants.NULL.equals(taskInstance.getHost())) { continue; } + if (taskInstance.getState().typeIsFinished()) { + continue; + } + if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) { + continue; + } logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); failoverTaskInstance(processInstance, 
taskInstance); } + + if (serverStartupTime != null && processInstance.getRestartTime() != null + && processInstance.getRestartTime().after(serverStartupTime)) { + continue; + } + + logger.info("failover process instance id: {}", processInstance.getId()); //updateProcessInstance host is null and insert into command processService.processNeedFailoverProcessInstances(processInstance); } @@ -364,6 +445,13 @@ public class MasterRegistryClient { logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime); } + /** + * failover task instance + *

+ * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. try to notify local master + */ private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) { if (taskInstance == null) { logger.error("failover task instance error, taskInstance is null"); @@ -376,18 +464,16 @@ public class MasterRegistryClient { return; } - if (!checkTaskInstanceNeedFailover(taskInstance)) { - return; - } - taskInstance.setProcessInstance(processInstance); TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) .buildProcessInstanceRelatedInfo(processInstance) .create(); - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskExecutionContext); + if (masterConfig.isKillYarnJobWhenTaskFailover()) { + // only kill yarn job if exists , the local thread has exited + ProcessUtils.killYarnJob(taskExecutionContext); + } taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); processService.saveTaskInstance(taskInstance); @@ -466,7 +552,7 @@ public class MasterRegistryClient { /** * get local address */ - private String getLocalAddress() { + public String getLocalAddress() { return NetUtils.getAddr(masterConfig.getListenPort()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java index cb5b6bef86..361f09f10b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java @@ -63,7 +63,7 @@ public class MasterRegistryDataListener implements SubscribeListener { logger.info("master node added : {}", path); 
break; case REMOVE: - masterRegistryClient.removeNodePath(path, NodeType.MASTER, true); + masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true); break; default: break; @@ -78,7 +78,7 @@ public class MasterRegistryDataListener implements SubscribeListener { break; case REMOVE: logger.info("worker node deleted : {}", path); - masterRegistryClient.removeNodePath(path, NodeType.WORKER, true); + masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true); break; default: break; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java new file mode 100644 index 0000000000..144baf2dab --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.Iterator; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class FailoverExecuteThread extends Thread { + + private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class); + + @Autowired + private MasterRegistryClient masterRegistryClient; + + @Autowired + private RegistryClient registryClient; + + @Autowired + private MasterConfig masterConfig; + + /** + * process service + */ + @Autowired + private ProcessService processService; + + @Override + public synchronized void start() { + super.setName("FailoverExecuteThread"); + super.start(); + } + + @Override + public void run() { + while (Stopper.isRunning()) { + logger.info("failover execute started"); + try { + List hosts = getNeedFailoverMasterServers(); + if (CollectionUtils.isEmpty(hosts)) { + continue; + } + logger.info("need failover hosts:{}", hosts); + + for (String host : hosts) { + String failoverPath = masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host); + try { + registryClient.getLock(failoverPath); + masterRegistryClient.failoverMaster(host); + } catch (Exception e) { + logger.error("{} server failover failed, host:{}", NodeType.MASTER, 
host, e); + } finally { + registryClient.releaseLock(failoverPath); + } + } + } catch (Exception e) { + logger.error("failover execute error", e); + } finally { + ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60); + } + } + } + + private List getNeedFailoverMasterServers() { + // failover myself && failover dead masters + List hosts = processService.queryNeedFailoverProcessInstanceHost(); + + Iterator iterator = hosts.iterator(); + while (iterator.hasNext()) { + String host = iterator.next(); + if (registryClient.checkNodeExists(host, NodeType.MASTER)) { + if (!host.equals(masterRegistryClient.getLocalAddress())) { + iterator.remove(); + } + } + } + return hosts; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 0d46c3b3da..c23da356c2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -623,6 +623,7 @@ public class WorkflowExecuteThread { processDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); processInstance.setStartTime(new Date()); + processInstance.setRestartTime(processInstance.getStartTime()); processInstance.setEndTime(null); processService.saveProcessInstance(processInstance); this.taskInstanceMap.clear(); diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index f043196e4c..cfbe857760 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -106,6 +106,10 @@ master: reserved-memory: 0.3 # use task 
logger, default true; if true, it will create log for every task; if false, the task log will append to master log file task-logger: true + # failover interval, the unit is minute + failover-interval: 10 + # kill yarn job when failover taskInstance, default true + kill-yarn-job-when-task-failover: true server: port: 5679 diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index 89df27e0f1..d3d5b03de9 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -86,6 +86,7 @@ public class MasterRegistryClientTest { ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(1); processInstance.setHost("127.0.0.1:8080"); + processInstance.setRestartTime(new Date()); processInstance.setHistoryCmd("xxx"); processInstance.setCommandType(CommandType.STOP); given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance)); @@ -102,6 +103,7 @@ public class MasterRegistryClientTest { server.setPort(8080); server.setCreateTime(new Date()); given(registryClient.getServerList(NodeType.WORKER)).willReturn(Arrays.asList(server)); + given(registryClient.getServerList(NodeType.MASTER)).willReturn(Arrays.asList(server)); } @Test @@ -118,9 +120,9 @@ public class MasterRegistryClientTest { @Test public void removeNodePathTest() { - masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false); - masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true); + masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false); + masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER,
true); //Cannot mock static methods - masterRegistryClient.removeNodePath("/path", NodeType.WORKER, true); + masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER, true); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java index 51ceae63f1..a1d0b03bae 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.registry.api; import java.io.Closeable; +import java.time.Duration; import java.util.Collection; import java.util.Map; @@ -43,4 +44,6 @@ public interface Registry extends Closeable { boolean acquireLock(String key); boolean releaseLock(String key); + + Duration getSessionTimeout(); } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java index 1719fb5386..e0f818e665 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java @@ -42,6 +42,7 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; 
import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -233,6 +234,11 @@ public final class ZookeeperRegistry implements Registry { return true; } + @Override + public Duration getSessionTimeout() { + return properties.getSessionTimeout(); + } + @Override public void close() { treeCacheMap.values().forEach(CloseableUtils::closeQuietly); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 9702d648c3..342e91301f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -664,6 +664,7 @@ public class ProcessService { processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); processInstance.setRecovery(Flag.NO); processInstance.setStartTime(new Date()); + processInstance.setRestartTime(processInstance.getStartTime()); processInstance.setRunTimes(1); processInstance.setMaxTryTimes(0); processInstance.setCommandParam(command.getCommandParam()); @@ -853,6 +854,7 @@ public class ProcessService { processInstance.setScheduleTime(command.getScheduleTime()); } processInstance.setHost(host); + processInstance.setRestartTime(new Date()); ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION; int runTime = processInstance.getRunTimes(); switch (commandType) { @@ -922,6 +924,7 @@ public class ProcessService { updateTaskInstance(taskInstance); } processInstance.setStartTime(new Date()); + processInstance.setRestartTime(processInstance.getStartTime()); processInstance.setEndTime(null); processInstance.setRunTimes(runTime + 1); initComplementDataParam(processDefinition, processInstance, cmdParam); @@ -1862,6 +1865,10 @@ public class 
ProcessService { return processInstanceMapper.queryByHostAndStatus(host, stateArray); } + public List queryNeedFailoverProcessInstanceHost() { + return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray); + } + /** * process need failover process instance * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java index 2a7e7a1514..2f98b09b34 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java @@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.registry.api.SubscribeListener; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Date; @@ -310,4 +311,8 @@ public class RegistryClient { } } } + + public Duration getSessionTimeout() { + return registry.getSessionTimeout(); + } } diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 7ed47a00fd..02d2b9f7ad 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -110,6 +110,10 @@ master: reserved-memory: 0.3 # use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file task-logger: true + # failover interval, the unit is minute + failover-interval: 10 + # kill yarn job when failover taskInstance, default true + kill-yarn-job-when-task-failover: true worker: # worker listener port