From c956185e00118bde6df99c2aaf3017d974515ebf Mon Sep 17 00:00:00 2001 From: zwZjut Date: Wed, 22 Dec 2021 18:57:51 +0800 Subject: [PATCH] [Feature] 2.0.2-prepare bug fix of Pressure tests #7511 (#7540) * [Feature][dolphinscheduler-api] parse traceId in http header for Cross system delivery to #7237 (#7238) * to #7237 * rerun test Co-authored-by: honghuo.zw * chery-pick 05aef27 and handle conflicts * to #7065: fix ExecutorService and schedulerService (#7072) Co-authored-by: honghuo.zw * [Feature][dolphinscheduler-api] access control of taskDefinition and taskInstance in project to #7081 (#7082) * to #7081 * fix #7081 * to #7081 Co-authored-by: honghuo.zw * chery-pick 8ebe060 and handle conflicts * cherry-pick 1f18444 and handle conflicts * fix #6807: dolphinscheduler.zookeeper.env_vars - > dolphinscheduler.registry.env_vars (#6808) Co-authored-by: honghuo.zw Co-authored-by: Kirs * add default constructor (#6780) Co-authored-by: honghuo.zw * to #7108 (#7109) * to #7511 * to #7511 * to #7511 * to #7511 Co-authored-by: honghuo.zw Co-authored-by: Kirs --- .../application-api.properties.tpl | 3 + .../dolphinscheduler/master.properties.tpl | 4 + .../dolphinscheduler/quartz.properties.tpl | 3 +- .../dolphinscheduler/registry.properties.tpl | 1 + .../dolphinscheduler/worker.properties.tpl | 3 + .../kubernetes/dolphinscheduler/values.yaml | 10 + .../dolphinscheduler/common/Constants.java | 3 +- .../dao/entity/ProcessInstance.java | 18 ++ .../dao/mapper/ProcessInstanceMapMapper.java | 4 +- .../dao/mapper/ProcessInstanceMapper.java | 8 + .../dao/mapper/TaskInstanceMapper.java | 2 + .../dao/mapper/ProcessInstanceMapper.xml | 13 +- .../dao/mapper/TaskInstanceMapper.xml | 6 + .../resources/sql/dolphinscheduler_h2.sql | 1 + .../resources/sql/dolphinscheduler_mysql.sql | 1 + .../sql/dolphinscheduler_postgre.sql | 1 + .../src/main/resources/sql/soft_version | 2 +- .../mysql/dolphinscheduler_ddl.sql | 20 ++ .../mysql/dolphinscheduler_dml.sql | 16 ++ 
.../postgresql/dolphinscheduler_ddl.sql | 41 ++++ .../postgresql/dolphinscheduler_dml.sql | 16 ++ .../server/master/MasterServer.java | 23 +- .../server/master/config/MasterConfig.java | 21 ++ .../consumer/TaskPriorityQueueConsumer.java | 8 +- .../processor/queue/TaskResponseService.java | 26 ++- .../master/registry/MasterRegistryClient.java | 199 ++++++++++++++---- .../registry/MasterRegistryDataListener.java | 4 +- .../master/registry/ServerNodeManager.java | 4 +- .../master/runner/FailoverExecuteThread.java | 91 ++++++++ .../master/runner/MasterSchedulerService.java | 1 + .../master/runner/WorkflowExecuteThread.java | 16 +- .../runner/task/CommonTaskProcessor.java | 6 +- .../server/worker/WorkerServer.java | 30 ++- .../server/worker/config/WorkerConfig.java | 11 + .../worker/processor/DBTaskAckProcessor.java | 9 +- .../processor/DBTaskResponseProcessor.java | 5 +- .../worker/registry/WorkerRegistryClient.java | 44 ++++ .../runner/RetryReportTaskStatusThread.java | 16 +- .../registry/MasterRegistryClientTest.java | 6 +- .../service/process/ProcessService.java | 30 ++- .../service/quartz/QuartzExecutors.java | 73 ++++--- .../service/queue/TaskPriorityQueueImpl.java | 2 +- .../service/registry/RegistryClient.java | 2 +- .../spi/utils/PropertyUtils.java | 3 + 44 files changed, 670 insertions(+), 136 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java diff --git 
a/docker/build/conf/dolphinscheduler/application-api.properties.tpl b/docker/build/conf/dolphinscheduler/application-api.properties.tpl index d78db2d631..393a33ce0c 100644 --- a/docker/build/conf/dolphinscheduler/application-api.properties.tpl +++ b/docker/build/conf/dolphinscheduler/application-api.properties.tpl @@ -38,6 +38,9 @@ server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javasc # max http post size server.jetty.max-http-form-post-size=5000000 +# max http header size +server.max-http-header-size=81920 + # messages encoding spring.messages.encoding=UTF-8 diff --git a/docker/build/conf/dolphinscheduler/master.properties.tpl b/docker/build/conf/dolphinscheduler/master.properties.tpl index 046d5c15ff..98ca3ddd26 100644 --- a/docker/build/conf/dolphinscheduler/master.properties.tpl +++ b/docker/build/conf/dolphinscheduler/master.properties.tpl @@ -44,3 +44,7 @@ master.max.cpuload.avg=${MASTER_MAX_CPULOAD_AVG} # master reserved memory, only lower than system available memory, master server can schedule. 
default value 0.3, the unit is G master.reserved.memory=${MASTER_RESERVED_MEMORY} +# master failover interval minutes +master.failover.interval=${MASTER_FAILOVER_INTERVAL} +# master kill yarn job when handle failover +master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER} \ No newline at end of file diff --git a/docker/build/conf/dolphinscheduler/quartz.properties.tpl b/docker/build/conf/dolphinscheduler/quartz.properties.tpl index 45c61a653f..5f011f9151 100644 --- a/docker/build/conf/dolphinscheduler/quartz.properties.tpl +++ b/docker/build/conf/dolphinscheduler/quartz.properties.tpl @@ -32,7 +32,8 @@ #org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool #org.quartz.threadPool.makeThreadsDaemons = true -#org.quartz.threadPool.threadCount = 25 +org.quartz.threadPool.threadCount = ${ORG_QUARTZ_THREADPOOL_THREADCOUNT} +org.quartz.scheduler.batchTriggerAcquisitionMaxCount = ${ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT} #org.quartz.threadPool.threadPriority = 5 #============================================================================ diff --git a/docker/build/conf/dolphinscheduler/registry.properties.tpl b/docker/build/conf/dolphinscheduler/registry.properties.tpl index 9ee8add211..e1ac10434a 100644 --- a/docker/build/conf/dolphinscheduler/registry.properties.tpl +++ b/docker/build/conf/dolphinscheduler/registry.properties.tpl @@ -17,3 +17,4 @@ registry.plugin.name=${REGISTRY_PLUGIN_NAME} registry.servers=${REGISTRY_SERVERS} +session.timeout.ms=${SESSION_TIMEOUT_MS} \ No newline at end of file diff --git a/docker/build/conf/dolphinscheduler/worker.properties.tpl b/docker/build/conf/dolphinscheduler/worker.properties.tpl index 94a3352611..e1f1574561 100644 --- a/docker/build/conf/dolphinscheduler/worker.properties.tpl +++ b/docker/build/conf/dolphinscheduler/worker.properties.tpl @@ -41,3 +41,6 @@ worker.groups=${WORKER_GROUPS} # alert server listen host alert.listen.host=${ALERT_LISTEN_HOST} + +# worker retry 
report task statues interval seconds +worker.retry.report.task.statues.interval=${WORKER_RETRY_REPORT_TASK_STATUS_INTERVAL} \ No newline at end of file diff --git a/docker/kubernetes/dolphinscheduler/values.yaml b/docker/kubernetes/dolphinscheduler/values.yaml index 6ef58d586c..f0df9e7859 100644 --- a/docker/kubernetes/dolphinscheduler/values.yaml +++ b/docker/kubernetes/dolphinscheduler/values.yaml @@ -53,6 +53,10 @@ externalDatabase: ## If not exists external zookeeper, by default, Dolphinscheduler's zookeeper will use it. zookeeper: enabled: true + tickTime: 3000 + maxSessionTimeout: 60000 + initLimit: 300 + maxClientCnxns: 2000 fourlwCommandsWhitelist: "srvr,ruok,wchs,cons" persistence: enabled: false @@ -158,6 +162,10 @@ master: MASTER_TASK_COMMIT_INTERVAL: "1000" MASTER_MAX_CPULOAD_AVG: "-1" MASTER_RESERVED_MEMORY: "0.3" + MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER: "true" + ORG_QUARTZ_THREADPOOL_THREADCOUNT: "25" + ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT: "1" + SESSION_TIMEOUT_MS: 60000 ## Periodic probe of container liveness. Container will be restarted if the probe fails. Cannot be updated. ## More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes livenessProbe: @@ -225,6 +233,8 @@ worker: WORKER_MAX_CPULOAD_AVG: "-1" WORKER_RESERVED_MEMORY: "0.3" WORKER_GROUPS: "default" + SESSION_TIMEOUT_MS: 60000 + WORKER_RETRY_REPORT_TASK_STATUS_INTERVAL: 600 ## Periodic probe of container liveness. Container will be restarted if the probe fails. Cannot be updated. 
## More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes livenessProbe: diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 5a7b7cd535..809bcb1773 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -53,7 +53,7 @@ public final class Constants { public static final String ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK = "org.quartz.jobStore.acquireTriggersWithinLock"; public static final String ORG_QUARTZ_JOBSTORE_DATASOURCE = "org.quartz.jobStore.dataSource"; public static final String ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS = "org.quartz.dataSource.myDs.connectionProvider.class"; - + public static final String ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT = "org.quartz.scheduler.batchTriggerAcquisitionMaxCount"; /** * quartz config default value */ @@ -66,6 +66,7 @@ public final class Constants { public static final String QUARTZ_INSTANCENAME = "DolphinScheduler"; public static final String QUARTZ_INSTANCEID = "AUTO"; public static final String QUARTZ_ACQUIRETRIGGERSWITHINLOCK = "true"; + public static final String QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT = "100"; /** * common properties path diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 18c386b854..f20b13a08b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -244,6 +244,12 @@ public class ProcessInstance { */ private int dryRun; + /** + * re-start 
time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date restartTime; + public ProcessInstance() { } @@ -516,6 +522,14 @@ public class ProcessInstance { this.dryRun = dryRun; } + public Date getRestartTime() { + return restartTime; + } + + public void setRestartTime(Date restartTime) { + this.restartTime = restartTime; + } + /** * add command to history * @@ -684,6 +698,9 @@ public class ProcessInstance { + ", dryRun='" + dryRun + '\'' + + ", restartTime='" + + restartTime + + '\'' + '}'; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java index 0e5a38140f..8ad7e89413 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java @@ -17,11 +17,13 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; + import org.apache.ibatis.annotations.Param; import java.util.List; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + /** * process instance map mapper interface */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 51f60295f4..4a156ce487 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -43,6 +43,14 @@ public interface ProcessInstanceMapper extends BaseMapper { */ ProcessInstance queryDetailById(@Param("processId") int
processId); + /** + * query process instance host by stateArray + * + * @param stateArray + * @return + */ + List queryNeedFailoverProcessInstanceHost(@Param("states") int[] stateArray); + /** * query process instance by host and stateArray * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index 795004d0c9..5e2597d4cf 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -73,4 +73,6 @@ public interface TaskInstanceMapper extends BaseMapper { @Param("startTime") Date startTime, @Param("endTime") Date endTime ); + + int updateHostAndSubmitTimeById(@Param("id") int id, @Param("host") String host, @Param("submitTime") Date submitTime); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 77d96b5b26..772969552b 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -23,7 +23,7 @@ command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type, warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id, history_cmd, - process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run + process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run, restart_time - + select instance.id, instance.command_type, instance.executor_id, 
instance.process_definition_version, instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time, - instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run + instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run, instance.restart_time from t_ds_process_instance instance join t_ds_process_definition define ON instance.process_definition_code = define.code where instance.is_sub_process=0 diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index f41b58ae13..bdbc5386f3 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -162,4 +162,10 @@ order by instance.start_time desc + + update t_ds_task_instance + set host = #{host}, + submit_time = #{submitTime} + where id = #{id} + diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index c85e106bd2..4f6e900739 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -601,6 +601,7 @@ CREATE TABLE t_ds_process_instance tenant_id int(11) NOT NULL DEFAULT '-1', var_pool longtext, dry_run int NULL DEFAULT 0, + restart_time datetime DEFAULT NULL, PRIMARY KEY (id) ); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 35e8d5a9b0..9e52d60401 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -596,6 +596,7 @@ CREATE TABLE `t_ds_process_instance` ( `var_pool` longtext COMMENT 'var_pool', `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag：0 normal, 1 dry run', `next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId', + `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time', PRIMARY KEY (`id`), KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE, KEY `start_time_index` (`start_time`,`end_time`) USING BTREE diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql index 3683b9bae1..5c02006eac 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql @@ -501,6 +501,7 @@ CREATE TABLE t_ds_process_instance ( var_pool text , dry_run int DEFAULT '0' , next_process_instance_id int DEFAULT '0', + restart_time timestamp DEFAULT NULL , PRIMARY KEY (id) ) ; diff --git a/dolphinscheduler-dao/src/main/resources/sql/soft_version b/dolphinscheduler-dao/src/main/resources/sql/soft_version index 10bf840ed5..f93ea0ca33 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/soft_version +++ b/dolphinscheduler-dao/src/main/resources/sql/soft_version @@ -1 +1 @@ -2.0.1 \ No newline at end of file +2.0.2 \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..c7a2396fa4 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY','')); + +alter table t_ds_process_instance add column if not exists `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time'; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..38964cc551 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..d26cf8e3b8 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +delimiter d// +CREATE OR REPLACE FUNCTION public.dolphin_update_metadata( + ) + RETURNS character varying + LANGUAGE 'plpgsql' + COST 100 + VOLATILE PARALLEL UNSAFE +AS $BODY$ +DECLARE +v_schema varchar; +BEGIN + ---get schema name + v_schema =current_schema(); + +EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL'; +return 'Success!'; +exception when others then + ---Raise EXCEPTION '(%)',SQLERRM; + return SQLERRM; +END; +$BODY$; + +select dolphin_update_metadata(); + +d// \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..38964cc551 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index ce0a080a73..e9d0cc5a4d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master; +import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.thread.Stopper; @@ -31,8 +33,9 @@ import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; @@ -51,8 +54,6 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.FilterType; import org.springframework.transaction.annotation.EnableTransactionManagement; -import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME; - /** * master server */ @@ -105,6 +106,9 @@ public class 
MasterServer implements IStoppable { @Autowired private EventExecuteService eventExecuteService; + @Autowired + private FailoverExecuteThread failoverExecuteThread; + @Value("${spring.datasource.driver-class-name}") private String driverClassName; @@ -145,8 +149,8 @@ public class MasterServer implements IStoppable { // self tolerant this.masterRegistryClient.init(this.processInstanceExecMaps); - this.masterRegistryClient.start(); this.masterRegistryClient.setRegistryStoppable(this); + this.masterRegistryClient.start(); this.eventExecuteService.init(this.processInstanceExecMaps); this.eventExecuteService.start(); @@ -155,6 +159,8 @@ public class MasterServer implements IStoppable { this.masterSchedulerService.start(); + this.failoverExecuteThread.start(); + // start QuartzExecutors // what system should do if exception try { @@ -217,8 +223,17 @@ public class MasterServer implements IStoppable { } // close spring Context and will invoke method with @PreDestroy annotation to destory beans. 
like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc springApplicationContext.close(); + logger.info("springApplicationContext close"); } catch (Exception e) { logger.error("master server stop exception ", e); + } finally { + try { + // thread sleep 60 seconds for quietly stop + Thread.sleep(60000L); + } catch (Exception e) { + logger.warn("thread sleep exception ", e); + } + System.exit(1); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 124ecebba9..13a68c40d8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -63,6 +63,12 @@ public class MasterConfig { @Value("${master.cache.process.definition:true}") private boolean masterCacheProcessDefinition; + @Value("${master.failover.interval:10}") + private int failoverInterval; + + @Value("${master.kill.yarn.job.when.handle.failover:${master.kill.yarn.job.when.handle.fail.over:true}}") + private boolean masterKillYarnJobWhenHandleFailOver; + public int getListenPort() { return listenPort; } @@ -162,4 +168,19 @@ public class MasterConfig { this.masterCacheProcessDefinition = masterCacheProcessDefinition; } + public int getFailoverInterval() { + return failoverInterval; + } + + public void setFailoverInterval(int failoverInterval) { + this.failoverInterval = failoverInterval; + } + + public boolean getMasterKillYarnJobWhenHandleFailOver() { + return masterKillYarnJobWhenHandleFailOver; + } + + public void setMasterKillYarnJobWhenHandleFailOver(boolean masterKillYarnJobWhenHandleFailOver) { + this.masterKillYarnJobWhenHandleFailOver = masterKillYarnJobWhenHandleFailOver; + } } \ No newline at end of file diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 7b18e2bc71..574f5db386 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -136,8 +137,13 @@ public class TaskPriorityQueueConsumer extends Thread { } else { result = dispatcher.dispatch(executionContext); } + if (result) { + processService.updateHostAndSubmitTimeById(taskPriority.getTaskId(), executionContext.getHost().getAddress(), new Date()); + } } catch (ExecuteException e) { - logger.error("dispatch error: {}", e.getMessage(),e); + logger.error("ExecuteException dispatch error: {}", e.getMessage(), e); + } catch (Throwable t) { + logger.error("dispatch error: {}", t, t); } return result; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 27b96e14d8..a320a70ec4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -110,6 +110,7 @@ public class TaskResponseService { public void 
addResponse(TaskResponseEvent taskResponseEvent) { try { eventQueue.put(taskResponseEvent); + logger.debug("eventQueue size:{}", eventQueue.size()); } catch (InterruptedException e) { logger.error("put task : {} error :{}", taskResponseEvent, e); Thread.currentThread().interrupt(); @@ -155,36 +156,49 @@ public class TaskResponseService { try { if (taskInstance != null) { ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState(); - processService.changeTaskState(taskInstance, status, + boolean result = processService.changeTaskState(taskInstance, status, taskResponseEvent.getStartTime(), taskResponseEvent.getWorkerAddress(), taskResponseEvent.getExecutePath(), taskResponseEvent.getLogPath(), taskResponseEvent.getTaskInstanceId()); + logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}", + result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost()); } // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); channel.writeAndFlush(taskAckCommand.convert2Command()); + logger.debug("worker ack master success, taskInstance id:{},taskInstance host:{}", taskResponseEvent.getTaskInstanceId(), taskInstance == null ? null : taskInstance.getHost()); } catch (Exception e) { logger.error("worker ack master error", e); - DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1); + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ?
-1 : taskInstance.getId()); channel.writeAndFlush(taskAckCommand.convert2Command()); } break; case RESULT: try { + boolean result = true; if (taskInstance != null) { - processService.changeTaskState(taskInstance, taskResponseEvent.getState(), + result = processService.changeTaskState(taskInstance, taskResponseEvent.getState(), taskResponseEvent.getEndTime(), taskResponseEvent.getProcessId(), taskResponseEvent.getAppIds(), taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getVarPool() ); + logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}", + result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost()); + } + if (!result) { + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + logger.debug("worker response master failure, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost()); + } else { + // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + logger.debug("worker response master success, taskInstance id:{},taskInstance host:{}", taskResponseEvent.getTaskInstanceId(), taskInstance == null ? null : taskInstance.getHost()); } - // if taskInstance is null (maybe deleted) . retry will be meaningless .
so response success - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); - channel.writeAndFlush(taskResponseCommand.convert2Command()); } catch (Exception e) { logger.error("worker response master error", e); DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index ae2b96912f..5f1eff628f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.StringUtils; import java.util.Collections; @@ -127,16 +128,16 @@ public class MasterRegistryClient { ThreadUtils.sleep(SLEEP_TIME_MILLIS); } - // self tolerant - if (registryClient.getActiveMasterNum() == 1) { - removeNodePath(null, NodeType.MASTER, true); - removeNodePath(null, NodeType.WORKER, true); - } registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener()); } catch (Exception e) { logger.error("master start up exception", e); + this.registryClient.getStoppable().stop("master start up exception"); } finally { - registryClient.releaseLock(nodeLock); + try { + registryClient.releaseLock(nodeLock); + } catch (Exception e) { + logger.error("release lock error", e); + } } } @@ -150,18 +151,57 @@ 
public class MasterRegistryClient { } /** - * remove zookeeper node path + * remove master node path * - * @param path zookeeper node path - * @param nodeType zookeeper node type + * @param path node path + * @param nodeType node type * @param failover is failover */ - public void removeNodePath(String path, NodeType nodeType, boolean failover) { + public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) { logger.info("{} node deleted : {}", nodeType, path); - String failoverPath = getFailoverLockPath(nodeType); + + if (StringUtils.isEmpty(path)) { + logger.error("server down error: empty path: {}, nodeType:{}", path, nodeType); + return; + } + + String serverHost = registryClient.getHostByEventDataPath(path); + if (StringUtils.isEmpty(serverHost)) { + logger.error("server down error: unknown path: {}, nodeType:{}", path, nodeType); + return; + } + + String failoverPath = getFailoverLockPath(nodeType, serverHost); try { registryClient.getLock(failoverPath); + if (!registryClient.exists(path)) { + logger.info("path: {} not exists", path); + // handle dead server + registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP); + } + + //failover server + if (failover) { + failoverServerWhenDown(serverHost, nodeType); + } + } catch (Exception e) { + logger.error("{} server failover failed, host:{}", nodeType, serverHost, e); + } finally { + registryClient.releaseLock(failoverPath); + } + } + + /** + * remove worker node path + * + * @param path node path + * @param nodeType node type + * @param failover is failover + */ + public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) { + logger.info("{} node deleted : {}", nodeType, path); + try { String serverHost = null; if (!StringUtils.isEmpty(path)) { serverHost = registryClient.getHostByEventDataPath(path); @@ -169,18 +209,18 @@ public class MasterRegistryClient { logger.error("server down error: unknown path: {}", path); return; } - // 
handle dead server - registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP); + if (!registryClient.exists(path)) { + logger.info("path: {} not exists", path); + // handle dead server + registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP); + } } //failover server if (failover) { failoverServerWhenDown(serverHost, nodeType); } } catch (Exception e) { - logger.error("{} server failover failed.", nodeType); - logger.error("failover exception ", e); - } finally { - registryClient.releaseLock(failoverPath); + logger.error("{} server failover failed", nodeType, e); } } @@ -209,12 +249,12 @@ public class MasterRegistryClient { * @param nodeType zookeeper node type * @return fail over lock path */ - private String getFailoverLockPath(NodeType nodeType) { + public String getFailoverLockPath(NodeType nodeType, String host) { switch (nodeType) { case MASTER: - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; + return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host; case WORKER: - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; + return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host; default: return ""; } @@ -226,7 +266,11 @@ public class MasterRegistryClient { * @param taskInstance task instance * @return true if task instance need fail over */ - private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) { + private boolean checkTaskInstanceNeedFailover(List workerServers, TaskInstance taskInstance) { + + // first submit: host is null + // dispatch succeed: host is not null && submit_time is null + // ACK || RESULT from worker: host is not null && start_time is not null boolean taskNeedFailover = true; @@ -234,14 +278,15 @@ public class MasterRegistryClient { if (taskInstance.getHost() == null) { return false; } - - // if the worker node exists in zookeeper, we must check the task starts after the worker - if 
(registryClient.checkNodeExists(taskInstance.getHost(), NodeType.WORKER)) { - //if task start after worker starts, there is no need to failover the task. - if (checkTaskAfterWorkerStart(taskInstance)) { - taskNeedFailover = false; - } + // host is not null and submit time is null, master will retry + if (taskInstance.getSubmitTime() == null) { + return false; + } + //if task start after worker starts, there is no need to failover the task. + if (checkTaskAfterWorkerStart(workerServers, taskInstance)) { + taskNeedFailover = false; } + return taskNeedFailover; } @@ -269,6 +314,54 @@ public class MasterRegistryClient { return false; } + /** + * check task start after the worker server starts. + * + * @param taskInstance task instance + * @return true if task instance start time after worker server start date + */ + private boolean checkTaskAfterWorkerStart(List workerServers, TaskInstance taskInstance) { + if (StringUtils.isEmpty(taskInstance.getHost())) { + return false; + } + + Date taskTime = taskInstance.getStartTime() == null ? 
taskInstance.getSubmitTime() : taskInstance.getStartTime(); + + Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost()); + if (workerServerStartDate != null) { + return taskTime.after(workerServerStartDate); + } + return false; + } + + /** + * get server startup time + */ + private Date getServerStartupTime(List servers, String host) { + if (CollectionUtils.isEmpty(servers)) { + return null; + } + Date serverStartupTime = null; + for (Server server : servers) { + if (host.equals(server.getHost() + Constants.COLON + server.getPort())) { + serverStartupTime = server.getCreateTime(); + break; + } + } + return serverStartupTime; + } + + /** + * get server startup time + */ + private Date getServerStartupTime(NodeType nodeType, String host) { + if (StringUtils.isEmpty(host)) { + return null; + } + List servers = registryClient.getServerList(nodeType); + return getServerStartupTime(servers, host); + } + /** * failover worker tasks *

@@ -279,10 +372,13 @@ public class MasterRegistryClient { * @param workerHost worker host */ private void failoverWorker(String workerHost) { + if (StringUtils.isEmpty(workerHost)) { return; } + List workerServers = registryClient.getServerList(NodeType.WORKER); + long startTime = System.currentTimeMillis(); List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); Map processInstanceCacheMap = new HashMap<>(); @@ -300,11 +396,17 @@ public class MasterRegistryClient { processInstanceCacheMap.put(processInstance.getId(), processInstance); } + if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) { + continue; + } + // only failover the task owned myself if worker down. - if (processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { - logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); - failoverTaskInstance(processInstance, taskInstance); + if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { + continue; } + + logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); + failoverTaskInstance(processInstance, taskInstance); } logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime); } @@ -316,11 +418,15 @@ public class MasterRegistryClient { * * @param masterHost master host */ - private void failoverMaster(String masterHost) { + public void failoverMaster(String masterHost) { + if (StringUtils.isEmpty(masterHost)) { return; } + Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost); + List workerServers = registryClient.getServerList(NodeType.WORKER); + long startTime = System.currentTimeMillis(); List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); logger.info("start master[{}] failover, process list size:{}", masterHost, 
needFailoverProcessInstanceList.size()); @@ -330,6 +436,11 @@ public class MasterRegistryClient { continue; } + if (serverStartupTime != null && processInstance.getRestartTime() != null + && processInstance.getRestartTime().after(serverStartupTime)) { + continue; + } + logger.info("failover process instance id: {}", processInstance.getId()); List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); @@ -337,6 +448,12 @@ public class MasterRegistryClient { if (Constants.NULL.equals(taskInstance.getHost())) { continue; } + if (taskInstance.getState().typeIsFinished()) { + continue; + } + if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) { + continue; + } logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId()); failoverTaskInstance(processInstance, taskInstance); } @@ -347,6 +464,13 @@ public class MasterRegistryClient { logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime); } + /** + * failover task instance + *

+ * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * 3. try to notify local master + */ private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) { if (taskInstance == null) { logger.error("failover task instance error, taskInstance is null"); @@ -359,24 +483,23 @@ public class MasterRegistryClient { return; } - if (!checkTaskInstanceNeedFailover(taskInstance)) { - return; - } - taskInstance.setProcessInstance(processInstance); TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) .buildProcessInstanceRelatedInfo(processInstance) .create(); - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskExecutionContext); + if (masterConfig.getMasterKillYarnJobWhenHandleFailOver()) { + // only kill yarn job if exists , the local thread has exited + ProcessUtils.killYarnJob(taskExecutionContext); + } taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); processService.saveTaskInstance(taskInstance); WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId()); if (workflowExecuteThreadNotify == null) { + logger.info("workflowExecuteThreadNotify is null, just return, task id:{},process id:{}", taskInstance.getId(), processInstance.getId()); return; } StateEvent stateEvent = new StateEvent(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java index cb5b6bef86..361f09f10b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java +++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java @@ -63,7 +63,7 @@ public class MasterRegistryDataListener implements SubscribeListener { logger.info("master node added : {}", path); break; case REMOVE: - masterRegistryClient.removeNodePath(path, NodeType.MASTER, true); + masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true); break; default: break; @@ -78,7 +78,7 @@ public class MasterRegistryDataListener implements SubscribeListener { break; case REMOVE: logger.info("worker node deleted : {}", path); - masterRegistryClient.removeNodePath(path, NodeType.WORKER, true); + masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true); break; default: break; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index b7e904b4d3..6b30a1a5e2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -51,6 +51,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import javax.annotation.PreDestroy; @@ -387,7 +388,8 @@ public class ServerNodeManager implements InitializingBean { workerGroup = workerGroup.toLowerCase(); Set nodes = workerGroupNodes.get(workerGroup); if (CollectionUtils.isNotEmpty(nodes)) { - return Collections.unmodifiableSet(nodes); + // avoid ConcurrentModificationException + return Collections.unmodifiableSet(nodes.stream().collect(Collectors.toSet())); } return nodes; } finally { diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java new file mode 100644 index 0000000000..81d02a9050 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class FailoverExecuteThread extends Thread { + + private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class); + + @Autowired + private MasterRegistryClient masterRegistryClient; + + @Autowired + private RegistryClient registryClient; + + @Autowired + private MasterConfig masterConfig; + + /** + * process service + */ + @Autowired + private ProcessService processService; + + @Override + public synchronized void start() { + super.setName("FailoverExecuteThread"); + super.start(); + } + + @Override + public void run() { + while (Stopper.isRunning()) { + logger.info("failover execute started"); + try { + List hosts = processService.queryNeedFailoverProcessInstanceHost(); + if (CollectionUtils.isEmpty(hosts)) { + continue; + } + for (String host : hosts) { + String failoverPath = masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host); + try { + registryClient.getLock(failoverPath); + masterRegistryClient.failoverMaster(host); + } catch (Exception e) { + logger.error("{} server failover failed, host:{}", NodeType.MASTER, host, e); + } finally { + 
registryClient.releaseLock(failoverPath); + } + } + } catch (Exception e) { + logger.error("failover execute error", e); + } finally { + ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60); + } + } + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index ca44ead095..33c84b38e8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -234,6 +234,7 @@ public class MasterSchedulerService extends Thread { if (ServerNodeManager.MASTER_SIZE == 0) { return null; } + logger.debug("master size:{}",ServerNodeManager.MASTER_SIZE); List commandList = processService.findCommandPage(ServerNodeManager.MASTER_SIZE, pageNumber); if (commandList.size() == 0) { return null; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 640abb74af..bc6159d9c3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -409,9 +409,10 @@ public class WorkflowExecuteThread implements Runnable { private boolean checkStateEvent(StateEvent stateEvent) { if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) { - logger.error("mismatch process instance id: {}, state event:{}", + logger.error("mismatch process instance id: {}, state event:{}, task 
instance id:{}", this.processInstance.getId(), - stateEvent.toString()); + stateEvent.toString(), + stateEvent.getTaskInstanceId()); return false; } return true; @@ -482,6 +483,7 @@ public class WorkflowExecuteThread implements Runnable { processDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); processInstance.setStartTime(new Date()); + processInstance.setRestartTime(processInstance.getStartTime()); processInstance.setEndTime(null); processService.saveProcessInstance(processInstance); this.taskInstanceHashMap.clear(); @@ -876,11 +878,11 @@ public class WorkflowExecuteThread implements Runnable { } if (completeTaskList.containsKey(Long.toString(task.getTaskCode()))) { - logger.info("task {} has already run success", task.getName()); + logger.info("task {} has already run success, task id:{}", task.getName(), task.getId()); continue; } if (task.getState().typeIsPause() || task.getState().typeIsCancel()) { - logger.info("task {} stopped, the state is {}", task.getName(), task.getState()); + logger.info("task {} stopped, the state is {}, task id:{}", task.getName(), task.getState(), task.getId()); } else { addTaskToStandByList(task); } @@ -1167,13 +1169,13 @@ public class WorkflowExecuteThread implements Runnable { * @param taskInstance task instance */ private void addTaskToStandByList(TaskInstance taskInstance) { - logger.info("add task to stand by list: {}", taskInstance.getName()); + logger.info("add task to stand by list, task name: {} , task id:{}", taskInstance.getName(), taskInstance.getId()); try { if (!readyToSubmitTaskQueue.contains(taskInstance)) { readyToSubmitTaskQueue.put(taskInstance); } } catch (Exception e) { - logger.error("add task instance to readyToSubmitTaskQueue error, taskName: {}", taskInstance.getName(), e); + logger.error("add task instance to readyToSubmitTaskQueue, taskName: {}, task id: {}", taskInstance.getName(), taskInstance.getId(), e); } } @@ -1253,7 +1255,7 @@ public class 
WorkflowExecuteThread implements Runnable { TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { task.setState(retryTask.getState()); - logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); + logger.info("task name: {} has been forced success, put it into complete task list and stop retrying, task id:{}", task.getName(), task.getId()); removeTaskFromStandbyList(task); completeTaskList.put(Long.toString(task.getTaskCode()), task); submitPostNode(Long.toString(task.getTaskCode())); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index ee1c548525..23988f9930 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -70,8 +70,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { if (this.taskInstance == null) { return false; } - dispatchTask(taskInstance, processInstance); - return true; + return dispatchTask(taskInstance, processInstance); } @Override @@ -127,7 +126,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor { taskPriority.setTaskExecutionContext(taskExecutionContext); taskUpdateQueue.put(taskPriority); - logger.info(String.format("master submit success, task : %s", taskInstance.getName())); + logger.info("master submit success, task id:{}, task name:{}, process id:{}", + taskInstance.getId(), taskInstance.getName(), taskInstance.getProcessInstanceId()); return true; } catch (Exception e) { logger.error("submit task Exception: ", e); diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 66f4e537b5..c66718f21b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRI import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.remote.NettyRemotingServer; @@ -29,12 +28,19 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; -import org.apache.dolphinscheduler.server.worker.processor.*; +import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor; +import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor; +import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import 
org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -45,9 +51,6 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.FilterType; import org.springframework.transaction.annotation.EnableTransactionManagement; -import javax.annotation.PostConstruct; -import java.util.Set; - /** * worker server */ @@ -146,14 +149,14 @@ public class WorkerServer implements IStoppable { try { this.workerRegistryClient.registry(); this.workerRegistryClient.setRegistryStoppable(this); - Set workerZkPaths = this.workerRegistryClient.getWorkerZkPaths(); - - this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.error("worker registry error", e); throw new RuntimeException(e); } + // solve dead lock + logger.info(org.apache.dolphinscheduler.spi.utils.PropertyUtils.dumpProperties()); + // task execute manager this.workerManagerThread.start(); @@ -194,8 +197,17 @@ public class WorkerServer implements IStoppable { this.workerRegistryClient.unRegistry(); this.alertClientService.close(); this.springApplicationContext.close(); + logger.info("springApplicationContext close"); } catch (Exception e) { logger.error("worker server stop exception ", e); + } finally { + try { + // thread sleep 60 seconds for quietly stop + Thread.sleep(60000L); + } catch (Exception e) { + logger.warn("thread sleep exception ", e); + } + System.exit(1); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index a3feb7777d..57119a703d 100644 --- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -65,6 +65,17 @@ public class WorkerConfig { @Value("${task.plugin.binding:}") private String taskPluginBinding; + @Value("${worker.retry.report.task.statues.interval:10}") + private int retryReportTaskStatusInterval; + + public int getRetryReportTaskStatusInterval() { + return retryReportTaskStatusInterval; + } + + public void setRetryReportTaskStatusInterval(int retryReportTaskStatusInterval) { + this.retryReportTaskStatusInterval = retryReportTaskStatusInterval; + } + public int getListenPort() { return listenPort; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java index a340ad704e..3aac840855 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java @@ -17,17 +17,21 @@ package org.apache.dolphinscheduler.server.worker.processor; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.remote.command.*; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; 
+import io.netty.channel.Channel; + /** * db task ack processor */ @@ -50,6 +54,7 @@ public class DBTaskAckProcessor implements NettyRequestProcessor { if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId()); + logger.debug("removeAckCache: taskinstance id:{}", taskAckCommand.getTaskInstanceId()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java index 97a9cf527a..6da9fdd5c1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.processor; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; @@ -25,11 +24,14 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import io.netty.channel.Channel; + /** * db task response processor */ @@ -51,6 +53,7 @@ public class DBTaskResponseProcessor implements NettyRequestProcessor { if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId()); + logger.debug("removeResponseCache: taskinstance 
id:{}", taskResponseCommand.getTaskInstanceId()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 54ec49c0a4..326fcb2bb6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -104,6 +107,14 @@ public class WorkerRegistryClient { logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); } + while (!this.checkNodeExists()) { + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + } + + this.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); + + registryClient.addConnectionStateListener(this::handleConnectionState); + HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, 
workerConfig.getWorkerMaxCpuloadAvg(), workerConfig.getWorkerReservedMemory(), @@ -119,6 +130,32 @@ public class WorkerRegistryClient { logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); } + public void handleConnectionState(ConnectionState state) { + switch (state) { + case CONNECTED: + logger.info("registry connection state is {}", state); + break; + case SUSPENDED: + logger.info("registry connection state is {}, ready to stop myself", state); + registryClient.getStoppable().stop("registry connection state is SUSPENDED, stop myself"); + break; + case RECONNECTED: + logger.info("registry connection state is {}, clean the node info", state); + String address = NetUtils.getAddr(workerConfig.getListenPort()); + Set<String> workerZkPaths = getWorkerZkPaths(); + for (String workerZKPath : workerZkPaths) { + registryClient.persistEphemeral(workerZKPath, ""); + logger.info("worker node : {} reconnect to ZK {} successfully", address, workerZKPath); + } + break; + case DISCONNECTED: + logger.info("registry connection state is {}, ready to stop myself", state); + registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself"); + break; + default: + } + } + /** * remove registry info */ @@ -177,4 +214,11 @@ public class WorkerRegistryClient { registryClient.setStoppable(stoppable); } + public boolean checkNodeExists() { + boolean result = registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER); + if (result) { + logger.info("check worker, node exist success, host:{}", NetUtils.getHost()); + } + return result; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java index b2d00317a5..f52be9dd3b 100644 ---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -18,18 +18,20 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.thread.Stopper; - import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.Map; - /** * Retry Report Task Status Thread */ @@ -38,10 +40,8 @@ public class RetryReportTaskStatusThread implements Runnable { private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class); - /** - * every 5 minutes - */ - private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L; + @Autowired + WorkerConfig workerConfig; /** * task callback service @@ -68,7 +68,7 @@ public class RetryReportTaskStatusThread implements Runnable { while (Stopper.isRunning()){ // sleep 5 minutes - ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL); + ThreadUtils.sleep(workerConfig.getRetryReportTaskStatusInterval() * 1000); try { if (!responceCache.getAckCache().isEmpty()){ diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index 
65d6b89033..71481052cc 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -114,9 +114,9 @@ public class MasterRegistryClientTest { @Test public void removeNodePathTest() { - masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false); - masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true); + masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false); + masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, true); //Cannot mock static methods - masterRegistryClient.removeNodePath("/path", NodeType.WORKER, true); + masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER, true); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index df7549cd76..cc968f9e79 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.service.process; +import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; @@ -27,8 +28,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; import static 
org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static java.util.stream.Collectors.toSet; - import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -579,6 +578,7 @@ public class ProcessService { processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); processInstance.setRecovery(Flag.NO); processInstance.setStartTime(new Date()); + processInstance.setRestartTime(processInstance.getStartTime()); processInstance.setRunTimes(1); processInstance.setMaxTryTimes(0); processInstance.setCommandParam(command.getCommandParam()); @@ -775,6 +775,7 @@ public class ProcessService { processInstance.setScheduleTime(command.getScheduleTime()); } processInstance.setHost(host); + processInstance.setRestartTime(new Date()); ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION; int runTime = processInstance.getRunTimes(); switch (commandType) { @@ -844,6 +845,7 @@ public class ProcessService { updateTaskInstance(taskInstance); } processInstance.setStartTime(new Date()); + processInstance.setRestartTime(processInstance.getStartTime()); processInstance.setEndTime(null); processInstance.setRunTimes(runTime + 1); initComplementDataParam(processDefinition, processInstance, cmdParam); @@ -1015,6 +1017,7 @@ public class ProcessService { } taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); updateTaskInstance(taskInstance); + logger.debug("update task instance, task instance id:{}", taskInstance.getId()); } /** @@ -1317,9 +1320,6 @@ public class ProcessService { taskInstance.setExecutorId(processInstance.getExecutorId()); taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority()); taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState)); - if (taskInstance.getSubmitTime() == null) { - taskInstance.setSubmitTime(new Date()); - } if (taskInstance.getFirstSubmitTime() == null) { 
taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime()); } @@ -1435,6 +1435,11 @@ public class ProcessService { } } + public boolean updateHostAndSubmitTimeById(int id, String host, Date date) { + int count = taskInstanceMapper.updateHostAndSubmitTimeById(id, host, date); + return count > 0; + } + /** * insert task instance * @@ -1454,6 +1459,7 @@ public class ProcessService { */ public boolean updateTaskInstance(TaskInstance taskInstance) { int count = taskInstanceMapper.updateById(taskInstance); + logger.debug("updateTaskInstance, task instance id:{}, state;{}", taskInstance.getId(), taskInstance.getState()); return count > 0; } @@ -1691,8 +1697,9 @@ public class ProcessService { * @param executePath executePath * @param logPath logPath * @param taskInstId taskInstId + * @return */ - public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host, + public boolean changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host, String executePath, String logPath, int taskInstId) { @@ -1701,7 +1708,7 @@ public class ProcessService { taskInstance.setHost(host); taskInstance.setExecutePath(executePath); taskInstance.setLogPath(logPath); - saveTaskInstance(taskInstance); + return saveTaskInstance(taskInstance); } /** @@ -1721,8 +1728,9 @@ public class ProcessService { * @param endTime endTime * @param taskInstId taskInstId * @param varPool varPool + * @return */ - public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, + public boolean changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date endTime, int processId, String appIds, @@ -1734,7 +1742,7 @@ public class ProcessService { taskInstance.setEndTime(endTime); taskInstance.setVarPool(varPool); changeOutParam(taskInstance); - saveTaskInstance(taskInstance); + return saveTaskInstance(taskInstance); } /** @@ -1819,6 +1827,10 @@ public class ProcessService { return
processInstanceMapper.queryByHostAndStatus(host, stateArray); } + public List<String> queryNeedFailoverProcessInstanceHost() { + return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray); + } + /** * process need failover process instance * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java index 622206a365..86d73b6268 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java @@ -17,39 +17,6 @@ package org.apache.dolphinscheduler.service.quartz; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.lang.StringUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.service.exceptions.ServiceException; -import org.quartz.CronTrigger; -import org.quartz.Job; -import org.quartz.JobDetail; -import org.quartz.JobKey; -import org.quartz.Scheduler; -import org.quartz.SchedulerException; -import org.quartz.TriggerKey; -import org.quartz.impl.StdSchedulerFactory; -import org.quartz.impl.jdbcjobstore.JobStoreTX; -import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate; -import
org.quartz.impl.jdbcjobstore.StdJDBCDelegate; -import org.quartz.impl.matchers.GroupMatcher; -import org.quartz.simpl.SimpleThreadPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER; import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS; import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK; @@ -61,6 +28,7 @@ import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_I import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD; import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX; import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES; +import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT; import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID; import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME; import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON; @@ -70,6 +38,7 @@ import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY; import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID; import static org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK; +import static org.apache.dolphinscheduler.common.Constants.QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT; import static org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL; import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE; import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID; @@ 
-91,6 +60,42 @@ import static org.quartz.CronScheduleBuilder.cronSchedule; import static org.quartz.JobBuilder.newJob; import static org.quartz.TriggerBuilder.newTrigger; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.service.exceptions.ServiceException; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.lang.StringUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.quartz.CronTrigger; +import org.quartz.Job; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.TriggerKey; +import org.quartz.impl.StdSchedulerFactory; +import org.quartz.impl.jdbcjobstore.JobStoreTX; +import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate; +import org.quartz.impl.jdbcjobstore.StdJDBCDelegate; +import org.quartz.impl.matchers.GroupMatcher; +import org.quartz.simpl.SimpleThreadPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * single Quartz executors instance */ @@ -170,6 +175,8 @@ public class QuartzExecutors { properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK)); properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE, conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE)); 
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, HikariConnectionProvider.class.getName())); + properties.setProperty(ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT, + conf.getString(ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT, QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT)); schedulerFactory.initialize(properties); scheduler = schedulerFactory.getScheduler(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index 8d630beeb0..d9168d4f7e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -32,7 +32,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { /** * queue size */ - private static final Integer QUEUE_MAX_SIZE = 3000; + private static final Integer QUEUE_MAX_SIZE = 10000; /** * queue diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java index f384678681..775fc10798 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java @@ -330,7 +330,7 @@ public class RegistryClient { if (serverPath.startsWith(serverType + UNDERLINE + host)) { String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverPath; remove(server); - logger.info("{} server {} deleted from zk dead server path success", serverType, host); + logger.info("{} server {} 
deleted from zk dead server path:{} success", serverType, host,server); } } } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java index 2b1adffee7..8a4068c51e 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java @@ -181,4 +181,7 @@ public class PropertyUtils { properties.setProperty(key, value); } + public static String dumpProperties() { + return properties.toString(); + } }