From ce4d123347d112946e2f20d21ceae961d9acc5a7 Mon Sep 17 00:00:00 2001 From: baoliang Date: Mon, 9 Nov 2020 17:24:44 +0800 Subject: [PATCH] merge from 1.3.3-release --- .../server/utils/AlertManager.java | 4 ++++ .../server/zk/ZKMasterClient.java | 22 ++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java index 58ade83d68..6b8d926c0f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ShowType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -197,6 +198,9 @@ public class AlertManager { public void sendAlertProcessInstance(ProcessInstance processInstance, List taskInstances) { + if(Flag.YES == processInstance.getIsSubProcess()){ + return; + } boolean sendWarnning = false; WarningType warningType = processInstance.getWarningType(); switch (warningType) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 49bfb5f9a8..f6d4d0d4bb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -83,10 +83,10 @@ public class ZKMasterClient extends AbstractZKClient { // self tolerant if (getActiveMasterNum() == 1) { - failoverWorker(null, true); - failoverMaster(null); + removeZKNodePath(null, ZKNodeType.MASTER, true); + removeZKNodePath(null, ZKNodeType.WORKER, true); } - + registerListener(); } catch (Exception e) { logger.error("master start up exception", e); } finally { @@ -133,9 +133,16 @@ public class ZKMasterClient extends AbstractZKClient { mutex = new InterProcessMutex(getZkClient(), failoverPath); mutex.acquire(); - String serverHost = getHostByEventDataPath(path); - // handle dead server - handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); + String serverHost = null; + if(StringUtils.isNotEmpty(path)){ + serverHost = getHostByEventDataPath(path); + if(StringUtils.isEmpty(serverHost)){ + logger.error("server down error: unknown path: {}", path); + return; + } + // handle dead server + handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); + } //failover server if (failover) { failoverServerWhenDown(serverHost, zkNodeType); @@ -336,8 +343,11 @@ public class ZKMasterClient extends AbstractZKClient { List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); + logger.info("failover process list size:{} ", needFailoverProcessInstanceList.size()); //updateProcessInstance host is null and insert into command for (ProcessInstance processInstance : needFailoverProcessInstanceList) { + logger.info("failover process instance id: {} host:{}", + processInstance.getId(), processInstance.getHost()); if (Constants.NULL.equals(processInstance.getHost())) { continue; }