Browse Source

Fix bug: NullPointerException etc. (#1721)

* #1720 Fix bug: NullPointerException etc.

* fix code smell

* Update DateInterval.java

refactor equals method

Co-authored-by: dailidong <dailidong66@gmail.com>
pull/2/head
Jave-Chen 5 years ago committed by dailidong
parent
commit
c522ea7ebb
  1. 5
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
  2. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  3. 13
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java
  4. 8
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  5. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

5
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java

@ -101,6 +101,11 @@ public class AlertSender{
}else if (alert.getAlertType() == AlertType.SMS){ }else if (alert.getAlertType() == AlertType.SMS){
retMaps = emailManager.send(getReciversForSMS(users), alert.getTitle(), alert.getContent(),alert.getShowType()); retMaps = emailManager.send(getReciversForSMS(users), alert.getTitle(), alert.getContent(),alert.getShowType());
alert.setInfo(retMaps); alert.setInfo(retMaps);
} else {
logger.error("AlertType is not defined. code: {}, descp: {}",
alert.getAlertType().getCode(),
alert.getAlertType().getDescp());
return;
} }
//send flag //send flag

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -499,6 +499,10 @@ public class ExecutorService extends BaseService{
} }
} }
if ( start == null || end == null) {
return 0;
}
if(commandType == CommandType.COMPLEMENT_DATA){ if(commandType == CommandType.COMPLEMENT_DATA){
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
if(null != start && null != end && start.before(end)){ if(null != start && null != end && start.before(end)){
@ -547,8 +551,6 @@ public class ExecutorService extends BaseService{
command.setCommandParam(JSONUtils.toJson(cmdParam)); command.setCommandParam(JSONUtils.toJson(cmdParam));
return processDao.createCommand(command); return processDao.createCommand(command);
} }
return 0;
} }
/** /**

13
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java

@ -35,13 +35,12 @@ public class DateInterval {
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
try{ if (this == o) return true;
DateInterval dateInterval = (DateInterval) obj; if (o == null || getClass() != o.getClass()) return false;
return startTime.equals(dateInterval.getStartTime()) && DateInterval that = (DateInterval) o;
endTime.equals(dateInterval.getEndTime()); return startTime.equals(that.startTime) &&
}catch (Exception e){ endTime.equals(that.endTime);
return false;
}
} }
public Date getStartTime() { public Date getStartTime() {

8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -119,7 +119,9 @@ public class HadoopUtils implements Closeable {
fsRelatedProps.forEach((key, value) -> configuration.set(key, value)); fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
}else{ }else{
logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS ); logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS );
throw new RuntimeException("property:{} can not to be empty, please set!"); throw new RuntimeException(
String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
);
} }
}else{ }else{
logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS); logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
@ -219,12 +221,14 @@ public class HadoopUtils implements Closeable {
return null; return null;
} }
FSDataInputStream in = fs.open(new Path(hdfsFilePath)); try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))){
BufferedReader br = new BufferedReader(new InputStreamReader(in)); BufferedReader br = new BufferedReader(new InputStreamReader(in));
Stream<String> stream = br.lines().skip(skipLineNums).limit(limit); Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList()); return stream.collect(Collectors.toList());
} }
}
/** /**
* make the given file and all non-existent parents into * make the given file and all non-existent parents into
* directories. Has the semantics of Unix 'mkdir -p'. * directories. Has the semantics of Unix 'mkdir -p'.

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -96,7 +96,13 @@ public class DagHelper {
for (String startNodeName : startNodeList) { for (String startNodeName : startNodeList) {
TaskNode startNode = findNodeByName(taskNodeList, startNodeName); TaskNode startNode = findNodeByName(taskNodeList, startNodeName);
List<TaskNode> childNodeList = new ArrayList<>(); List<TaskNode> childNodeList = new ArrayList<>();
if (TaskDependType.TASK_POST == taskDependType) { if (startNode == null) {
logger.error("start node name [{}] is not in task node list [{}] ",
startNodeName,
taskNodeList
);
continue;
} else if (TaskDependType.TASK_POST == taskDependType) {
childNodeList = getFlowNodeListPost(startNode, taskNodeList); childNodeList = getFlowNodeListPost(startNode, taskNodeList);
} else if (TaskDependType.TASK_PRE == taskDependType) { } else if (TaskDependType.TASK_PRE == taskDependType) {
childNodeList = getFlowNodeListPre(startNode, recoveryNodeNameList, taskNodeList); childNodeList = getFlowNodeListPre(startNode, recoveryNodeNameList, taskNodeList);
@ -129,7 +135,6 @@ public class DagHelper {
if (null != depList && null != startNode && depList.contains(startNode.getName())) { if (null != depList && null != startNode && depList.contains(startNode.getName())) {
resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList)); resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList));
} }
} }
resultList.add(startNode); resultList.add(startNode);
return resultList; return resultList;

Loading…
Cancel
Save