Browse Source

merge from 1.3.3-release

pull/3/MERGE
baoliang 4 years ago
parent
commit
ce4d123347
  1. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
  2. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -197,6 +198,9 @@ public class AlertManager {
public void sendAlertProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances) {
if(Flag.YES == processInstance.getIsSubProcess()){
return;
}
boolean sendWarnning = false;
WarningType warningType = processInstance.getWarningType();
switch (warningType) {

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -83,10 +83,10 @@ public class ZKMasterClient extends AbstractZKClient {
// self tolerant
if (getActiveMasterNum() == 1) {
failoverWorker(null, true);
failoverMaster(null);
removeZKNodePath(null, ZKNodeType.MASTER, true);
removeZKNodePath(null, ZKNodeType.WORKER, true);
}
registerListener();
} catch (Exception e) {
logger.error("master start up exception", e);
} finally {
@ -133,9 +133,16 @@ public class ZKMasterClient extends AbstractZKClient {
mutex = new InterProcessMutex(getZkClient(), failoverPath);
mutex.acquire();
String serverHost = getHostByEventDataPath(path);
String serverHost = null;
if(StringUtils.isNotEmpty(path)){
serverHost = getHostByEventDataPath(path);
if(StringUtils.isEmpty(serverHost)){
logger.error("server down error: unknown path: {}", path);
return;
}
// handle dead server
handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
}
//failover server
if (failover) {
failoverServerWhenDown(serverHost, zkNodeType);
@ -336,8 +343,11 @@ public class ZKMasterClient extends AbstractZKClient {
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
logger.info("failover process list size:{} ", needFailoverProcessInstanceList.size());
//updateProcessInstance host is null and insert into command
for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
logger.info("failover process instance id: {} host:{}",
processInstance.getId(), processInstance.getHost());
if (Constants.NULL.equals(processInstance.getHost())) {
continue;
}

Loading…
Cancel
Save