Browse Source
* first commit * 1. sql: sync ddl 2. front-end: change to ternary expression 3. back-end: correct license header in ListenerEvent.java * test case * frontend: remove unnecessary console * fix unit test * remove log depends on user-provided value * fix dolphinscheduler_postgresql.sql * sync database schema * fix unit test * fix unit test * fix some NIT. * extract GLOBAL_ALERT_GROUP_ID into variable * fix ddl bug * add column task_type in t_ds_fav_task in upgrade/3.2.0_schema * add unit test3.2.1-prepare
Wei Xiaonan
1 year ago
committed by
GitHub
75 changed files with 2929 additions and 102 deletions
@ -0,0 +1,263 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.alert.service; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.alert.api.AlertChannel; |
||||||
|
import org.apache.dolphinscheduler.alert.api.AlertData; |
||||||
|
import org.apache.dolphinscheduler.alert.api.AlertInfo; |
||||||
|
import org.apache.dolphinscheduler.alert.api.AlertResult; |
||||||
|
import org.apache.dolphinscheduler.alert.config.AlertConfig; |
||||||
|
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; |
||||||
|
import org.apache.dolphinscheduler.common.constants.Constants; |
||||||
|
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||||
|
import org.apache.dolphinscheduler.common.enums.AlertType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WarningType; |
||||||
|
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; |
||||||
|
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; |
||||||
|
import org.apache.dolphinscheduler.common.thread.ThreadUtils; |
||||||
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.AbstractListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionCreatedListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionDeletedListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionUpdatedListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessEndListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessFailListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessStartListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.TaskEndListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; |
||||||
|
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper; |
||||||
|
|
||||||
|
import org.apache.commons.collections4.CollectionUtils; |
||||||
|
import org.apache.curator.shaded.com.google.common.collect.Lists; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.Date; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Map; |
||||||
|
import java.util.Optional; |
||||||
|
import java.util.concurrent.CompletableFuture; |
||||||
|
import java.util.concurrent.TimeUnit; |
||||||
|
|
||||||
|
import javax.annotation.Nullable; |
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||||
|
import org.springframework.beans.factory.annotation.Value; |
||||||
|
import org.springframework.stereotype.Service; |
||||||
|
|
||||||
|
@Service |
||||||
|
@Slf4j |
||||||
|
public final class ListenerEventPostService extends BaseDaemonThread implements AutoCloseable { |
||||||
|
|
||||||
|
@Value("${alert.query_alert_threshold:100}") |
||||||
|
private Integer QUERY_ALERT_THRESHOLD; |
||||||
|
@Autowired |
||||||
|
private ListenerEventMapper listenerEventMapper; |
||||||
|
@Autowired |
||||||
|
private AlertPluginInstanceMapper alertPluginInstanceMapper; |
||||||
|
@Autowired |
||||||
|
private AlertPluginManager alertPluginManager; |
||||||
|
@Autowired |
||||||
|
private AlertConfig alertConfig; |
||||||
|
|
||||||
|
public ListenerEventPostService() { |
||||||
|
super("ListenerEventPostService"); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void run() { |
||||||
|
log.info("listener event post thread started"); |
||||||
|
while (!ServerLifeCycleManager.isStopped()) { |
||||||
|
try { |
||||||
|
List<ListenerEvent> listenerEvents = listenerEventMapper |
||||||
|
.listingListenerEventByStatus(AlertStatus.WAIT_EXECUTION, QUERY_ALERT_THRESHOLD); |
||||||
|
if (CollectionUtils.isEmpty(listenerEvents)) { |
||||||
|
log.debug("There is no waiting listener events"); |
||||||
|
continue; |
||||||
|
} |
||||||
|
this.send(listenerEvents); |
||||||
|
} catch (Exception e) { |
||||||
|
log.error("listener event post thread meet an exception", e); |
||||||
|
} finally { |
||||||
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L); |
||||||
|
} |
||||||
|
} |
||||||
|
log.info("listener event post thread stopped"); |
||||||
|
} |
||||||
|
|
||||||
|
public void send(List<ListenerEvent> listenerEvents) { |
||||||
|
for (ListenerEvent listenerEvent : listenerEvents) { |
||||||
|
int eventId = listenerEvent.getId(); |
||||||
|
List<AlertPluginInstance> globalAlertInstanceList = |
||||||
|
alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList(); |
||||||
|
if (CollectionUtils.isEmpty(globalAlertInstanceList)) { |
||||||
|
log.error("post listener event fail,no bind global plugin instance."); |
||||||
|
listenerEventMapper.updateListenerEvent(eventId, AlertStatus.EXECUTION_FAILURE, |
||||||
|
"no bind plugin instance", new Date()); |
||||||
|
continue; |
||||||
|
} |
||||||
|
AbstractListenerEvent event = generateEventFromContent(listenerEvent); |
||||||
|
if (event == null) { |
||||||
|
log.error("parse listener event to abstract listener event fail.ed {}", listenerEvent.getContent()); |
||||||
|
listenerEventMapper.updateListenerEvent(eventId, AlertStatus.EXECUTION_FAILURE, |
||||||
|
"parse listener event to abstract listener event failed", new Date()); |
||||||
|
continue; |
||||||
|
} |
||||||
|
List<AbstractListenerEvent> events = Lists.newArrayList(event); |
||||||
|
AlertData alertData = AlertData.builder() |
||||||
|
.id(eventId) |
||||||
|
.content(JSONUtils.toJsonString(events)) |
||||||
|
.log(listenerEvent.getLog()) |
||||||
|
.title(event.getTitle()) |
||||||
|
.warnType(WarningType.GLOBAL.getCode()) |
||||||
|
.alertType(event.getEventType().getCode()) |
||||||
|
.build(); |
||||||
|
|
||||||
|
int sendSuccessCount = 0; |
||||||
|
List<AlertSendStatus> failedPostResults = new ArrayList<>(); |
||||||
|
for (AlertPluginInstance instance : globalAlertInstanceList) { |
||||||
|
AlertResult alertResult = this.alertResultHandler(instance, alertData); |
||||||
|
if (alertResult != null) { |
||||||
|
AlertStatus sendStatus = Boolean.parseBoolean(alertResult.getStatus()) |
||||||
|
? AlertStatus.EXECUTION_SUCCESS |
||||||
|
: AlertStatus.EXECUTION_FAILURE; |
||||||
|
if (AlertStatus.EXECUTION_SUCCESS.equals(sendStatus)) { |
||||||
|
sendSuccessCount++; |
||||||
|
} else { |
||||||
|
AlertSendStatus alertSendStatus = AlertSendStatus.builder() |
||||||
|
.alertId(eventId) |
||||||
|
.alertPluginInstanceId(instance.getId()) |
||||||
|
.sendStatus(sendStatus) |
||||||
|
.log(JSONUtils.toJsonString(alertResult)) |
||||||
|
.createTime(new Date()) |
||||||
|
.build(); |
||||||
|
failedPostResults.add(alertSendStatus); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
if (sendSuccessCount == globalAlertInstanceList.size()) { |
||||||
|
listenerEventMapper.deleteById(eventId); |
||||||
|
} else { |
||||||
|
AlertStatus alertStatus = |
||||||
|
sendSuccessCount == 0 ? AlertStatus.EXECUTION_FAILURE : AlertStatus.EXECUTION_PARTIAL_SUCCESS; |
||||||
|
listenerEventMapper.updateListenerEvent(eventId, alertStatus, JSONUtils.toJsonString(failedPostResults), |
||||||
|
new Date()); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* alert result handler |
||||||
|
* |
||||||
|
* @param instance instance |
||||||
|
* @param alertData alertData |
||||||
|
* @return AlertResult |
||||||
|
*/ |
||||||
|
private @Nullable AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) { |
||||||
|
String pluginInstanceName = instance.getInstanceName(); |
||||||
|
int pluginDefineId = instance.getPluginDefineId(); |
||||||
|
Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId()); |
||||||
|
if (!alertChannelOptional.isPresent()) { |
||||||
|
String message = |
||||||
|
String.format("Global Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s", |
||||||
|
pluginInstanceName, |
||||||
|
pluginDefineId); |
||||||
|
log.error("Global Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId); |
||||||
|
return new AlertResult("false", message); |
||||||
|
} |
||||||
|
AlertChannel alertChannel = alertChannelOptional.get(); |
||||||
|
|
||||||
|
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams()); |
||||||
|
|
||||||
|
AlertInfo alertInfo = AlertInfo.builder() |
||||||
|
.alertData(alertData) |
||||||
|
.alertParams(paramsMap) |
||||||
|
.alertPluginInstanceId(instance.getId()) |
||||||
|
.build(); |
||||||
|
int waitTimeout = alertConfig.getWaitTimeout(); |
||||||
|
try { |
||||||
|
AlertResult alertResult; |
||||||
|
if (waitTimeout <= 0) { |
||||||
|
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) { |
||||||
|
alertResult = alertChannel.closeAlert(alertInfo); |
||||||
|
} else { |
||||||
|
alertResult = alertChannel.process(alertInfo); |
||||||
|
} |
||||||
|
} else { |
||||||
|
CompletableFuture<AlertResult> future; |
||||||
|
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) { |
||||||
|
future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo)); |
||||||
|
} else { |
||||||
|
future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo)); |
||||||
|
} |
||||||
|
alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS); |
||||||
|
} |
||||||
|
if (alertResult == null) { |
||||||
|
throw new RuntimeException("Alert result cannot be null"); |
||||||
|
} |
||||||
|
return alertResult; |
||||||
|
} catch (InterruptedException e) { |
||||||
|
log.error("post listener event error alert data id :{},", alertData.getId(), e); |
||||||
|
Thread.currentThread().interrupt(); |
||||||
|
return new AlertResult("false", e.getMessage()); |
||||||
|
} catch (Exception e) { |
||||||
|
log.error("post listener event error alert data id :{},", alertData.getId(), e); |
||||||
|
return new AlertResult("false", e.getMessage()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private AbstractListenerEvent generateEventFromContent(ListenerEvent listenerEvent) { |
||||||
|
String content = listenerEvent.getContent(); |
||||||
|
switch (listenerEvent.getEventType()) { |
||||||
|
case SERVER_DOWN: |
||||||
|
return JSONUtils.parseObject(content, ServerDownListenerEvent.class); |
||||||
|
case PROCESS_DEFINITION_CREATED: |
||||||
|
return JSONUtils.parseObject(content, ProcessDefinitionCreatedListenerEvent.class); |
||||||
|
case PROCESS_DEFINITION_UPDATED: |
||||||
|
return JSONUtils.parseObject(content, ProcessDefinitionUpdatedListenerEvent.class); |
||||||
|
case PROCESS_DEFINITION_DELETED: |
||||||
|
return JSONUtils.parseObject(content, ProcessDefinitionDeletedListenerEvent.class); |
||||||
|
case PROCESS_START: |
||||||
|
return JSONUtils.parseObject(content, ProcessStartListenerEvent.class); |
||||||
|
case PROCESS_END: |
||||||
|
return JSONUtils.parseObject(content, ProcessEndListenerEvent.class); |
||||||
|
case PROCESS_FAIL: |
||||||
|
return JSONUtils.parseObject(content, ProcessFailListenerEvent.class); |
||||||
|
case TASK_START: |
||||||
|
return JSONUtils.parseObject(content, TaskStartListenerEvent.class); |
||||||
|
case TASK_END: |
||||||
|
return JSONUtils.parseObject(content, TaskEndListenerEvent.class); |
||||||
|
case TASK_FAIL: |
||||||
|
return JSONUtils.parseObject(content, TaskFailListenerEvent.class); |
||||||
|
default: |
||||||
|
return null; |
||||||
|
} |
||||||
|
} |
||||||
|
@Override |
||||||
|
public void close() { |
||||||
|
log.info("Closed ListenerEventPostService..."); |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,154 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.alert.runner; |
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock; |
||||||
|
import static org.mockito.Mockito.when; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.alert.api.AlertChannel; |
||||||
|
import org.apache.dolphinscheduler.alert.api.AlertResult; |
||||||
|
import org.apache.dolphinscheduler.alert.config.AlertConfig; |
||||||
|
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; |
||||||
|
import org.apache.dolphinscheduler.alert.service.ListenerEventPostService; |
||||||
|
import org.apache.dolphinscheduler.common.enums.AlertPluginInstanceType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; |
||||||
|
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper; |
||||||
|
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.Date; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Optional; |
||||||
|
|
||||||
|
import org.junit.jupiter.api.Assertions; |
||||||
|
import org.junit.jupiter.api.BeforeEach; |
||||||
|
import org.junit.jupiter.api.Test; |
||||||
|
import org.mockito.InjectMocks; |
||||||
|
import org.mockito.Mock; |
||||||
|
import org.mockito.Mockito; |
||||||
|
import org.mockito.MockitoAnnotations; |
||||||
|
import org.slf4j.Logger; |
||||||
|
import org.slf4j.LoggerFactory; |
||||||
|
|
||||||
|
public class ListenerEventPostServiceTest { |
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(ListenerEventPostServiceTest.class); |
||||||
|
|
||||||
|
@Mock |
||||||
|
private ListenerEventMapper listenerEventMapper; |
||||||
|
@Mock |
||||||
|
private AlertPluginInstanceMapper alertPluginInstanceMapper; |
||||||
|
@Mock |
||||||
|
private AlertPluginManager alertPluginManager; |
||||||
|
@Mock |
||||||
|
private AlertConfig alertConfig; |
||||||
|
|
||||||
|
@InjectMocks |
||||||
|
private ListenerEventPostService listenerEventPostService; |
||||||
|
|
||||||
|
@BeforeEach |
||||||
|
public void before() { |
||||||
|
MockitoAnnotations.initMocks(this); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testSendServerDownEventSuccess() { |
||||||
|
List<ListenerEvent> events = new ArrayList<>(); |
||||||
|
ServerDownListenerEvent serverDownListenerEvent = new ServerDownListenerEvent(); |
||||||
|
serverDownListenerEvent.setEventTime(new Date()); |
||||||
|
serverDownListenerEvent.setType("WORKER"); |
||||||
|
serverDownListenerEvent.setHost("192.168.*.*"); |
||||||
|
ListenerEvent successEvent = new ListenerEvent(); |
||||||
|
successEvent.setId(1); |
||||||
|
successEvent.setPostStatus(AlertStatus.WAIT_EXECUTION); |
||||||
|
successEvent.setContent(JSONUtils.toJsonString(serverDownListenerEvent)); |
||||||
|
successEvent.setSign(DigestUtils.sha256Hex(successEvent.getContent())); |
||||||
|
successEvent.setEventType(ListenerEventType.SERVER_DOWN); |
||||||
|
successEvent.setCreateTime(new Date()); |
||||||
|
successEvent.setUpdateTime(new Date()); |
||||||
|
events.add(successEvent); |
||||||
|
|
||||||
|
int pluginDefineId = 1; |
||||||
|
String pluginInstanceParams = |
||||||
|
"{\"User\":\"xx\",\"receivers\":\"xx\",\"sender\":\"xx\",\"smtpSslTrust\":\"*\",\"enableSmtpAuth\":\"true\",\"receiverCcs\":null,\"showType\":\"table\",\"starttlsEnable\":\"false\",\"serverPort\":\"25\",\"serverHost\":\"xx\",\"Password\":\"xx\",\"sslEnable\":\"false\"}"; |
||||||
|
String pluginInstanceName = "alert-instance-mail"; |
||||||
|
List<AlertPluginInstance> alertInstanceList = new ArrayList<>(); |
||||||
|
AlertPluginInstance alertPluginInstance = new AlertPluginInstance( |
||||||
|
pluginDefineId, pluginInstanceParams, pluginInstanceName); |
||||||
|
alertPluginInstance.setInstanceType(AlertPluginInstanceType.GLOBAL); |
||||||
|
alertPluginInstance.setId(1); |
||||||
|
alertInstanceList.add(alertPluginInstance); |
||||||
|
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()).thenReturn(alertInstanceList); |
||||||
|
|
||||||
|
AlertResult sendResult = new AlertResult(); |
||||||
|
sendResult.setStatus(String.valueOf(true)); |
||||||
|
sendResult.setMessage(String.format("Alert Plugin %s send success", pluginInstanceName)); |
||||||
|
AlertChannel alertChannelMock = mock(AlertChannel.class); |
||||||
|
when(alertChannelMock.process(Mockito.any())).thenReturn(sendResult); |
||||||
|
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); |
||||||
|
Assertions.assertTrue(Boolean.parseBoolean(sendResult.getStatus())); |
||||||
|
when(listenerEventMapper.deleteById(1)).thenReturn(1); |
||||||
|
listenerEventPostService.send(events); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testSendServerDownEventFailed() { |
||||||
|
List<ListenerEvent> events = new ArrayList<>(); |
||||||
|
ServerDownListenerEvent serverDownListenerEvent = new ServerDownListenerEvent(); |
||||||
|
serverDownListenerEvent.setEventTime(new Date()); |
||||||
|
serverDownListenerEvent.setType("WORKER"); |
||||||
|
serverDownListenerEvent.setHost("192.168.*.*"); |
||||||
|
ListenerEvent successEvent = new ListenerEvent(); |
||||||
|
successEvent.setId(1); |
||||||
|
successEvent.setPostStatus(AlertStatus.WAIT_EXECUTION); |
||||||
|
successEvent.setContent(JSONUtils.toJsonString(serverDownListenerEvent)); |
||||||
|
successEvent.setSign(DigestUtils.sha1Hex(successEvent.getContent())); |
||||||
|
successEvent.setEventType(ListenerEventType.SERVER_DOWN); |
||||||
|
successEvent.setCreateTime(new Date()); |
||||||
|
successEvent.setUpdateTime(new Date()); |
||||||
|
events.add(successEvent); |
||||||
|
|
||||||
|
int pluginDefineId = 1; |
||||||
|
String pluginInstanceParams = |
||||||
|
"{\"User\":\"xx\",\"receivers\":\"xx\",\"sender\":\"xx\",\"smtpSslTrust\":\"*\",\"enableSmtpAuth\":\"true\",\"receiverCcs\":null,\"showType\":\"table\",\"starttlsEnable\":\"false\",\"serverPort\":\"25\",\"serverHost\":\"xx\",\"Password\":\"xx\",\"sslEnable\":\"false\"}"; |
||||||
|
String pluginInstanceName = "alert-instance-mail"; |
||||||
|
List<AlertPluginInstance> alertInstanceList = new ArrayList<>(); |
||||||
|
AlertPluginInstance alertPluginInstance = new AlertPluginInstance( |
||||||
|
pluginDefineId, pluginInstanceParams, pluginInstanceName); |
||||||
|
alertPluginInstance.setInstanceType(AlertPluginInstanceType.GLOBAL); |
||||||
|
alertPluginInstance.setId(1); |
||||||
|
alertInstanceList.add(alertPluginInstance); |
||||||
|
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()).thenReturn(alertInstanceList); |
||||||
|
|
||||||
|
AlertResult sendResult = new AlertResult(); |
||||||
|
sendResult.setStatus(String.valueOf(false)); |
||||||
|
sendResult.setMessage(String.format("Alert Plugin %s send failed", pluginInstanceName)); |
||||||
|
AlertChannel alertChannelMock = mock(AlertChannel.class); |
||||||
|
when(alertChannelMock.process(Mockito.any())).thenReturn(sendResult); |
||||||
|
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); |
||||||
|
Assertions.assertFalse(Boolean.parseBoolean(sendResult.getStatus())); |
||||||
|
listenerEventPostService.send(events); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,43 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.common.enums; |
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.EnumValue; |
||||||
|
|
||||||
|
public enum AlertPluginInstanceType { |
||||||
|
|
||||||
|
NORMAL(0, "NORMAL"), |
||||||
|
GLOBAL(1, "GLOBAL"); |
||||||
|
AlertPluginInstanceType(int code, String descp) { |
||||||
|
this.code = code; |
||||||
|
this.descp = descp; |
||||||
|
} |
||||||
|
|
||||||
|
@EnumValue |
||||||
|
private final int code; |
||||||
|
private final String descp; |
||||||
|
|
||||||
|
public int getCode() { |
||||||
|
return code; |
||||||
|
} |
||||||
|
|
||||||
|
public String getDescp() { |
||||||
|
return descp; |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,66 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.common.enums; |
||||||
|
|
||||||
|
import java.util.HashMap; |
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
import lombok.Getter; |
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.EnumValue; |
||||||
|
|
||||||
|
@Getter |
||||||
|
public enum ListenerEventType { |
||||||
|
|
||||||
|
SERVER_DOWN(0, "SERVER_DOWN"), |
||||||
|
PROCESS_DEFINITION_CREATED(1, "PROCESS_DEFINITION_CREATED"), |
||||||
|
PROCESS_DEFINITION_UPDATED(2, "PROCESS_DEFINITION_UPDATED"), |
||||||
|
PROCESS_DEFINITION_DELETED(3, "PROCESS_DEFINITION_DELETED"), |
||||||
|
PROCESS_START(4, "PROCESS_START"), |
||||||
|
PROCESS_END(5, "PROCESS_INSTANCE_END"), |
||||||
|
PROCESS_FAIL(6, "PROCESS_FAIL"), |
||||||
|
TASK_START(10, "TASK_START"), |
||||||
|
TASK_END(11, "TASK_END"), |
||||||
|
TASK_FAIL(12, "TASK_FAIL"); |
||||||
|
|
||||||
|
private static final Map<Integer, ListenerEventType> CODE_MAP = new HashMap<>(); |
||||||
|
|
||||||
|
static { |
||||||
|
for (ListenerEventType listenerEventType : ListenerEventType.values()) { |
||||||
|
CODE_MAP.put(listenerEventType.getCode(), listenerEventType); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@EnumValue |
||||||
|
private final int code; |
||||||
|
private final String descp; |
||||||
|
|
||||||
|
ListenerEventType(int code, String descp) { |
||||||
|
this.code = code; |
||||||
|
this.descp = descp; |
||||||
|
} |
||||||
|
|
||||||
|
public static ListenerEventType of(int code) { |
||||||
|
ListenerEventType listenerEventType = CODE_MAP.get(code); |
||||||
|
if (listenerEventType == null) { |
||||||
|
throw new IllegalArgumentException(String.format("The task execution status code: %s is invalid", |
||||||
|
code)); |
||||||
|
} |
||||||
|
return listenerEventType; |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,43 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.common.enums; |
||||||
|
|
||||||
|
import org.junit.jupiter.api.Assertions; |
||||||
|
import org.junit.jupiter.api.Test; |
||||||
|
|
||||||
|
public class ListenerEventTypeTest { |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testGetCode() { |
||||||
|
Assertions.assertEquals(0, ListenerEventType.SERVER_DOWN.getCode()); |
||||||
|
Assertions.assertEquals(1, ListenerEventType.PROCESS_DEFINITION_CREATED.getCode()); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testGetDesp() { |
||||||
|
Assertions.assertEquals("PROCESS_DEFINITION_UPDATED", ListenerEventType.PROCESS_DEFINITION_UPDATED.getDescp()); |
||||||
|
Assertions.assertEquals("PROCESS_DEFINITION_DELETED", ListenerEventType.PROCESS_DEFINITION_DELETED.getDescp()); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testGetListenerEventTypeByCode() { |
||||||
|
Assertions.assertEquals(ListenerEventType.PROCESS_START, ListenerEventType.of(4)); |
||||||
|
Assertions.assertNotEquals(ListenerEventType.PROCESS_END, ListenerEventType.of(6)); |
||||||
|
Assertions.assertThrows(IllegalArgumentException.class, () -> ListenerEventType.of(-1)); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,88 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Builder; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.IdType; |
||||||
|
import com.baomidou.mybatisplus.annotation.TableField; |
||||||
|
import com.baomidou.mybatisplus.annotation.TableId; |
||||||
|
import com.baomidou.mybatisplus.annotation.TableName; |
||||||
|
|
||||||
|
@Data |
||||||
|
@Builder |
||||||
|
@NoArgsConstructor |
||||||
|
@AllArgsConstructor |
||||||
|
@TableName("t_ds_listener_event") |
||||||
|
public class ListenerEvent { |
||||||
|
|
||||||
|
/** |
||||||
|
* primary key |
||||||
|
*/ |
||||||
|
@TableId(value = "id", type = IdType.AUTO) |
||||||
|
private Integer id; |
||||||
|
|
||||||
|
/** |
||||||
|
* content |
||||||
|
*/ |
||||||
|
@TableField(value = "content") |
||||||
|
private String content; |
||||||
|
|
||||||
|
/** |
||||||
|
* sign |
||||||
|
*/ |
||||||
|
@TableField(value = "sign") |
||||||
|
private String sign; |
||||||
|
|
||||||
|
/** |
||||||
|
* alert_status |
||||||
|
*/ |
||||||
|
@TableField(value = "event_type") |
||||||
|
private ListenerEventType eventType; |
||||||
|
|
||||||
|
/** |
||||||
|
* post_status |
||||||
|
*/ |
||||||
|
@TableField("post_status") |
||||||
|
private AlertStatus postStatus; |
||||||
|
|
||||||
|
/** |
||||||
|
* log |
||||||
|
*/ |
||||||
|
@TableField("log") |
||||||
|
private String log; |
||||||
|
/** |
||||||
|
* create_time |
||||||
|
*/ |
||||||
|
@TableField("create_time") |
||||||
|
private Date createTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* update_time |
||||||
|
*/ |
||||||
|
@TableField("update_time") |
||||||
|
private Date updateTime; |
||||||
|
} |
@ -0,0 +1,27 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
|
||||||
|
public interface AbstractListenerEvent { |
||||||
|
|
||||||
|
ListenerEventType getEventType(); |
||||||
|
|
||||||
|
String getTitle(); |
||||||
|
} |
@ -0,0 +1,195 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ReleaseState; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
||||||
|
import org.apache.dolphinscheduler.plugin.task.api.model.Property; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class ProcessDefinitionCreatedListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
/** |
||||||
|
* id |
||||||
|
*/ |
||||||
|
private Integer id; |
||||||
|
|
||||||
|
/** |
||||||
|
* code |
||||||
|
*/ |
||||||
|
private long code; |
||||||
|
|
||||||
|
/** |
||||||
|
* name |
||||||
|
*/ |
||||||
|
private String name; |
||||||
|
|
||||||
|
/** |
||||||
|
* version |
||||||
|
*/ |
||||||
|
private int version; |
||||||
|
|
||||||
|
/** |
||||||
|
* release state : online/offline |
||||||
|
*/ |
||||||
|
private ReleaseState releaseState; |
||||||
|
|
||||||
|
/** |
||||||
|
* project code |
||||||
|
*/ |
||||||
|
private long projectCode; |
||||||
|
|
||||||
|
/** |
||||||
|
* description |
||||||
|
*/ |
||||||
|
private String description; |
||||||
|
|
||||||
|
/** |
||||||
|
* user defined parameters |
||||||
|
*/ |
||||||
|
private String globalParams; |
||||||
|
|
||||||
|
/** |
||||||
|
* user defined parameter list |
||||||
|
*/ |
||||||
|
private List<Property> globalParamList; |
||||||
|
|
||||||
|
/** |
||||||
|
* user define parameter map |
||||||
|
*/ |
||||||
|
private Map<String, String> globalParamMap; |
||||||
|
|
||||||
|
/** |
||||||
|
* create time |
||||||
|
*/ |
||||||
|
private Date createTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* update time |
||||||
|
*/ |
||||||
|
private Date updateTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* process is valid: yes/no |
||||||
|
*/ |
||||||
|
private Flag flag; |
||||||
|
|
||||||
|
/** |
||||||
|
* process user id |
||||||
|
*/ |
||||||
|
private int userId; |
||||||
|
|
||||||
|
/** |
||||||
|
* create user name |
||||||
|
*/ |
||||||
|
private String userName; |
||||||
|
|
||||||
|
/** |
||||||
|
* project name |
||||||
|
*/ |
||||||
|
private String projectName; |
||||||
|
|
||||||
|
/** |
||||||
|
* locations array for web |
||||||
|
*/ |
||||||
|
private String locations; |
||||||
|
|
||||||
|
/** |
||||||
|
* schedule release state : online/offline |
||||||
|
*/ |
||||||
|
private ReleaseState scheduleReleaseState; |
||||||
|
|
||||||
|
/** |
||||||
|
* process warning time out. unit: minute |
||||||
|
*/ |
||||||
|
private int timeout; |
||||||
|
|
||||||
|
/** |
||||||
|
* modify user name |
||||||
|
*/ |
||||||
|
private String modifyBy; |
||||||
|
|
||||||
|
/** |
||||||
|
* warningGroupId |
||||||
|
*/ |
||||||
|
private Integer warningGroupId; |
||||||
|
|
||||||
|
/** |
||||||
|
* execution type |
||||||
|
*/ |
||||||
|
private ProcessExecutionTypeEnum executionType; |
||||||
|
|
||||||
|
/** |
||||||
|
* task definitions |
||||||
|
*/ |
||||||
|
List<TaskDefinitionLog> taskDefinitionLogs; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
List<ProcessTaskRelationLog> taskRelationList; |
||||||
|
|
||||||
|
public ProcessDefinitionCreatedListenerEvent(ProcessDefinition processDefinition) { |
||||||
|
this.setId(processDefinition.getId()); |
||||||
|
this.setCode(processDefinition.getCode()); |
||||||
|
this.setName(processDefinition.getName()); |
||||||
|
this.setVersion(processDefinition.getVersion()); |
||||||
|
this.setReleaseState(processDefinition.getReleaseState()); |
||||||
|
this.setProjectCode(processDefinition.getProjectCode()); |
||||||
|
this.setDescription(processDefinition.getDescription()); |
||||||
|
this.setGlobalParams(processDefinition.getGlobalParams()); |
||||||
|
this.setGlobalParamList(processDefinition.getGlobalParamList()); |
||||||
|
this.setGlobalParamMap(processDefinition.getGlobalParamMap()); |
||||||
|
this.setCreateTime(processDefinition.getCreateTime()); |
||||||
|
this.setUpdateTime(processDefinition.getUpdateTime()); |
||||||
|
this.setFlag(processDefinition.getFlag()); |
||||||
|
this.setUserId(processDefinition.getUserId()); |
||||||
|
this.setUserName(processDefinition.getUserName()); |
||||||
|
this.setProjectName(processDefinition.getProjectName()); |
||||||
|
this.setLocations(processDefinition.getLocations()); |
||||||
|
this.setScheduleReleaseState(processDefinition.getScheduleReleaseState()); |
||||||
|
this.setTimeout(processDefinition.getTimeout()); |
||||||
|
this.setModifyBy(processDefinition.getModifyBy()); |
||||||
|
this.setWarningGroupId(processDefinition.getWarningGroupId()); |
||||||
|
this.setExecutionType(processDefinition.getExecutionType()); |
||||||
|
} |
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.PROCESS_DEFINITION_CREATED; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("process definition created:%s", this.name); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,52 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class ProcessDefinitionDeletedListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
private Integer projectId; |
||||||
|
private Long projectCode; |
||||||
|
private String projectName; |
||||||
|
private String owner; |
||||||
|
private Integer id; |
||||||
|
private Long code; |
||||||
|
private String name; |
||||||
|
private Integer userId; |
||||||
|
private String modifiedBy; |
||||||
|
private Date eventTime; |
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.PROCESS_DEFINITION_DELETED; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("process definition deleted:%s", this.name); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,195 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ReleaseState; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
||||||
|
import org.apache.dolphinscheduler.plugin.task.api.model.Property; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class ProcessDefinitionUpdatedListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
/** |
||||||
|
* id |
||||||
|
*/ |
||||||
|
private Integer id; |
||||||
|
|
||||||
|
/** |
||||||
|
* code |
||||||
|
*/ |
||||||
|
private long code; |
||||||
|
|
||||||
|
/** |
||||||
|
* name |
||||||
|
*/ |
||||||
|
private String name; |
||||||
|
|
||||||
|
/** |
||||||
|
* version |
||||||
|
*/ |
||||||
|
private int version; |
||||||
|
|
||||||
|
/** |
||||||
|
* release state : online/offline |
||||||
|
*/ |
||||||
|
private ReleaseState releaseState; |
||||||
|
|
||||||
|
/** |
||||||
|
* project code |
||||||
|
*/ |
||||||
|
private long projectCode; |
||||||
|
|
||||||
|
/** |
||||||
|
* description |
||||||
|
*/ |
||||||
|
private String description; |
||||||
|
|
||||||
|
/** |
||||||
|
* user defined parameters |
||||||
|
*/ |
||||||
|
private String globalParams; |
||||||
|
|
||||||
|
/** |
||||||
|
* user defined parameter list |
||||||
|
*/ |
||||||
|
private List<Property> globalParamList; |
||||||
|
|
||||||
|
/** |
||||||
|
* user define parameter map |
||||||
|
*/ |
||||||
|
private Map<String, String> globalParamMap; |
||||||
|
|
||||||
|
/** |
||||||
|
* create time |
||||||
|
*/ |
||||||
|
private Date createTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* update time |
||||||
|
*/ |
||||||
|
private Date updateTime; |
||||||
|
|
||||||
|
/** |
||||||
|
* process is valid: yes/no |
||||||
|
*/ |
||||||
|
private Flag flag; |
||||||
|
|
||||||
|
/** |
||||||
|
* process user id |
||||||
|
*/ |
||||||
|
private int userId; |
||||||
|
|
||||||
|
/** |
||||||
|
* create user name |
||||||
|
*/ |
||||||
|
private String userName; |
||||||
|
|
||||||
|
/** |
||||||
|
* project name |
||||||
|
*/ |
||||||
|
private String projectName; |
||||||
|
|
||||||
|
/** |
||||||
|
* locations array for web |
||||||
|
*/ |
||||||
|
private String locations; |
||||||
|
|
||||||
|
/** |
||||||
|
* schedule release state : online/offline |
||||||
|
*/ |
||||||
|
private ReleaseState scheduleReleaseState; |
||||||
|
|
||||||
|
/** |
||||||
|
* process warning time out. unit: minute |
||||||
|
*/ |
||||||
|
private int timeout; |
||||||
|
|
||||||
|
/** |
||||||
|
* modify user name |
||||||
|
*/ |
||||||
|
private String modifyBy; |
||||||
|
|
||||||
|
/** |
||||||
|
* warningGroupId |
||||||
|
*/ |
||||||
|
private Integer warningGroupId; |
||||||
|
|
||||||
|
/** |
||||||
|
* execution type |
||||||
|
*/ |
||||||
|
private ProcessExecutionTypeEnum executionType; |
||||||
|
|
||||||
|
/** |
||||||
|
* task definitions |
||||||
|
*/ |
||||||
|
List<TaskDefinitionLog> taskDefinitionLogs; |
||||||
|
|
||||||
|
/** |
||||||
|
* |
||||||
|
*/ |
||||||
|
List<ProcessTaskRelationLog> taskRelationList; |
||||||
|
|
||||||
|
public ProcessDefinitionUpdatedListenerEvent(ProcessDefinition processDefinition) { |
||||||
|
this.setId(processDefinition.getId()); |
||||||
|
this.setCode(processDefinition.getCode()); |
||||||
|
this.setName(processDefinition.getName()); |
||||||
|
this.setVersion(processDefinition.getVersion()); |
||||||
|
this.setReleaseState(processDefinition.getReleaseState()); |
||||||
|
this.setProjectCode(processDefinition.getProjectCode()); |
||||||
|
this.setDescription(processDefinition.getDescription()); |
||||||
|
this.setGlobalParams(processDefinition.getGlobalParams()); |
||||||
|
this.setGlobalParamList(processDefinition.getGlobalParamList()); |
||||||
|
this.setGlobalParamMap(processDefinition.getGlobalParamMap()); |
||||||
|
this.setCreateTime(processDefinition.getCreateTime()); |
||||||
|
this.setUpdateTime(processDefinition.getUpdateTime()); |
||||||
|
this.setFlag(processDefinition.getFlag()); |
||||||
|
this.setUserId(processDefinition.getUserId()); |
||||||
|
this.setUserName(processDefinition.getUserName()); |
||||||
|
this.setProjectName(processDefinition.getProjectName()); |
||||||
|
this.setLocations(processDefinition.getLocations()); |
||||||
|
this.setScheduleReleaseState(processDefinition.getScheduleReleaseState()); |
||||||
|
this.setTimeout(processDefinition.getTimeout()); |
||||||
|
this.setModifyBy(processDefinition.getModifyBy()); |
||||||
|
this.setWarningGroupId(processDefinition.getWarningGroupId()); |
||||||
|
this.setExecutionType(processDefinition.getExecutionType()); |
||||||
|
} |
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.PROCESS_DEFINITION_UPDATED; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("process definition updated:%s", this.name); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,59 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class ProcessEndListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
private Long projectCode; |
||||||
|
private String projectName; |
||||||
|
private String owner; |
||||||
|
private Integer processId; |
||||||
|
private Long processDefinitionCode; |
||||||
|
private String processName; |
||||||
|
private CommandType processType; |
||||||
|
private WorkflowExecutionStatus processState; |
||||||
|
private Flag recovery; |
||||||
|
private Integer runTimes; |
||||||
|
private Date processStartTime; |
||||||
|
private Date processEndTime; |
||||||
|
private String processHost; |
||||||
|
|
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.PROCESS_END; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("process end: %s", processName); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,59 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class ProcessFailListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
private Long projectCode; |
||||||
|
private String projectName; |
||||||
|
private String owner; |
||||||
|
private Integer processId; |
||||||
|
private Long processDefinitionCode; |
||||||
|
private String processName; |
||||||
|
private CommandType processType; |
||||||
|
private WorkflowExecutionStatus processState; |
||||||
|
private Flag recovery; |
||||||
|
private Integer runTimes; |
||||||
|
private Date processStartTime; |
||||||
|
private Date processEndTime; |
||||||
|
private String processHost; |
||||||
|
|
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.PROCESS_FAIL; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("process fail: %s", processName); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,58 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.CommandType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.Flag; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class ProcessStartListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
private Long projectCode; |
||||||
|
private String projectName; |
||||||
|
private String owner; |
||||||
|
private Long processDefinitionCode; |
||||||
|
private String processDefinitionName; |
||||||
|
private Integer processId; |
||||||
|
private String processName; |
||||||
|
private CommandType processType; |
||||||
|
private WorkflowExecutionStatus processState; |
||||||
|
private Integer runTimes; |
||||||
|
private Flag recovery; |
||||||
|
private Date processStartTime; |
||||||
|
|
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.PROCESS_START; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("process start: %s", processName); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,45 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class ServerDownListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
private String type; |
||||||
|
private String host; |
||||||
|
private Date eventTime; |
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.SERVER_DOWN; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("%s server down: %s", type, host); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,59 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class TaskEndListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
private long projectCode; |
||||||
|
private String projectName; |
||||||
|
private String owner; |
||||||
|
private long processId; |
||||||
|
private long processDefinitionCode; |
||||||
|
private String processName; |
||||||
|
private int taskInstanceId; |
||||||
|
private long taskCode; |
||||||
|
private String taskName; |
||||||
|
private String taskType; |
||||||
|
private TaskExecutionStatus taskState; |
||||||
|
private Date taskStartTime; |
||||||
|
private Date taskEndTime; |
||||||
|
private String taskHost; |
||||||
|
private String logPath; |
||||||
|
|
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.TASK_END; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("task end: %s", taskName); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,59 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class TaskFailListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
private long projectCode; |
||||||
|
private String projectName; |
||||||
|
private String owner; |
||||||
|
private long processId; |
||||||
|
private long processDefinitionCode; |
||||||
|
private String processName; |
||||||
|
private int taskInstanceId; |
||||||
|
private long taskCode; |
||||||
|
private String taskName; |
||||||
|
private String taskType; |
||||||
|
private TaskExecutionStatus taskState; |
||||||
|
private Date taskStartTime; |
||||||
|
private Date taskEndTime; |
||||||
|
private String taskHost; |
||||||
|
private String logPath; |
||||||
|
|
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.TASK_FAIL; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("task fail: %s", taskName); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,59 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity.event; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
|
||||||
|
import lombok.AllArgsConstructor; |
||||||
|
import lombok.Data; |
||||||
|
import lombok.NoArgsConstructor; |
||||||
|
|
||||||
|
@Data |
||||||
|
@AllArgsConstructor |
||||||
|
@NoArgsConstructor |
||||||
|
public class TaskStartListenerEvent implements AbstractListenerEvent { |
||||||
|
|
||||||
|
private long projectCode; |
||||||
|
private String projectName; |
||||||
|
private String owner; |
||||||
|
private long processId; |
||||||
|
private long processDefinitionCode; |
||||||
|
private String processName; |
||||||
|
private int taskInstanceId; |
||||||
|
private long taskCode; |
||||||
|
private String taskName; |
||||||
|
private String taskType; |
||||||
|
private TaskExecutionStatus taskState; |
||||||
|
private Date taskStartTime; |
||||||
|
private Date taskEndTime; |
||||||
|
private String taskHost; |
||||||
|
private String logPath; |
||||||
|
|
||||||
|
@Override |
||||||
|
public ListenerEventType getEventType() { |
||||||
|
return ListenerEventType.TASK_START; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String getTitle() { |
||||||
|
return String.format("task start: %s", taskName); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,42 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.mapper; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||||
|
|
||||||
|
import org.apache.ibatis.annotations.Param; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||||
|
|
||||||
|
public interface ListenerEventMapper extends BaseMapper<ListenerEvent> { |
||||||
|
|
||||||
|
int batchInsert(@Param("events") List<ListenerEvent> events); |
||||||
|
|
||||||
|
void insertServerDownEvent(@Param("event") ListenerEvent event, |
||||||
|
@Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime); |
||||||
|
|
||||||
|
List<ListenerEvent> listingListenerEventByStatus(@Param("postStatus") AlertStatus postStatus, |
||||||
|
@Param("limit") int limit); |
||||||
|
|
||||||
|
void updateListenerEvent(@Param("eventId") int eventId, @Param("postStatus") AlertStatus postStatus, |
||||||
|
@Param("log") String log, @Param("updateTime") Date updateTime); |
||||||
|
} |
@ -0,0 +1,66 @@ |
|||||||
|
<?xml version="1.0" encoding="UTF-8" ?> |
||||||
|
<!-- |
||||||
|
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
~ contributor license agreements. See the NOTICE file distributed with |
||||||
|
~ this work for additional information regarding copyright ownership. |
||||||
|
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
~ (the "License"); you may not use this file except in compliance with |
||||||
|
~ the License. You may obtain a copy of the License at |
||||||
|
~ |
||||||
|
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
~ |
||||||
|
~ Unless required by applicable law or agreed to in writing, software |
||||||
|
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
~ See the License for the specific language governing permissions and |
||||||
|
~ limitations under the License. |
||||||
|
--> |
||||||
|
|
||||||
|
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > |
||||||
|
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper"> |
||||||
|
|
||||||
|
<sql id="baseSql"> |
||||||
|
id |
||||||
|
, content, sign, event_type, post_status, log, create_time, update_time |
||||||
|
</sql> |
||||||
|
|
||||||
|
<insert id="batchInsert"> |
||||||
|
insert into t_ds_listener_event ( content, sign, post_status, event_type, create_time, update_time) |
||||||
|
values |
||||||
|
<foreach collection='events' item='event' separator=','> |
||||||
|
(#{event.content},#{event.sign},#{event.postStatus.code}, |
||||||
|
#{event.eventType.code},#{event.createTime},#{event.updateTime}) |
||||||
|
</foreach> |
||||||
|
</insert> |
||||||
|
|
||||||
|
<insert id="insertServerDownEvent"> |
||||||
|
insert into t_ds_listener_event (content, sign, post_status, event_type, create_time, update_time) |
||||||
|
SELECT #{event.sign}, |
||||||
|
#{event.content}, |
||||||
|
#{event.postStatus.code}, |
||||||
|
#{event.eventType.code}, |
||||||
|
#{event.createTime}, |
||||||
|
#{event.updateTime} |
||||||
|
from t_ds_listener_event |
||||||
|
where create_time >= #{crashAlarmSuppressionStartTime} |
||||||
|
and sign = #{event.sign} |
||||||
|
and post_status = #{event.postStatus.code} |
||||||
|
having count(*) = 0 |
||||||
|
</insert> |
||||||
|
|
||||||
|
<update id="updateListenerEvent"> |
||||||
|
update t_ds_listener_event |
||||||
|
set log = #{log}, |
||||||
|
post_status = #{postStatus.code}, |
||||||
|
update_time = #{updateTime} |
||||||
|
where id = #{eventId} |
||||||
|
</update> |
||||||
|
|
||||||
|
<select id="listingListenerEventByStatus" resultType="org.apache.dolphinscheduler.dao.entity.ListenerEvent"> |
||||||
|
select |
||||||
|
<include refid="baseSql"/> |
||||||
|
from t_ds_listener_event |
||||||
|
where post_status = #{postStatus.code} |
||||||
|
limit #{limit} |
||||||
|
</select> |
||||||
|
</mapper> |
@ -0,0 +1,48 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ReleaseState; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionCreatedListenerEvent; |
||||||
|
|
||||||
|
import org.junit.jupiter.api.Assertions; |
||||||
|
import org.junit.jupiter.api.Test; |
||||||
|
|
||||||
|
public class ProcessDefinitionCreatedListenerEventTest { |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testBuildProcessDefinitionUpdatedListenerEvent() { |
||||||
|
int id = 1; |
||||||
|
long code = 1L; |
||||||
|
String name = "testName"; |
||||||
|
ReleaseState releaseState = ReleaseState.OFFLINE; |
||||||
|
ProcessDefinition processDefinition = new ProcessDefinition(); |
||||||
|
processDefinition.setId(id); |
||||||
|
processDefinition.setCode(code); |
||||||
|
processDefinition.setName(name); |
||||||
|
processDefinition.setReleaseState(releaseState); |
||||||
|
ProcessDefinitionCreatedListenerEvent event = new ProcessDefinitionCreatedListenerEvent(processDefinition); |
||||||
|
Assertions.assertEquals(event.getEventType(), ListenerEventType.PROCESS_DEFINITION_CREATED); |
||||||
|
Assertions.assertEquals(event.getId(), id); |
||||||
|
Assertions.assertEquals(event.getCode(), code); |
||||||
|
Assertions.assertEquals(event.getName(), name); |
||||||
|
Assertions.assertEquals(event.getReleaseState(), releaseState); |
||||||
|
Assertions.assertEquals(String.format("process definition created:%s", name), event.getTitle()); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,48 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.entity; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ReleaseState; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionUpdatedListenerEvent; |
||||||
|
|
||||||
|
import org.junit.jupiter.api.Assertions; |
||||||
|
import org.junit.jupiter.api.Test; |
||||||
|
|
||||||
|
public class ProcessDefinitionUpdatedListenerEventTest { |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testBuildProcessDefinitionUpdatedListenerEvent() { |
||||||
|
int id = 1; |
||||||
|
long code = 1L; |
||||||
|
String name = "testName"; |
||||||
|
ReleaseState releaseState = ReleaseState.OFFLINE; |
||||||
|
ProcessDefinition processDefinition = new ProcessDefinition(); |
||||||
|
processDefinition.setId(id); |
||||||
|
processDefinition.setCode(code); |
||||||
|
processDefinition.setName(name); |
||||||
|
processDefinition.setReleaseState(releaseState); |
||||||
|
ProcessDefinitionUpdatedListenerEvent event = new ProcessDefinitionUpdatedListenerEvent(processDefinition); |
||||||
|
Assertions.assertEquals(event.getEventType(), ListenerEventType.PROCESS_DEFINITION_UPDATED); |
||||||
|
Assertions.assertEquals(event.getId(), id); |
||||||
|
Assertions.assertEquals(event.getCode(), code); |
||||||
|
Assertions.assertEquals(event.getName(), name); |
||||||
|
Assertions.assertEquals(event.getReleaseState(), releaseState); |
||||||
|
Assertions.assertEquals(String.format("process definition updated:%s", name), event.getTitle()); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,134 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.dao.mapper; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.utils.DateUtils; |
||||||
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.BaseDaoTest; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent; |
||||||
|
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils; |
||||||
|
|
||||||
|
import java.util.Date; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import org.junit.jupiter.api.Assertions; |
||||||
|
import org.junit.jupiter.api.Test; |
||||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
||||||
|
import com.google.common.collect.Lists; |
||||||
|
|
||||||
|
/** |
||||||
|
* AlertPluginInstanceMapper mapper test |
||||||
|
*/ |
||||||
|
public class ListenerEventMapperTest extends BaseDaoTest { |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private ListenerEventMapper listenerEventMapper; |
||||||
|
|
||||||
|
/** |
||||||
|
* test insert |
||||||
|
* |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
@Test |
||||||
|
public void testInsert() { |
||||||
|
ListenerEvent serverDownListenerEvent = generateServerDownListenerEvent("192.168.x.x"); |
||||||
|
listenerEventMapper.insert(serverDownListenerEvent); |
||||||
|
Assertions.assertTrue(serverDownListenerEvent.getId() > 0); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* test batch insert |
||||||
|
* |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
@Test |
||||||
|
public void testBatchInsert() { |
||||||
|
ListenerEvent event1 = generateServerDownListenerEvent("192.168.x.1"); |
||||||
|
ListenerEvent event2 = generateServerDownListenerEvent("192.168.x.2"); |
||||||
|
listenerEventMapper.batchInsert(Lists.newArrayList(event1, event2)); |
||||||
|
Assertions.assertEquals(listenerEventMapper.selectCount(new QueryWrapper<>()), 2L); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* test list listener event by status |
||||||
|
* |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
@Test |
||||||
|
public void testListingListenerEventByStatus() { |
||||||
|
ListenerEvent event1 = generateServerDownListenerEvent("192.168.x.1"); |
||||||
|
ListenerEvent event2 = generateServerDownListenerEvent("192.168.x.2"); |
||||||
|
listenerEventMapper.batchInsert(Lists.newArrayList(event1, event2)); |
||||||
|
List<ListenerEvent> listenerEvents = |
||||||
|
listenerEventMapper.listingListenerEventByStatus(AlertStatus.WAIT_EXECUTION, 50); |
||||||
|
Assertions.assertEquals(listenerEvents.size(), 2); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* test update server down event |
||||||
|
* |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
@Test |
||||||
|
public void testUpdateListenerEvent() { |
||||||
|
ListenerEvent event = generateServerDownListenerEvent("192.168.x.1"); |
||||||
|
listenerEventMapper.insert(event); |
||||||
|
listenerEventMapper.updateListenerEvent(event.getId(), AlertStatus.EXECUTION_FAILURE, "fail", new Date()); |
||||||
|
ListenerEvent updatedEvent = listenerEventMapper.selectById(event.getId()); |
||||||
|
Assertions.assertEquals(updatedEvent.getPostStatus(), AlertStatus.EXECUTION_FAILURE); |
||||||
|
Assertions.assertEquals(updatedEvent.getLog(), "fail"); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* test delete listener event |
||||||
|
*/ |
||||||
|
@Test |
||||||
|
public void testDeleteListenerEvent() { |
||||||
|
ListenerEvent event = generateServerDownListenerEvent("192.168.x.1"); |
||||||
|
listenerEventMapper.insert(event); |
||||||
|
listenerEventMapper.deleteById(event); |
||||||
|
ListenerEvent actualAlert = listenerEventMapper.selectById(event.getId()); |
||||||
|
Assertions.assertNull(actualAlert); |
||||||
|
} |
||||||
|
/** |
||||||
|
* create server down event |
||||||
|
* @param host worker host |
||||||
|
* @return listener event |
||||||
|
*/ |
||||||
|
private ListenerEvent generateServerDownListenerEvent(String host) { |
||||||
|
ServerDownListenerEvent event = new ServerDownListenerEvent(); |
||||||
|
event.setEventTime(new Date()); |
||||||
|
event.setHost(host); |
||||||
|
event.setType("WORKER"); |
||||||
|
ListenerEvent listenerEvent = new ListenerEvent(); |
||||||
|
listenerEvent.setEventType(ListenerEventType.SERVER_DOWN); |
||||||
|
listenerEvent.setContent(JSONUtils.toJsonString(event)); |
||||||
|
listenerEvent.setSign(DigestUtils.sha1Hex(listenerEvent.getContent())); |
||||||
|
listenerEvent.setLog("success"); |
||||||
|
listenerEvent.setCreateTime(DateUtils.getCurrentDate()); |
||||||
|
listenerEvent.setUpdateTime(DateUtils.getCurrentDate()); |
||||||
|
listenerEvent.setPostStatus(AlertStatus.WAIT_EXECUTION); |
||||||
|
return listenerEvent; |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,278 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.service.alert; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||||
|
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||||
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.Project; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProjectUser; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.User; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.AbstractListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionCreatedListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionDeletedListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionUpdatedListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessEndListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessFailListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ProcessStartListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.TaskEndListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent; |
||||||
|
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; |
||||||
|
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper; |
||||||
|
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils; |
||||||
|
import org.apache.commons.collections4.CollectionUtils; |
||||||
|
|
||||||
|
import java.time.LocalDateTime; |
||||||
|
import java.time.ZoneId; |
||||||
|
import java.util.Date; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||||
|
import org.springframework.beans.factory.annotation.Value; |
||||||
|
import org.springframework.stereotype.Component; |
||||||
|
|
||||||
|
@Component |
||||||
|
@Slf4j |
||||||
|
public class ListenerEventAlertManager { |
||||||
|
|
||||||
|
@Value("${alert.alarm-suppression.crash:60}") |
||||||
|
private int crashAlarmSuppression; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private ListenerEventMapper listenerEventMapper; |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private AlertPluginInstanceMapper alertPluginInstanceMapper; |
||||||
|
|
||||||
|
public void publishServerDownListenerEvent(String host, String type) { |
||||||
|
ServerDownListenerEvent event = new ServerDownListenerEvent(); |
||||||
|
event.setEventTime(new Date()); |
||||||
|
event.setHost(host); |
||||||
|
event.setType(type); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
public void publishProcessDefinitionCreatedListenerEvent(User user, |
||||||
|
ProcessDefinition processDefinition, |
||||||
|
List<TaskDefinitionLog> taskDefinitionLogs, |
||||||
|
List<ProcessTaskRelationLog> processTaskRelationLogs) { |
||||||
|
ProcessDefinitionCreatedListenerEvent event = new ProcessDefinitionCreatedListenerEvent(processDefinition); |
||||||
|
event.setUserName(user.getUserName()); |
||||||
|
event.setModifyBy(user.getUserName()); |
||||||
|
event.setTaskDefinitionLogs(taskDefinitionLogs); |
||||||
|
event.setTaskRelationList(processTaskRelationLogs); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
public void publishProcessDefinitionUpdatedListenerEvent(User user, ProcessDefinition processDefinition, |
||||||
|
List<TaskDefinitionLog> taskDefinitionLogs, |
||||||
|
List<ProcessTaskRelationLog> processTaskRelationLogs) { |
||||||
|
ProcessDefinitionUpdatedListenerEvent event = new ProcessDefinitionUpdatedListenerEvent(processDefinition); |
||||||
|
event.setTaskDefinitionLogs(taskDefinitionLogs); |
||||||
|
event.setTaskRelationList(processTaskRelationLogs); |
||||||
|
event.setUserName(user.getUserName()); |
||||||
|
event.setModifyBy(user.getUserName()); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
public void publishProcessDefinitionDeletedListenerEvent(User user, Project project, |
||||||
|
ProcessDefinition processDefinition) { |
||||||
|
ProcessDefinitionDeletedListenerEvent event = new ProcessDefinitionDeletedListenerEvent(); |
||||||
|
event.setProjectId(project.getId()); |
||||||
|
event.setProjectCode(project.getCode()); |
||||||
|
event.setProjectName(project.getName()); |
||||||
|
event.setOwner(processDefinition.getUserName()); |
||||||
|
event.setId(processDefinition.getId()); |
||||||
|
event.setCode(processDefinition.getCode()); |
||||||
|
event.setName(processDefinition.getName()); |
||||||
|
event.setEventTime(new Date()); |
||||||
|
event.setUserId(user.getId()); |
||||||
|
event.setModifiedBy(user.getUserName()); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
public void publishProcessStartListenerEvent(ProcessInstance processInstance, ProjectUser projectUser) { |
||||||
|
ProcessStartListenerEvent event = new ProcessStartListenerEvent(); |
||||||
|
event.setProjectCode(projectUser.getProjectCode()); |
||||||
|
event.setProjectName(projectUser.getProjectName()); |
||||||
|
event.setOwner(projectUser.getUserName()); |
||||||
|
event.setProcessId(processInstance.getId()); |
||||||
|
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||||
|
event.setProcessName(processInstance.getName()); |
||||||
|
event.setProcessType(processInstance.getCommandType()); |
||||||
|
event.setProcessState(processInstance.getState()); |
||||||
|
event.setRunTimes(processInstance.getRunTimes()); |
||||||
|
event.setRecovery(processInstance.getRecovery()); |
||||||
|
event.setProcessStartTime(processInstance.getStartTime()); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
public void publishProcessEndListenerEvent(ProcessInstance processInstance, ProjectUser projectUser) { |
||||||
|
ProcessEndListenerEvent event = new ProcessEndListenerEvent(); |
||||||
|
event.setProjectCode(projectUser.getProjectCode()); |
||||||
|
event.setProjectName(projectUser.getProjectName()); |
||||||
|
event.setOwner(projectUser.getUserName()); |
||||||
|
event.setProcessId(processInstance.getId()); |
||||||
|
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||||
|
event.setProcessName(processInstance.getName()); |
||||||
|
event.setProcessType(processInstance.getCommandType()); |
||||||
|
event.setProcessState(processInstance.getState()); |
||||||
|
event.setRecovery(processInstance.getRecovery()); |
||||||
|
event.setRunTimes(processInstance.getRunTimes()); |
||||||
|
event.setProcessStartTime(processInstance.getStartTime()); |
||||||
|
event.setProcessEndTime(processInstance.getEndTime()); |
||||||
|
event.setProcessHost(processInstance.getHost()); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
public void publishProcessFailListenerEvent(ProcessInstance processInstance, |
||||||
|
ProjectUser projectUser) { |
||||||
|
ProcessFailListenerEvent event = new ProcessFailListenerEvent(); |
||||||
|
event.setProjectCode(projectUser.getProjectCode()); |
||||||
|
event.setProjectName(projectUser.getProjectName()); |
||||||
|
event.setOwner(projectUser.getUserName()); |
||||||
|
event.setProcessId(processInstance.getId()); |
||||||
|
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||||
|
event.setProcessName(processInstance.getName()); |
||||||
|
event.setProcessType(processInstance.getCommandType()); |
||||||
|
event.setProcessState(processInstance.getState()); |
||||||
|
event.setRecovery(processInstance.getRecovery()); |
||||||
|
event.setRunTimes(processInstance.getRunTimes()); |
||||||
|
event.setProcessStartTime(processInstance.getStartTime()); |
||||||
|
event.setProcessEndTime(processInstance.getEndTime()); |
||||||
|
event.setProcessHost(processInstance.getHost()); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
public void publishTaskStartListenerEvent(ProcessInstance processInstance, |
||||||
|
TaskInstance taskInstance, |
||||||
|
ProjectUser projectUser) { |
||||||
|
TaskStartListenerEvent event = new TaskStartListenerEvent(); |
||||||
|
event.setProjectCode(projectUser.getProjectCode()); |
||||||
|
event.setProjectName(projectUser.getProjectName()); |
||||||
|
event.setOwner(projectUser.getUserName()); |
||||||
|
event.setProcessId(processInstance.getId()); |
||||||
|
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||||
|
event.setProcessName(processInstance.getName()); |
||||||
|
event.setTaskCode(taskInstance.getTaskCode()); |
||||||
|
event.setTaskName(taskInstance.getName()); |
||||||
|
event.setTaskType(taskInstance.getTaskType()); |
||||||
|
event.setTaskState(taskInstance.getState()); |
||||||
|
event.setTaskStartTime(taskInstance.getStartTime()); |
||||||
|
event.setTaskEndTime(taskInstance.getEndTime()); |
||||||
|
event.setTaskHost(taskInstance.getHost()); |
||||||
|
event.setLogPath(taskInstance.getLogPath()); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
public void publishTaskEndListenerEvent(ProcessInstance processInstance, |
||||||
|
TaskInstance taskInstance, |
||||||
|
ProjectUser projectUser) { |
||||||
|
TaskEndListenerEvent event = new TaskEndListenerEvent(); |
||||||
|
event.setProjectCode(projectUser.getProjectCode()); |
||||||
|
event.setProjectName(projectUser.getProjectName()); |
||||||
|
event.setOwner(projectUser.getUserName()); |
||||||
|
event.setProcessId(processInstance.getId()); |
||||||
|
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||||
|
event.setProcessName(processInstance.getName()); |
||||||
|
event.setTaskCode(taskInstance.getTaskCode()); |
||||||
|
event.setTaskName(taskInstance.getName()); |
||||||
|
event.setTaskType(taskInstance.getTaskType()); |
||||||
|
event.setTaskState(taskInstance.getState()); |
||||||
|
event.setTaskStartTime(taskInstance.getStartTime()); |
||||||
|
event.setTaskEndTime(taskInstance.getEndTime()); |
||||||
|
event.setTaskHost(taskInstance.getHost()); |
||||||
|
event.setLogPath(taskInstance.getLogPath()); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
public void publishTaskFailListenerEvent(ProcessInstance processInstance, |
||||||
|
TaskInstance taskInstance, |
||||||
|
ProjectUser projectUser) { |
||||||
|
TaskFailListenerEvent event = new TaskFailListenerEvent(); |
||||||
|
event.setProjectCode(projectUser.getProjectCode()); |
||||||
|
event.setProjectName(projectUser.getProjectName()); |
||||||
|
event.setOwner(projectUser.getUserName()); |
||||||
|
event.setProcessId(processInstance.getId()); |
||||||
|
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||||
|
event.setProcessName(processInstance.getName()); |
||||||
|
event.setTaskCode(taskInstance.getTaskCode()); |
||||||
|
event.setTaskName(taskInstance.getName()); |
||||||
|
event.setTaskType(taskInstance.getTaskType()); |
||||||
|
event.setTaskState(taskInstance.getState()); |
||||||
|
event.setTaskStartTime(taskInstance.getStartTime()); |
||||||
|
event.setTaskEndTime(taskInstance.getEndTime()); |
||||||
|
event.setTaskHost(taskInstance.getHost()); |
||||||
|
event.setLogPath(taskInstance.getLogPath()); |
||||||
|
this.saveEvent(event); |
||||||
|
} |
||||||
|
|
||||||
|
private void saveEvent(AbstractListenerEvent event) { |
||||||
|
if (!needSendGlobalListenerEvent()) { |
||||||
|
return; |
||||||
|
} |
||||||
|
ListenerEvent listenerEvent = new ListenerEvent(); |
||||||
|
String content = JSONUtils.toJsonString(event); |
||||||
|
listenerEvent.setContent(content); |
||||||
|
listenerEvent.setPostStatus(AlertStatus.WAIT_EXECUTION); |
||||||
|
listenerEvent.setSign(generateSign(content)); |
||||||
|
listenerEvent.setCreateTime(new Date()); |
||||||
|
listenerEvent.setUpdateTime(new Date()); |
||||||
|
listenerEvent.setEventType(event.getEventType()); |
||||||
|
if (event.getEventType() == ListenerEventType.SERVER_DOWN) { |
||||||
|
saveServerDownEvent(listenerEvent); |
||||||
|
} else { |
||||||
|
saveNormalEvent(listenerEvent); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private void saveNormalEvent(ListenerEvent listenerEvent) { |
||||||
|
int insert = listenerEventMapper.insert(listenerEvent); |
||||||
|
if (insert < 1) { |
||||||
|
log.error("insert listener event failed: {}", listenerEvent); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private void saveServerDownEvent(ListenerEvent listenerEvent) { |
||||||
|
Date crashAlarmSuppressionStartTime = Date.from( |
||||||
|
LocalDateTime.now().plusMinutes(-crashAlarmSuppression).atZone(ZoneId.systemDefault()).toInstant()); |
||||||
|
listenerEventMapper.insertServerDownEvent(listenerEvent, crashAlarmSuppressionStartTime); |
||||||
|
} |
||||||
|
|
||||||
|
private String generateSign(String content) { |
||||||
|
return DigestUtils.sha256Hex(content).toLowerCase(); |
||||||
|
} |
||||||
|
|
||||||
|
private boolean needSendGlobalListenerEvent() { |
||||||
|
List<AlertPluginInstance> globalPluginInstanceList = |
||||||
|
alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList(); |
||||||
|
return CollectionUtils.isNotEmpty(globalPluginInstanceList); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,148 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.service.alert; |
||||||
|
|
||||||
|
import static org.mockito.ArgumentMatchers.any; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.Project; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.ProjectUser; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||||
|
import org.apache.dolphinscheduler.dao.entity.User; |
||||||
|
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; |
||||||
|
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper; |
||||||
|
|
||||||
|
import java.util.ArrayList; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test; |
||||||
|
import org.junit.jupiter.api.extension.ExtendWith; |
||||||
|
import org.mockito.InjectMocks; |
||||||
|
import org.mockito.Mock; |
||||||
|
import org.mockito.Mockito; |
||||||
|
import org.mockito.junit.jupiter.MockitoExtension; |
||||||
|
import org.slf4j.Logger; |
||||||
|
import org.slf4j.LoggerFactory; |
||||||
|
|
||||||
|
/** |
||||||
|
* ProcessAlertManager Test |
||||||
|
*/ |
||||||
|
@ExtendWith(MockitoExtension.class) |
||||||
|
public class ListenerEventAlertManagerTest { |
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(ListenerEventAlertManagerTest.class); |
||||||
|
|
||||||
|
@InjectMocks |
||||||
|
ListenerEventAlertManager listenerEventAlertManager; |
||||||
|
|
||||||
|
@Mock |
||||||
|
AlertPluginInstanceMapper alertPluginInstanceMapper; |
||||||
|
|
||||||
|
@Mock |
||||||
|
ListenerEventMapper listenerEventMapper; |
||||||
|
|
||||||
|
@Test |
||||||
|
public void sendServerDownListenerEventTest() { |
||||||
|
String host = "127.0.0.1"; |
||||||
|
String type = "WORKER"; |
||||||
|
List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>(); |
||||||
|
AlertPluginInstance instance = new AlertPluginInstance(1, "instanceParams", "instanceName"); |
||||||
|
globalPluginInstanceList.add(instance); |
||||||
|
Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()) |
||||||
|
.thenReturn(globalPluginInstanceList); |
||||||
|
Mockito.doNothing().when(listenerEventMapper).insertServerDownEvent(any(), any()); |
||||||
|
listenerEventAlertManager.publishServerDownListenerEvent(host, type); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void sendProcessDefinitionCreatedListenerEvent() { |
||||||
|
User user = Mockito.mock(User.class); |
||||||
|
ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class); |
||||||
|
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>(); |
||||||
|
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>(); |
||||||
|
AlertPluginInstance instance = new AlertPluginInstance(1, "instanceParams", "instanceName"); |
||||||
|
List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>(); |
||||||
|
globalPluginInstanceList.add(instance); |
||||||
|
Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()) |
||||||
|
.thenReturn(globalPluginInstanceList); |
||||||
|
Mockito.when(listenerEventMapper.insert(any())).thenReturn(1); |
||||||
|
listenerEventAlertManager.publishProcessDefinitionCreatedListenerEvent(user, processDefinition, |
||||||
|
taskDefinitionLogs, processTaskRelationLogs); |
||||||
|
} |
||||||
|
@Test |
||||||
|
public void sendProcessDefinitionUpdatedListenerEvent() { |
||||||
|
User user = new User(); |
||||||
|
ProcessDefinition processDefinition = new ProcessDefinition(); |
||||||
|
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>(); |
||||||
|
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>(); |
||||||
|
listenerEventAlertManager.publishProcessDefinitionUpdatedListenerEvent(user, processDefinition, |
||||||
|
taskDefinitionLogs, processTaskRelationLogs); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void sendProcessDefinitionDeletedListenerEvent() { |
||||||
|
User user = new User(); |
||||||
|
Project project = new Project(); |
||||||
|
ProcessDefinition processDefinition = new ProcessDefinition(); |
||||||
|
listenerEventAlertManager.publishProcessDefinitionDeletedListenerEvent(user, project, processDefinition); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void sendProcessStartListenerEvent() { |
||||||
|
ProcessInstance processInstance = new ProcessInstance(); |
||||||
|
ProjectUser projectUser = new ProjectUser(); |
||||||
|
listenerEventAlertManager.publishProcessStartListenerEvent(processInstance, projectUser); |
||||||
|
} |
||||||
|
@Test |
||||||
|
public void sendProcessEndListenerEvent() { |
||||||
|
ProcessInstance processInstance = new ProcessInstance(); |
||||||
|
ProjectUser projectUser = new ProjectUser(); |
||||||
|
listenerEventAlertManager.publishProcessEndListenerEvent(processInstance, projectUser); |
||||||
|
} |
||||||
|
@Test |
||||||
|
public void sendProcessFailListenerEvent() { |
||||||
|
ProcessInstance processInstance = new ProcessInstance(); |
||||||
|
ProjectUser projectUser = new ProjectUser(); |
||||||
|
listenerEventAlertManager.publishProcessFailListenerEvent(processInstance, projectUser); |
||||||
|
} |
||||||
|
@Test |
||||||
|
public void sendTaskStartListenerEvent() { |
||||||
|
ProcessInstance processInstance = Mockito.mock(ProcessInstance.class); |
||||||
|
TaskInstance taskInstance = Mockito.mock(TaskInstance.class); |
||||||
|
ProjectUser projectUser = Mockito.mock(ProjectUser.class); |
||||||
|
listenerEventAlertManager.publishTaskStartListenerEvent(processInstance, taskInstance, projectUser); |
||||||
|
} |
||||||
|
@Test |
||||||
|
public void sendTaskEndListenerEvent() { |
||||||
|
ProcessInstance processInstance = Mockito.mock(ProcessInstance.class); |
||||||
|
TaskInstance taskInstance = Mockito.mock(TaskInstance.class); |
||||||
|
ProjectUser projectUser = Mockito.mock(ProjectUser.class); |
||||||
|
listenerEventAlertManager.publishTaskEndListenerEvent(processInstance, taskInstance, projectUser); |
||||||
|
} |
||||||
|
@Test |
||||||
|
public void sendTaskFailListenerEvent() { |
||||||
|
ProcessInstance processInstance = Mockito.mock(ProcessInstance.class); |
||||||
|
TaskInstance taskInstance = Mockito.mock(TaskInstance.class); |
||||||
|
ProjectUser projectUser = Mockito.mock(ProjectUser.class); |
||||||
|
listenerEventAlertManager.publishTaskFailListenerEvent(processInstance, taskInstance, projectUser); |
||||||
|
} |
||||||
|
} |
Loading…
Reference in new issue