Browse Source

[fix-#11753] send alert error alert data id (#11774) (#12762)

Co-authored-by: fuchanghai <changhai.fu@marketingforce.com>
3.0.2-release
Eric Gao 2 years ago committed by GitHub
parent
commit
ffb8b6ef70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  4. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  5. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
  6. 14
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  7. 19
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  8. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
  9. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  10. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -132,7 +132,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false, defaultValue = "0") Integer warningGroupId,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@ -216,7 +216,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) int warningGroupId,
@RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -325,7 +325,7 @@ public class PythonGateway {
String cronTime,
String workerGroup,
String warningType,
int warningGroupId,
Integer warningGroupId,
Integer timeout
) {
User user = usersService.queryUser(userName);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -61,7 +61,7 @@ public interface ExecutorService {
Map<String, Object> execProcessInstance(User loginUser, long projectCode,
long processDefinitionCode, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
TaskDependType taskDependType, WarningType warningType, Integer warningGroupId,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -160,7 +160,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
public Map<String, Object> execProcessInstance(User loginUser, long projectCode,
long processDefinitionCode, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
TaskDependType taskDependType, WarningType warningType, Integer warningGroupId,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode,Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
@ -613,7 +613,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private int createCommand(CommandType commandType, long processDefineCode,
TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType,
int executorId, int warningGroupId,
int executorId, Integer warningGroupId,
RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode,
Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) {

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java

@ -206,7 +206,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
eq(0), eq(null), eq(null), eq("default"), eq(-1L),
eq(null), eq(null), eq(null), eq("default"), eq(-1L),
eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0),
eq(complementDependentMode))).thenReturn(executeServiceResult);

14
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -197,7 +197,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.START_PROCESS,
null, null,
null, null, 0,
null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@ -216,7 +216,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.START_PROCESS,
null, "n1,n2",
null, null, 0,
null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@ -235,7 +235,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@ -253,7 +253,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@ -271,7 +271,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
null, null, null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@ -290,7 +290,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
null, null, null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@ -306,7 +306,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
null, null, null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);

19
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.dolphinscheduler.common.enums.AlertEvent;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
@ -47,6 +48,8 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -58,6 +61,13 @@ import com.google.common.collect.Lists;
@Component
public class AlertDao {
/**
* logger of AlertDao
*/
private static final Logger logger = LoggerFactory.getLogger(AlertDao.class);
private static final int QUERY_ALERT_THRESHOLD = 100;
@Value("${alert.alarm-suppression.crash:60}")
private Integer crashAlarmSuppression;
@ -80,9 +90,16 @@ public class AlertDao {
* @return add alert result
*/
public int addAlert(Alert alert) {
if (null == alert.getAlertGroupId() || NumberUtils.INTEGER_ZERO.equals(alert.getAlertGroupId())) {
logger.warn("the value of alertGroupId is null or 0 ");
return 0;
}
String sign = generateSign(alert);
alert.setSign(sign);
return alertMapper.insert(alert);
int count = alertMapper.insert(alert);
logger.info("add alert to db , alert: {}", alert);
return count;
}
/**

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java

@ -165,7 +165,7 @@ public class ProcessDefinition {
* warningGroupId
*/
@TableField(exist = false)
private int warningGroupId;
private Integer warningGroupId;
/**
* execution type

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -697,13 +697,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (processDefinition.getExecutionType().typeIsSerialWait()) {
checkSerialProcess(processDefinition);
}
if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null, processInstance);
}
if (processAlertManager.isNeedToSendWarning(processInstance)) {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
}
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
if (checkTaskQueue()) {
//release task group
processService.releaseAllTaskGroup(processInstance.getId());

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

@ -197,7 +197,6 @@ public class ProcessAlertManager {
alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING);
alertDao.addAlert(alert);
logger.info("add alert to db , alert : {}", alert);
} catch (Exception e) {
logger.error("send alert failed:{} ", e.getMessage());
@ -214,13 +213,10 @@ public class ProcessAlertManager {
public void sendAlertProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances,
ProjectUser projectUser) {
if (!isNeedToSendWarning(processInstance)) {
return;
}
Alert alert = new Alert();
String cmdName = getCommandCnName(processInstance.getCommandType());
String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
alert.setTitle(cmdName + " " + success);
@ -234,7 +230,6 @@ public class ProcessAlertManager {
alert.setProcessInstanceId(processInstance.getId());
alert.setAlertType(processInstance.getState().typeIsSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS : AlertType.PROCESS_INSTANCE_FAILURE);
alertDao.addAlert(alert);
logger.info("add alert to db , alert: {}", alert);
}
/**
@ -297,7 +292,6 @@ public class ProcessAlertManager {
//might need to change to data quality status
alert.setAlertType(processInstance.getState().typeIsSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS : AlertType.PROCESS_INSTANCE_FAILURE);
alertDao.addAlert(alert);
logger.info("add alert to db , alert: {}", alert);
}
/**
@ -314,7 +308,6 @@ public class ProcessAlertManager {
alert.setProcessInstanceId(processInstance.getId());
alert.setAlertType(AlertType.TASK_FAILURE);
alertDao.addAlert(alert);
logger.info("add alert to db , alert: {}", alert);
}
/**
@ -411,6 +404,5 @@ public class ProcessAlertManager {
alert.setProcessInstanceId(processInstance.getId());
alert.setAlertType(AlertType.PROCESS_INSTANCE_BLOCKED);
alertDao.addAlert(alert);
logger.info("add alert to db, alert: {}",alert);
}
}

Loading…
Cancel
Save