Browse Source

Adapting partial code(file name start with M #1) to the sonar cloud rule (#2173)

* Adapting partial code(file name start with M #1) to the sonar cloud rule

* remove unused import
pull/3/MERGE
gabry.wu 5 years ago committed by gaojun2048
parent
commit
2f4fda7cdb
  1. 52
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
  2. 14
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java
  3. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  4. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  5. 61
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  6. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java

52
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java

@ -39,29 +39,29 @@ public class MailUtils {
public static final Logger logger = LoggerFactory.getLogger(MailUtils.class); public static final Logger logger = LoggerFactory.getLogger(MailUtils.class);
public static final String mailProtocol = PropertyUtils.getString(Constants.MAIL_PROTOCOL); public static final String MAIL_PROTOCOL = PropertyUtils.getString(Constants.MAIL_PROTOCOL);
public static final String mailServerHost = PropertyUtils.getString(Constants.MAIL_SERVER_HOST); public static final String MAIL_SERVER_HOST = PropertyUtils.getString(Constants.MAIL_SERVER_HOST);
public static final Integer mailServerPort = PropertyUtils.getInt(Constants.MAIL_SERVER_PORT); public static final Integer MAIL_SERVER_PORT = PropertyUtils.getInt(Constants.MAIL_SERVER_PORT);
public static final String mailSender = PropertyUtils.getString(Constants.MAIL_SENDER); public static final String MAIL_SENDER = PropertyUtils.getString(Constants.MAIL_SENDER);
public static final String mailUser = PropertyUtils.getString(Constants.MAIL_USER); public static final String MAIL_USER = PropertyUtils.getString(Constants.MAIL_USER);
public static final String mailPasswd = PropertyUtils.getString(Constants.MAIL_PASSWD); public static final String MAIL_PASSWD = PropertyUtils.getString(Constants.MAIL_PASSWD);
public static final Boolean mailUseStartTLS = PropertyUtils.getBoolean(Constants.MAIL_SMTP_STARTTLS_ENABLE); public static final Boolean MAIL_USE_START_TLS = PropertyUtils.getBoolean(Constants.MAIL_SMTP_STARTTLS_ENABLE);
public static final Boolean mailUseSSL = PropertyUtils.getBoolean(Constants.MAIL_SMTP_SSL_ENABLE); public static final Boolean MAIL_USE_SSL = PropertyUtils.getBoolean(Constants.MAIL_SMTP_SSL_ENABLE);
public static final String xlsFilePath = PropertyUtils.getString(Constants.XLS_FILE_PATH); public static final String XLS_FILE_PATH = PropertyUtils.getString(Constants.XLS_FILE_PATH);
public static final String starttlsEnable = PropertyUtils.getString(Constants.MAIL_SMTP_STARTTLS_ENABLE); public static final String STARTTLS_ENABLE = PropertyUtils.getString(Constants.MAIL_SMTP_STARTTLS_ENABLE);
public static final String sslEnable = PropertyUtils.getString(Constants.MAIL_SMTP_SSL_ENABLE); public static final String SSL_ENABLE = PropertyUtils.getString(Constants.MAIL_SMTP_SSL_ENABLE);
public static final String sslTrust = PropertyUtils.getString(Constants.MAIL_SMTP_SSL_TRUST); public static final String SSL_TRUST = PropertyUtils.getString(Constants.MAIL_SMTP_SSL_TRUST);
public static final AlertTemplate alertTemplate = AlertTemplateFactory.getMessageTemplate(); public static final AlertTemplate alertTemplate = AlertTemplateFactory.getMessageTemplate();
@ -105,7 +105,7 @@ public class MailUtils {
try { try {
Session session = getSession(); Session session = getSession();
email.setMailSession(session); email.setMailSession(session);
email.setFrom(mailSender); email.setFrom(MAIL_SENDER);
email.setCharset(Constants.UTF_8); email.setCharset(Constants.UTF_8);
if (CollectionUtils.isNotEmpty(receivers)){ if (CollectionUtils.isNotEmpty(receivers)){
// receivers mail // receivers mail
@ -199,10 +199,10 @@ public class MailUtils {
// 2. creating mail: Creating a MimeMessage // 2. creating mail: Creating a MimeMessage
MimeMessage msg = new MimeMessage(session); MimeMessage msg = new MimeMessage(session);
// 3. set sender // 3. set sender
msg.setFrom(new InternetAddress(mailSender)); msg.setFrom(new InternetAddress(MAIL_SENDER));
// 4. set receivers // 4. set receivers
for (String receiver : receivers) { for (String receiver : receivers) {
msg.addRecipients(MimeMessage.RecipientType.TO, InternetAddress.parse(receiver)); msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(receiver));
} }
return msg; return msg;
} }
@ -213,19 +213,19 @@ public class MailUtils {
*/ */
private static Session getSession() { private static Session getSession() {
Properties props = new Properties(); Properties props = new Properties();
props.setProperty(Constants.MAIL_HOST, mailServerHost); props.setProperty(Constants.MAIL_HOST, MAIL_SERVER_HOST);
props.setProperty(Constants.MAIL_PORT, String.valueOf(mailServerPort)); props.setProperty(Constants.MAIL_PORT, String.valueOf(MAIL_SERVER_PORT));
props.setProperty(Constants.MAIL_SMTP_AUTH, Constants.STRING_TRUE); props.setProperty(Constants.MAIL_SMTP_AUTH, Constants.STRING_TRUE);
props.setProperty(Constants.MAIL_TRANSPORT_PROTOCOL, mailProtocol); props.setProperty(Constants.MAIL_TRANSPORT_PROTOCOL, MAIL_PROTOCOL);
props.setProperty(Constants.MAIL_SMTP_STARTTLS_ENABLE, starttlsEnable); props.setProperty(Constants.MAIL_SMTP_STARTTLS_ENABLE, STARTTLS_ENABLE);
props.setProperty(Constants.MAIL_SMTP_SSL_ENABLE, sslEnable); props.setProperty(Constants.MAIL_SMTP_SSL_ENABLE, SSL_ENABLE);
props.setProperty(Constants.MAIL_SMTP_SSL_TRUST, sslTrust); props.setProperty(Constants.MAIL_SMTP_SSL_TRUST, SSL_TRUST);
Authenticator auth = new Authenticator() { Authenticator auth = new Authenticator() {
@Override @Override
protected PasswordAuthentication getPasswordAuthentication() { protected PasswordAuthentication getPasswordAuthentication() {
// mail username and password // mail username and password
return new PasswordAuthentication(mailUser, mailPasswd); return new PasswordAuthentication(MAIL_USER, MAIL_PASSWD);
} }
}; };
@ -248,12 +248,10 @@ public class MailUtils {
*/ */
if(CollectionUtils.isNotEmpty(receiversCc)){ if(CollectionUtils.isNotEmpty(receiversCc)){
for (String receiverCc : receiversCc){ for (String receiverCc : receiversCc){
msg.addRecipients(MimeMessage.RecipientType.CC, InternetAddress.parse(receiverCc)); msg.addRecipients(Message.RecipientType.CC, InternetAddress.parse(receiverCc));
} }
} }
// set receivers type to cc
// msg.addRecipients(MimeMessage.RecipientType.CC, InternetAddress.parse(propMap.get("${CC}")));
// set subject // set subject
msg.setSubject(title); msg.setSubject(title);
MimeMultipart partList = new MimeMultipart(); MimeMultipart partList = new MimeMultipart();
@ -263,8 +261,8 @@ public class MailUtils {
// set attach file // set attach file
MimeBodyPart part2 = new MimeBodyPart(); MimeBodyPart part2 = new MimeBodyPart();
// make excel file // make excel file
ExcelUtils.genExcelFile(content,title,xlsFilePath); ExcelUtils.genExcelFile(content,title, XLS_FILE_PATH);
File file = new File(xlsFilePath + Constants.SINGLE_SLASH + title + Constants.EXCEL_SUFFIX_XLS); File file = new File(XLS_FILE_PATH + Constants.SINGLE_SLASH + title + Constants.EXCEL_SUFFIX_XLS);
part2.attachFile(file); part2.attachFile(file);
part2.setFileName(MimeUtility.encodeText(title + Constants.EXCEL_SUFFIX_XLS,Constants.UTF_8,"B")); part2.setFileName(MimeUtility.encodeText(title + Constants.EXCEL_SUFFIX_XLS,Constants.UTF_8,"B"));
// add components to collection // add components to collection

14
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java

@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -125,12 +126,15 @@ public class MapreduceParameters extends AbstractParameters {
@Override @Override
public List<String> getResourceFilesList() { public List<String> getResourceFilesList() {
if (resourceList != null) { if(resourceList != null ) {
this.resourceList.add(mainJar); List<String> resourceFiles = resourceList.stream()
return resourceList.stream() .map(ResourceInfo::getRes).collect(Collectors.toList());
.map(p -> p.getRes()).collect(Collectors.toList()); if(mainJar != null) {
resourceFiles.add(mainJar.getRes());
}
return resourceFiles;
} }
return null; return Collections.emptyList();
} }
@Override @Override

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -506,7 +506,7 @@ public class ProcessInstance {
* check this process is start complement data * check this process is start complement data
* @return whether complement data * @return whether complement data
*/ */
public Boolean isComplementData(){ public boolean isComplementData(){
if(!StringUtils.isNotEmpty(this.historyCmd)){ if(!StringUtils.isNotEmpty(this.historyCmd)){
return false; return false;
} }

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -373,7 +373,7 @@ public class TaskInstance {
} }
public Boolean isSubProcess(){ public boolean isSubProcess(){
return TaskType.SUB_PROCESS.getDescp().equals(this.taskType); return TaskType.SUB_PROCESS.getDescp().equals(this.taskType);
} }
@ -442,7 +442,7 @@ public class TaskInstance {
this.executorName = executorName; this.executorName = executorName;
} }
public Boolean isTaskComplete() { public boolean isTaskComplete() {
return this.getState().typeIsPause() return this.getState().typeIsPause()
|| this.getState().typeIsSuccess() || this.getState().typeIsSuccess()

61
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -68,7 +68,7 @@ public class MasterExecThread implements Runnable {
/** /**
* runing TaskNode * runing TaskNode
*/ */
private final Map<MasterBaseTaskExecThread,Future<Boolean>> activeTaskNode = new ConcurrentHashMap<MasterBaseTaskExecThread,Future<Boolean>>(); private final Map<MasterBaseTaskExecThread,Future<Boolean>> activeTaskNode = new ConcurrentHashMap<>();
/** /**
* task exec service * task exec service
@ -78,7 +78,7 @@ public class MasterExecThread implements Runnable {
/** /**
* submit failure nodes * submit failure nodes
*/ */
private Boolean taskFailedSubmit = false; private boolean taskFailedSubmit = false;
/** /**
* recover node id list * recover node id list
@ -652,7 +652,7 @@ public class MasterExecThread implements Runnable {
continue; continue;
} }
if(task.getState().typeIsPause() || task.getState().typeIsCancel()){ if(task.getState().typeIsPause() || task.getState().typeIsCancel()){
logger.info("task {} stopped, the state is {}", task.getName(), task.getState().toString()); logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
}else{ }else{
addTaskToStandByList(task); addTaskToStandByList(task);
} }
@ -685,11 +685,12 @@ public class MasterExecThread implements Runnable {
} }
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
// conditions task would not return failed. // conditions task would not return failed.
if(depTaskState.typeIsFailure()){ if(depTaskState.typeIsFailure()
if(!haveConditionsAfterNode(depsNode) && !dag.getNode(depsNode).isConditionsTask()){ && !haveConditionsAfterNode(depsNode)
return DependResult.FAILED; && !dag.getNode(depsNode).isConditionsTask()){
} return DependResult.FAILED;
} }
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
return DependResult.WAITING; return DependResult.WAITING;
} }
@ -737,7 +738,7 @@ public class MasterExecThread implements Runnable {
* *
* @return Boolean whether has failed task * @return Boolean whether has failed task
*/ */
private Boolean hasFailedTask(){ private boolean hasFailedTask(){
if(this.taskFailedSubmit){ if(this.taskFailedSubmit){
return true; return true;
@ -753,7 +754,7 @@ public class MasterExecThread implements Runnable {
* *
* @return Boolean whether process instance failed * @return Boolean whether process instance failed
*/ */
private Boolean processFailed(){ private boolean processFailed(){
if(hasFailedTask()) { if(hasFailedTask()) {
if(processInstance.getFailureStrategy() == FailureStrategy.END){ if(processInstance.getFailureStrategy() == FailureStrategy.END){
return true; return true;
@ -769,9 +770,9 @@ public class MasterExecThread implements Runnable {
* whether task for waiting thread * whether task for waiting thread
* @return Boolean whether has waiting thread task * @return Boolean whether has waiting thread task
*/ */
private Boolean hasWaitingThreadTask(){ private boolean hasWaitingThreadTask(){
List<TaskInstance> waitingList = getCompleteTaskByState(ExecutionStatus.WAITTING_THREAD); List<TaskInstance> waitingList = getCompleteTaskByState(ExecutionStatus.WAITTING_THREAD);
return waitingList.size() > 0; return CollectionUtils.isNotEmpty(waitingList);
} }
/** /**
@ -787,7 +788,7 @@ public class MasterExecThread implements Runnable {
} }
List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE); List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
if(pauseList.size() > 0 if(CollectionUtils.isNotEmpty(pauseList)
|| !isComplementEnd() || !isComplementEnd()
|| readyToSubmitTaskList.size() > 0){ || readyToSubmitTaskList.size() > 0){
return ExecutionStatus.PAUSE; return ExecutionStatus.PAUSE;
@ -827,7 +828,8 @@ public class MasterExecThread implements Runnable {
if(state == ExecutionStatus.READY_STOP){ if(state == ExecutionStatus.READY_STOP){
List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP); List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL); List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
if(stopList.size() > 0 || killList.size() > 0 || !isComplementEnd()){ if(CollectionUtils.isNotEmpty(stopList)
|| CollectionUtils.isNotEmpty(killList) || !isComplementEnd()){
return ExecutionStatus.STOP; return ExecutionStatus.STOP;
}else{ }else{
return ExecutionStatus.SUCCESS; return ExecutionStatus.SUCCESS;
@ -852,7 +854,7 @@ public class MasterExecThread implements Runnable {
* whether complement end * whether complement end
* @return Boolean whether is complement end * @return Boolean whether is complement end
*/ */
private Boolean isComplementEnd() { private boolean isComplementEnd() {
if(!processInstance.isComplementData()){ if(!processInstance.isComplementData()){
return true; return true;
} }
@ -877,8 +879,8 @@ public class MasterExecThread implements Runnable {
logger.info( logger.info(
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(), processInstance.getName(), processInstance.getId(), processInstance.getName(),
processInstance.getState().toString(), state.toString(), processInstance.getState(), state,
processInstance.getCommandType().toString()); processInstance.getCommandType());
processInstance.setState(state); processInstance.setState(state);
ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId()); ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
instance.setState(state); instance.setState(state);
@ -894,8 +896,7 @@ public class MasterExecThread implements Runnable {
* @return DependResult * @return DependResult
*/ */
private DependResult getDependResultForTask(TaskInstance taskInstance){ private DependResult getDependResultForTask(TaskInstance taskInstance){
DependResult inner = isTaskDepsComplete(taskInstance.getName()); return isTaskDepsComplete(taskInstance.getName());
return inner;
} }
/** /**
@ -920,7 +921,7 @@ public class MasterExecThread implements Runnable {
* has retry task in standby * has retry task in standby
* @return Boolean whether has retry task in standby * @return Boolean whether has retry task in standby
*/ */
private Boolean hasRetryTaskInStandBy(){ private boolean hasRetryTaskInStandBy(){
for (Map.Entry<String, TaskInstance> entry: readyToSubmitTaskList.entrySet()) { for (Map.Entry<String, TaskInstance> entry: readyToSubmitTaskList.entrySet()) {
if(entry.getValue().getState().typeIsFailure()){ if(entry.getValue().getState().typeIsFailure()){
return true; return true;
@ -958,7 +959,7 @@ public class MasterExecThread implements Runnable {
continue; continue;
} }
logger.info("task :{}, id:{} complete, state is {} ", logger.info("task :{}, id:{} complete, state is {} ",
task.getName(), task.getId(), task.getState().toString()); task.getName(), task.getId(), task.getState());
// node success , post node submit // node success , post node submit
if(task.getState() == ExecutionStatus.SUCCESS){ if(task.getState() == ExecutionStatus.SUCCESS){
completeTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task);
@ -990,7 +991,7 @@ public class MasterExecThread implements Runnable {
completeTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task);
} }
// send alert // send alert
if(this.recoverToleranceFaultTaskList.size() > 0){ if(CollectionUtils.isNotEmpty(this.recoverToleranceFaultTaskList)){
alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList); alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList);
this.recoverToleranceFaultTaskList.clear(); this.recoverToleranceFaultTaskList.clear();
} }
@ -1034,10 +1035,7 @@ public class MasterExecThread implements Runnable {
Date now = new Date(); Date now = new Date();
long runningTime = DateUtils.diffMin(now, processInstance.getStartTime()); long runningTime = DateUtils.diffMin(now, processInstance.getStartTime());
if(runningTime > processInstance.getTimeout()){ return runningTime > processInstance.getTimeout();
return true;
}
return false;
} }
/** /**
@ -1081,22 +1079,19 @@ public class MasterExecThread implements Runnable {
* @param taskInstance task instance * @param taskInstance task instance
* @return Boolean * @return Boolean
*/ */
private Boolean retryTaskIntervalOverTime(TaskInstance taskInstance){ private boolean retryTaskIntervalOverTime(TaskInstance taskInstance){
if(taskInstance.getState() != ExecutionStatus.FAILURE){ if(taskInstance.getState() != ExecutionStatus.FAILURE){
return Boolean.TRUE; return true;
} }
if(taskInstance.getId() == 0 || if(taskInstance.getId() == 0 ||
taskInstance.getMaxRetryTimes() ==0 || taskInstance.getMaxRetryTimes() ==0 ||
taskInstance.getRetryInterval() == 0 ){ taskInstance.getRetryInterval() == 0 ){
return Boolean.TRUE; return true;
} }
Date now = new Date(); Date now = new Date();
long failedTimeInterval = DateUtils.differSec(now, taskInstance.getEndTime()); long failedTimeInterval = DateUtils.differSec(now, taskInstance.getEndTime());
// task retry does not over time, return false // task retry does not over time, return false
if(taskInstance.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT >= failedTimeInterval){ return taskInstance.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval;
return Boolean.FALSE;
}
return Boolean.TRUE;
} }
/** /**
@ -1189,7 +1184,7 @@ public class MasterExecThread implements Runnable {
*/ */
private List<String> getRecoveryNodeNameList(){ private List<String> getRecoveryNodeNameList(){
List<String> recoveryNodeNameList = new ArrayList<>(); List<String> recoveryNodeNameList = new ArrayList<>();
if(recoverNodeIdList.size() > 0) { if(CollectionUtils.isNotEmpty(recoverNodeIdList)) {
for (TaskInstance task : recoverNodeIdList) { for (TaskInstance task : recoverNodeIdList) {
recoveryNodeNameList.add(task.getName()); recoveryNodeNameList.add(task.getName());
} }

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java

@ -122,22 +122,19 @@ public class MapReduceTask extends AbstractYarnTask {
} }
// main class // main class
if(mapreduceParameters.getProgramType() !=null ){ if(!ProgramType.PYTHON.equals(mapreduceParameters.getProgramType())
if(mapreduceParameters.getProgramType()!= ProgramType.PYTHON){ && StringUtils.isNotEmpty(mapreduceParameters.getMainClass())){
if(StringUtils.isNotEmpty(mapreduceParameters.getMainClass())){ result.add(mapreduceParameters.getMainClass());
result.add(mapreduceParameters.getMainClass());
}
}
} }
// others // others
if (StringUtils.isNotEmpty(mapreduceParameters.getOthers())) { if (StringUtils.isNotEmpty(mapreduceParameters.getOthers())) {
String others = mapreduceParameters.getOthers(); String others = mapreduceParameters.getOthers();
if(!others.contains(Constants.MR_QUEUE)){ if (!others.contains(Constants.MR_QUEUE)
if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) { && StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue())); result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue()));
}
} }
result.add(mapreduceParameters.getOthers()); result.add(mapreduceParameters.getOthers());
}else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) { }else if (StringUtils.isNotEmpty(mapreduceParameters.getQueue())) {
result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue())); result.add(String.format("%s %s=%s", Constants.D, Constants.MR_QUEUE, mapreduceParameters.getQueue()));

Loading…
Cancel
Save