xiangzihao
4 months ago
committed by
GitHub
63 changed files with 173 additions and 1835 deletions
@ -1,51 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.alert.service; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao; |
||||
|
||||
import java.util.List; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Slf4j |
||||
@Component |
||||
public class ListenerEventFetcher extends AbstractEventFetcher<ListenerEvent> { |
||||
|
||||
private final ListenerEventDao listenerEventDao; |
||||
|
||||
protected ListenerEventFetcher(AlertHAServer alertHAServer, |
||||
ListenerEventDao listenerEventDao, |
||||
ListenerEventPendingQueue listenerEventPendingQueue) { |
||||
super("ListenerEventFetcher", alertHAServer, listenerEventPendingQueue); |
||||
this.listenerEventDao = listenerEventDao; |
||||
} |
||||
|
||||
@Override |
||||
protected int getEventOffset(ListenerEvent event) { |
||||
return event.getId(); |
||||
} |
||||
|
||||
@Override |
||||
public List<ListenerEvent> fetchPendingEvent(int eventOffset) { |
||||
return listenerEventDao.listingPendingEvents(eventOffset, FETCH_SIZE); |
||||
} |
||||
} |
@ -1,40 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.alert.service; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
|
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class ListenerEventLoop extends AbstractEventLoop<ListenerEvent> { |
||||
|
||||
private final ListenerEventSender listenerEventSender; |
||||
|
||||
protected ListenerEventLoop(AlertSenderThreadPoolFactory alertSenderThreadPoolFactory, |
||||
ListenerEventSender listenerEventSender, |
||||
ListenerEventPendingQueue listenerEventPendingQueue) { |
||||
super("ListenerEventLoop", alertSenderThreadPoolFactory.getThreadPool(), listenerEventPendingQueue); |
||||
this.listenerEventSender = listenerEventSender; |
||||
} |
||||
|
||||
@Override |
||||
public void handleEvent(ListenerEvent event) { |
||||
listenerEventSender.sendEvent(event); |
||||
} |
||||
} |
@ -1,32 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.alert.service; |
||||
|
||||
import org.apache.dolphinscheduler.alert.config.AlertConfig; |
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
|
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class ListenerEventPendingQueue extends AbstractEventPendingQueue<ListenerEvent> { |
||||
|
||||
public ListenerEventPendingQueue(AlertConfig alertConfig) { |
||||
super(alertConfig.getSenderParallelism() * 3 + 1); |
||||
} |
||||
|
||||
} |
@ -1,146 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.alert.service; |
||||
|
||||
import org.apache.dolphinscheduler.alert.api.AlertData; |
||||
import org.apache.dolphinscheduler.alert.config.AlertConfig; |
||||
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; |
||||
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.AbstractListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionCreatedListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionDeletedListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionUpdatedListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessEndListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessFailListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessStartListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.TaskEndListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; |
||||
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao; |
||||
|
||||
import org.apache.curator.shaded.com.google.common.collect.Lists; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Slf4j |
||||
@Component |
||||
public class ListenerEventSender extends AbstractEventSender<ListenerEvent> { |
||||
|
||||
private final ListenerEventDao listenerEventDao; |
||||
|
||||
private final AlertPluginInstanceMapper alertPluginInstanceMapper; |
||||
|
||||
public ListenerEventSender(ListenerEventDao listenerEventDao, |
||||
AlertPluginInstanceMapper alertPluginInstanceMapper, |
||||
AlertPluginManager alertPluginManager, |
||||
AlertConfig alertConfig) { |
||||
super(alertPluginManager, alertConfig.getWaitTimeout()); |
||||
this.listenerEventDao = listenerEventDao; |
||||
this.alertPluginInstanceMapper = alertPluginInstanceMapper; |
||||
} |
||||
|
||||
private AbstractListenerEvent generateEventFromContent(ListenerEvent listenerEvent) { |
||||
String content = listenerEvent.getContent(); |
||||
AbstractListenerEvent event = null; |
||||
switch (listenerEvent.getEventType()) { |
||||
case SERVER_DOWN: |
||||
event = JSONUtils.parseObject(content, ServerDownListenerEvent.class); |
||||
break; |
||||
case PROCESS_DEFINITION_CREATED: |
||||
event = JSONUtils.parseObject(content, ProcessDefinitionCreatedListenerEvent.class); |
||||
break; |
||||
case PROCESS_DEFINITION_UPDATED: |
||||
event = JSONUtils.parseObject(content, ProcessDefinitionUpdatedListenerEvent.class); |
||||
break; |
||||
case PROCESS_DEFINITION_DELETED: |
||||
event = JSONUtils.parseObject(content, ProcessDefinitionDeletedListenerEvent.class); |
||||
break; |
||||
case PROCESS_START: |
||||
event = JSONUtils.parseObject(content, ProcessStartListenerEvent.class); |
||||
break; |
||||
case PROCESS_END: |
||||
event = JSONUtils.parseObject(content, ProcessEndListenerEvent.class); |
||||
break; |
||||
case PROCESS_FAIL: |
||||
event = JSONUtils.parseObject(content, ProcessFailListenerEvent.class); |
||||
break; |
||||
case TASK_START: |
||||
event = JSONUtils.parseObject(content, TaskStartListenerEvent.class); |
||||
break; |
||||
case TASK_END: |
||||
event = JSONUtils.parseObject(content, TaskEndListenerEvent.class); |
||||
break; |
||||
case TASK_FAIL: |
||||
event = JSONUtils.parseObject(content, TaskFailListenerEvent.class); |
||||
break; |
||||
default: |
||||
throw new IllegalArgumentException("Unsupported event type: " + listenerEvent.getEventType()); |
||||
} |
||||
if (event == null) { |
||||
throw new IllegalArgumentException("Failed to parse event from content: " + content); |
||||
} |
||||
return event; |
||||
} |
||||
|
||||
@Override |
||||
public List<AlertPluginInstance> getAlertPluginInstanceList(ListenerEvent event) { |
||||
return alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList(); |
||||
} |
||||
|
||||
@Override |
||||
public AlertData getAlertData(ListenerEvent listenerEvent) { |
||||
AbstractListenerEvent event = generateEventFromContent(listenerEvent); |
||||
return AlertData.builder() |
||||
.id(listenerEvent.getId()) |
||||
.content(JSONUtils.toJsonString(Lists.newArrayList(event))) |
||||
.log(listenerEvent.getLog()) |
||||
.title(event.getTitle()) |
||||
.alertType(event.getEventType().getCode()) |
||||
.build(); |
||||
} |
||||
|
||||
@Override |
||||
public Integer getEventId(ListenerEvent event) { |
||||
return event.getId(); |
||||
} |
||||
|
||||
@Override |
||||
public void onError(ListenerEvent event, String log) { |
||||
listenerEventDao.updateListenerEvent(event.getId(), AlertStatus.EXECUTION_FAILURE, log, new Date()); |
||||
} |
||||
|
||||
@Override |
||||
public void onPartialSuccess(ListenerEvent event, String log) { |
||||
listenerEventDao.updateListenerEvent(event.getId(), AlertStatus.EXECUTION_PARTIAL_SUCCESS, log, new Date()); |
||||
} |
||||
|
||||
@Override |
||||
public void onSuccess(ListenerEvent event, String log) { |
||||
listenerEventDao.updateListenerEvent(event.getId(), AlertStatus.EXECUTION_FAILURE, log, new Date()); |
||||
} |
||||
} |
@ -1,143 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.alert.runner; |
||||
|
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import org.apache.dolphinscheduler.alert.api.AlertChannel; |
||||
import org.apache.dolphinscheduler.alert.api.AlertResult; |
||||
import org.apache.dolphinscheduler.alert.config.AlertConfig; |
||||
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; |
||||
import org.apache.dolphinscheduler.alert.service.ListenerEventSender; |
||||
import org.apache.dolphinscheduler.common.enums.AlertPluginInstanceType; |
||||
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; |
||||
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao; |
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
|
||||
import org.junit.jupiter.api.Assertions; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.junit.jupiter.api.extension.ExtendWith; |
||||
import org.mockito.InjectMocks; |
||||
import org.mockito.Mock; |
||||
import org.mockito.Mockito; |
||||
import org.mockito.junit.jupiter.MockitoExtension; |
||||
|
||||
@ExtendWith(MockitoExtension.class) |
||||
class ListenerEventSenderTest { |
||||
|
||||
@Mock |
||||
private ListenerEventDao listenerEventDao; |
||||
|
||||
@Mock |
||||
private AlertPluginInstanceMapper alertPluginInstanceMapper; |
||||
@Mock |
||||
private AlertPluginManager alertPluginManager; |
||||
|
||||
@Mock |
||||
private AlertConfig alertConfig; |
||||
|
||||
@InjectMocks |
||||
private ListenerEventSender listenerEventSender; |
||||
|
||||
@Test |
||||
void testSendServerDownEventSuccess() { |
||||
ServerDownListenerEvent serverDownListenerEvent = new ServerDownListenerEvent(); |
||||
serverDownListenerEvent.setEventTime(new Date()); |
||||
serverDownListenerEvent.setType("WORKER"); |
||||
serverDownListenerEvent.setHost("192.168.*.*"); |
||||
ListenerEvent successEvent = new ListenerEvent(); |
||||
successEvent.setId(1); |
||||
successEvent.setPostStatus(AlertStatus.WAIT_EXECUTION); |
||||
successEvent.setContent(JSONUtils.toJsonString(serverDownListenerEvent)); |
||||
successEvent.setSign(DigestUtils.sha256Hex(successEvent.getContent())); |
||||
successEvent.setEventType(ListenerEventType.SERVER_DOWN); |
||||
successEvent.setCreateTime(new Date()); |
||||
successEvent.setUpdateTime(new Date()); |
||||
|
||||
int pluginDefineId = 1; |
||||
String pluginInstanceParams = |
||||
"{\"User\":\"xx\",\"receivers\":\"xx\",\"sender\":\"xx\",\"smtpSslTrust\":\"*\",\"enableSmtpAuth\":\"true\",\"receiverCcs\":null,\"showType\":\"table\",\"starttlsEnable\":\"false\",\"serverPort\":\"25\",\"serverHost\":\"xx\",\"Password\":\"xx\",\"sslEnable\":\"false\"}"; |
||||
String pluginInstanceName = "alert-instance-mail"; |
||||
List<AlertPluginInstance> alertInstanceList = new ArrayList<>(); |
||||
AlertPluginInstance alertPluginInstance = new AlertPluginInstance( |
||||
pluginDefineId, pluginInstanceParams, pluginInstanceName); |
||||
alertPluginInstance.setInstanceType(AlertPluginInstanceType.GLOBAL); |
||||
alertPluginInstance.setId(1); |
||||
alertInstanceList.add(alertPluginInstance); |
||||
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()).thenReturn(alertInstanceList); |
||||
|
||||
AlertResult sendResult = new AlertResult(); |
||||
sendResult.setSuccess(true); |
||||
sendResult.setMessage(String.format("Alert Plugin %s send success", pluginInstanceName)); |
||||
AlertChannel alertChannelMock = mock(AlertChannel.class); |
||||
when(alertChannelMock.process(Mockito.any())).thenReturn(sendResult); |
||||
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); |
||||
Assertions.assertTrue(sendResult.isSuccess()); |
||||
listenerEventSender.sendEvent(successEvent); |
||||
} |
||||
|
||||
@Test |
||||
void testSendServerDownEventFailed() { |
||||
ServerDownListenerEvent serverDownListenerEvent = new ServerDownListenerEvent(); |
||||
serverDownListenerEvent.setEventTime(new Date()); |
||||
serverDownListenerEvent.setType("WORKER"); |
||||
serverDownListenerEvent.setHost("192.168.*.*"); |
||||
ListenerEvent successEvent = new ListenerEvent(); |
||||
successEvent.setId(1); |
||||
successEvent.setPostStatus(AlertStatus.WAIT_EXECUTION); |
||||
successEvent.setContent(JSONUtils.toJsonString(serverDownListenerEvent)); |
||||
successEvent.setSign(DigestUtils.sha1Hex(successEvent.getContent())); |
||||
successEvent.setEventType(ListenerEventType.SERVER_DOWN); |
||||
successEvent.setCreateTime(new Date()); |
||||
successEvent.setUpdateTime(new Date()); |
||||
|
||||
int pluginDefineId = 1; |
||||
String pluginInstanceParams = |
||||
"{\"User\":\"xx\",\"receivers\":\"xx\",\"sender\":\"xx\",\"smtpSslTrust\":\"*\",\"enableSmtpAuth\":\"true\",\"receiverCcs\":null,\"showType\":\"table\",\"starttlsEnable\":\"false\",\"serverPort\":\"25\",\"serverHost\":\"xx\",\"Password\":\"xx\",\"sslEnable\":\"false\"}"; |
||||
String pluginInstanceName = "alert-instance-mail"; |
||||
List<AlertPluginInstance> alertInstanceList = new ArrayList<>(); |
||||
AlertPluginInstance alertPluginInstance = new AlertPluginInstance( |
||||
pluginDefineId, pluginInstanceParams, pluginInstanceName); |
||||
alertPluginInstance.setInstanceType(AlertPluginInstanceType.GLOBAL); |
||||
alertPluginInstance.setId(1); |
||||
alertInstanceList.add(alertPluginInstance); |
||||
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()).thenReturn(alertInstanceList); |
||||
|
||||
AlertResult sendResult = new AlertResult(); |
||||
sendResult.setSuccess(false); |
||||
sendResult.setMessage(String.format("Alert Plugin %s send failed", pluginInstanceName)); |
||||
AlertChannel alertChannelMock = mock(AlertChannel.class); |
||||
when(alertChannelMock.process(Mockito.any())).thenReturn(sendResult); |
||||
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); |
||||
Assertions.assertFalse(sendResult.isSuccess()); |
||||
listenerEventSender.sendEvent(successEvent); |
||||
} |
||||
} |
@ -1,43 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
|
||||
import org.apache.ibatis.annotations.Param; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
||||
public interface ListenerEventMapper extends BaseMapper<ListenerEvent> { |
||||
|
||||
int batchInsert(@Param("events") List<ListenerEvent> events); |
||||
|
||||
void insertServerDownEvent(@Param("event") ListenerEvent event, |
||||
@Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime); |
||||
|
||||
List<ListenerEvent> listingListenerEventByStatus(@Param("minId") int minId, |
||||
@Param("postStatus") int postStatus, |
||||
@Param("limit") int limit); |
||||
|
||||
void updateListenerEvent(@Param("eventId") int eventId, @Param("postStatus") AlertStatus postStatus, |
||||
@Param("log") String log, @Param("updateTime") Date updateTime); |
||||
} |
@ -1,31 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.repository; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
|
||||
public interface ListenerEventDao extends IDao<ListenerEvent> { |
||||
|
||||
List<ListenerEvent> listingPendingEvents(int minId, int limit); |
||||
|
||||
void updateListenerEvent(int eventId, AlertStatus alertStatus, String message, Date date); |
||||
} |
@ -1,51 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.repository.impl; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper; |
||||
import org.apache.dolphinscheduler.dao.repository.BaseDao; |
||||
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
|
||||
import lombok.NonNull; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.springframework.stereotype.Repository; |
||||
|
||||
@Slf4j |
||||
@Repository |
||||
public class ListenerEventDaoImpl extends BaseDao<ListenerEvent, ListenerEventMapper> implements ListenerEventDao { |
||||
|
||||
public ListenerEventDaoImpl(@NonNull ListenerEventMapper listenerEventMapper) { |
||||
super(listenerEventMapper); |
||||
} |
||||
|
||||
@Override |
||||
public List<ListenerEvent> listingPendingEvents(int minId, int limit) { |
||||
return mybatisMapper.listingListenerEventByStatus(minId, AlertStatus.WAIT_EXECUTION.getCode(), limit); |
||||
} |
||||
|
||||
@Override |
||||
public void updateListenerEvent(int eventId, AlertStatus alertStatus, String message, Date date) { |
||||
mybatisMapper.updateListenerEvent(eventId, alertStatus, message, date); |
||||
} |
||||
} |
@ -1,67 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > |
||||
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper"> |
||||
|
||||
<sql id="baseSql"> |
||||
id |
||||
, content, sign, event_type, post_status, log, create_time, update_time |
||||
</sql> |
||||
|
||||
<insert id="batchInsert"> |
||||
insert into t_ds_listener_event ( content, sign, post_status, event_type, create_time, update_time) |
||||
values |
||||
<foreach collection='events' item='event' separator=','> |
||||
(#{event.content},#{event.sign},#{event.postStatus.code}, |
||||
#{event.eventType.code},#{event.createTime},#{event.updateTime}) |
||||
</foreach> |
||||
</insert> |
||||
|
||||
<insert id="insertServerDownEvent"> |
||||
insert into t_ds_listener_event (content, sign, post_status, event_type, create_time, update_time) |
||||
SELECT #{event.sign}, |
||||
#{event.content}, |
||||
#{event.postStatus.code}, |
||||
#{event.eventType.code}, |
||||
#{event.createTime}, |
||||
#{event.updateTime} |
||||
from t_ds_listener_event |
||||
where create_time >= #{crashAlarmSuppressionStartTime} |
||||
and sign = #{event.sign} |
||||
and post_status = #{event.postStatus.code} |
||||
having count(*) = 0 |
||||
</insert> |
||||
|
||||
<update id="updateListenerEvent"> |
||||
update t_ds_listener_event |
||||
set log = #{log}, |
||||
post_status = #{postStatus.code}, |
||||
update_time = #{updateTime} |
||||
where id = #{eventId} |
||||
</update> |
||||
|
||||
<select id="listingListenerEventByStatus" resultType="org.apache.dolphinscheduler.dao.entity.ListenerEvent"> |
||||
select |
||||
<include refid="baseSql"/> |
||||
from t_ds_listener_event |
||||
where id > #{minId} and post_status = #{postStatus} |
||||
order by id asc |
||||
limit #{limit} |
||||
</select> |
||||
</mapper> |
@ -1,136 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||
import org.apache.dolphinscheduler.common.utils.DateUtils; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.BaseDaoTest; |
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent; |
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils; |
||||
|
||||
import java.util.Date; |
||||
import java.util.List; |
||||
|
||||
import org.junit.jupiter.api.Assertions; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
||||
import com.google.common.collect.Lists; |
||||
|
||||
/** |
||||
* AlertPluginInstanceMapper mapper test |
||||
*/ |
||||
public class ListenerEventMapperTest extends BaseDaoTest { |
||||
|
||||
@Autowired |
||||
private ListenerEventMapper listenerEventMapper; |
||||
|
||||
/** |
||||
* test insert |
||||
* |
||||
* @return |
||||
*/ |
||||
@Test |
||||
public void testInsert() { |
||||
ListenerEvent serverDownListenerEvent = generateServerDownListenerEvent("192.168.x.x"); |
||||
listenerEventMapper.insert(serverDownListenerEvent); |
||||
Assertions.assertTrue(serverDownListenerEvent.getId() > 0); |
||||
} |
||||
|
||||
/** |
||||
* test batch insert |
||||
* |
||||
* @return |
||||
*/ |
||||
@Test |
||||
public void testBatchInsert() { |
||||
ListenerEvent event1 = generateServerDownListenerEvent("192.168.x.1"); |
||||
ListenerEvent event2 = generateServerDownListenerEvent("192.168.x.2"); |
||||
listenerEventMapper.batchInsert(Lists.newArrayList(event1, event2)); |
||||
Assertions.assertEquals(listenerEventMapper.selectCount(new QueryWrapper<>()), 2L); |
||||
} |
||||
|
||||
/** |
||||
* test list listener event by status |
||||
* |
||||
* @return |
||||
*/ |
||||
@Test |
||||
public void testListingListenerEventByStatus() { |
||||
ListenerEvent event1 = generateServerDownListenerEvent("192.168.x.1"); |
||||
ListenerEvent event2 = generateServerDownListenerEvent("192.168.x.2"); |
||||
listenerEventMapper.batchInsert(Lists.newArrayList(event1, event2)); |
||||
List<ListenerEvent> listenerEvents = |
||||
listenerEventMapper.listingListenerEventByStatus(-1, AlertStatus.WAIT_EXECUTION.getCode(), 50); |
||||
Assertions.assertEquals(listenerEvents.size(), 2); |
||||
} |
||||
|
||||
/** |
||||
* test update server down event |
||||
* |
||||
* @return |
||||
*/ |
||||
@Test |
||||
public void testUpdateListenerEvent() { |
||||
ListenerEvent event = generateServerDownListenerEvent("192.168.x.1"); |
||||
listenerEventMapper.insert(event); |
||||
listenerEventMapper.updateListenerEvent(event.getId(), AlertStatus.EXECUTION_FAILURE, "fail", new Date()); |
||||
ListenerEvent updatedEvent = listenerEventMapper.selectById(event.getId()); |
||||
Assertions.assertEquals(updatedEvent.getPostStatus(), AlertStatus.EXECUTION_FAILURE); |
||||
Assertions.assertEquals(updatedEvent.getLog(), "fail"); |
||||
} |
||||
|
||||
/** |
||||
* test delete listener event |
||||
*/ |
||||
@Test |
||||
public void testDeleteListenerEvent() { |
||||
ListenerEvent event = generateServerDownListenerEvent("192.168.x.1"); |
||||
listenerEventMapper.insert(event); |
||||
listenerEventMapper.deleteById(event); |
||||
ListenerEvent actualAlert = listenerEventMapper.selectById(event.getId()); |
||||
Assertions.assertNull(actualAlert); |
||||
} |
||||
|
||||
/** |
||||
* create server down event |
||||
* |
||||
* @param host worker host |
||||
* @return listener event |
||||
*/ |
||||
private ListenerEvent generateServerDownListenerEvent(String host) { |
||||
ServerDownListenerEvent event = new ServerDownListenerEvent(); |
||||
event.setEventTime(new Date()); |
||||
event.setHost(host); |
||||
event.setType("WORKER"); |
||||
ListenerEvent listenerEvent = new ListenerEvent(); |
||||
listenerEvent.setEventType(ListenerEventType.SERVER_DOWN); |
||||
listenerEvent.setContent(JSONUtils.toJsonString(event)); |
||||
listenerEvent.setSign(DigestUtils.sha1Hex(listenerEvent.getContent())); |
||||
listenerEvent.setLog("success"); |
||||
listenerEvent.setCreateTime(DateUtils.getCurrentDate()); |
||||
listenerEvent.setUpdateTime(DateUtils.getCurrentDate()); |
||||
listenerEvent.setPostStatus(AlertStatus.WAIT_EXECUTION); |
||||
return listenerEvent; |
||||
} |
||||
} |
@ -1,79 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.repository.impl; |
||||
|
||||
import static com.google.common.truth.Truth.assertThat; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||
import org.apache.dolphinscheduler.dao.BaseDaoTest; |
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
||||
class ListenerEventDaoImplTest extends BaseDaoTest { |
||||
|
||||
@Autowired |
||||
private ListenerEventDao listenerEventDao; |
||||
|
||||
@Test |
||||
void listingPendingEvents() { |
||||
int minId = -1; |
||||
int limit = 10; |
||||
assertThat(listenerEventDao.listingPendingEvents(minId, limit)).isEmpty(); |
||||
|
||||
ListenerEvent listenerEvent = ListenerEvent.builder() |
||||
.eventType(ListenerEventType.SERVER_DOWN) |
||||
.sign("test") |
||||
.createTime(new Date()) |
||||
.updateTime(new Date()) |
||||
.postStatus(AlertStatus.WAIT_EXECUTION) |
||||
.build(); |
||||
listenerEventDao.insert(listenerEvent); |
||||
|
||||
listenerEvent = ListenerEvent.builder() |
||||
.eventType(ListenerEventType.SERVER_DOWN) |
||||
.sign("test") |
||||
.createTime(new Date()) |
||||
.updateTime(new Date()) |
||||
.postStatus(AlertStatus.EXECUTION_SUCCESS) |
||||
.build(); |
||||
listenerEventDao.insert(listenerEvent); |
||||
|
||||
assertThat(listenerEventDao.listingPendingEvents(minId, limit)).hasSize(1); |
||||
} |
||||
|
||||
@Test |
||||
void updateListenerEvent() { |
||||
ListenerEvent listenerEvent = ListenerEvent.builder() |
||||
.eventType(ListenerEventType.SERVER_DOWN) |
||||
.sign("test") |
||||
.createTime(new Date()) |
||||
.updateTime(new Date()) |
||||
.postStatus(AlertStatus.WAIT_EXECUTION) |
||||
.build(); |
||||
listenerEventDao.insert(listenerEvent); |
||||
listenerEventDao.updateListenerEvent(listenerEvent.getId(), AlertStatus.EXECUTION_SUCCESS, "test", new Date()); |
||||
assertThat(listenerEventDao.queryById(listenerEvent.getId()).getPostStatus()) |
||||
.isEqualTo(AlertStatus.EXECUTION_SUCCESS); |
||||
} |
||||
} |
@ -1,283 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.service.alert; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.AlertStatus; |
||||
import org.apache.dolphinscheduler.common.enums.ListenerEventType; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.ListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
||||
import org.apache.dolphinscheduler.dao.entity.Project; |
||||
import org.apache.dolphinscheduler.dao.entity.ProjectUser; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.User; |
||||
import org.apache.dolphinscheduler.dao.entity.event.AbstractListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionCreatedListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionDeletedListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionUpdatedListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessEndListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessFailListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ProcessStartListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.TaskEndListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent; |
||||
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; |
||||
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils; |
||||
import org.apache.commons.collections4.CollectionUtils; |
||||
|
||||
import java.time.LocalDateTime; |
||||
import java.time.ZoneId; |
||||
import java.util.Date; |
||||
import java.util.List; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.beans.factory.annotation.Value; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
@Slf4j |
||||
public class ListenerEventAlertManager { |
||||
|
||||
@Value("${alert.alarm-suppression.crash:60}") |
||||
private int crashAlarmSuppression; |
||||
|
||||
@Autowired |
||||
private ListenerEventMapper listenerEventMapper; |
||||
|
||||
@Autowired |
||||
private AlertPluginInstanceMapper alertPluginInstanceMapper; |
||||
|
||||
@Autowired |
||||
private ProcessService processService; |
||||
|
||||
public void publishServerDownListenerEvent(String host, String type) { |
||||
ServerDownListenerEvent event = new ServerDownListenerEvent(); |
||||
event.setEventTime(new Date()); |
||||
event.setHost(host); |
||||
event.setType(type); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
public void publishProcessDefinitionCreatedListenerEvent(User user, |
||||
ProcessDefinition processDefinition, |
||||
List<TaskDefinitionLog> taskDefinitionLogs, |
||||
List<ProcessTaskRelationLog> processTaskRelationLogs) { |
||||
ProcessDefinitionCreatedListenerEvent event = new ProcessDefinitionCreatedListenerEvent(processDefinition); |
||||
event.setUserName(user.getUserName()); |
||||
event.setModifyBy(user.getUserName()); |
||||
event.setTaskDefinitionLogs(taskDefinitionLogs); |
||||
event.setTaskRelationList(processTaskRelationLogs); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
public void publishProcessDefinitionUpdatedListenerEvent(User user, ProcessDefinition processDefinition, |
||||
List<TaskDefinitionLog> taskDefinitionLogs, |
||||
List<ProcessTaskRelationLog> processTaskRelationLogs) { |
||||
ProcessDefinitionUpdatedListenerEvent event = new ProcessDefinitionUpdatedListenerEvent(processDefinition); |
||||
event.setTaskDefinitionLogs(taskDefinitionLogs); |
||||
event.setTaskRelationList(processTaskRelationLogs); |
||||
event.setUserName(user.getUserName()); |
||||
event.setModifyBy(user.getUserName()); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
public void publishProcessDefinitionDeletedListenerEvent(User user, Project project, |
||||
ProcessDefinition processDefinition) { |
||||
ProcessDefinitionDeletedListenerEvent event = new ProcessDefinitionDeletedListenerEvent(); |
||||
event.setProjectId(project.getId()); |
||||
event.setProjectCode(project.getCode()); |
||||
event.setProjectName(project.getName()); |
||||
event.setOwner(processDefinition.getUserName()); |
||||
event.setId(processDefinition.getId()); |
||||
event.setCode(processDefinition.getCode()); |
||||
event.setName(processDefinition.getName()); |
||||
event.setEventTime(new Date()); |
||||
event.setUserId(user.getId()); |
||||
event.setModifiedBy(user.getUserName()); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
public void publishProcessStartListenerEvent(ProcessInstance processInstance, ProjectUser projectUser) { |
||||
ProcessStartListenerEvent event = new ProcessStartListenerEvent(); |
||||
event.setProjectCode(projectUser.getProjectCode()); |
||||
event.setProjectName(projectUser.getProjectName()); |
||||
event.setOwner(projectUser.getUserName()); |
||||
event.setProcessId(processInstance.getId()); |
||||
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||
event.setProcessName(processInstance.getName()); |
||||
event.setProcessType(processInstance.getCommandType()); |
||||
event.setProcessState(processInstance.getState()); |
||||
event.setRunTimes(processInstance.getRunTimes()); |
||||
event.setRecovery(processInstance.getRecovery()); |
||||
event.setProcessStartTime(processInstance.getStartTime()); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
public void publishProcessEndListenerEvent(ProcessInstance processInstance, ProjectUser projectUser) { |
||||
ProcessEndListenerEvent event = new ProcessEndListenerEvent(); |
||||
event.setProjectCode(projectUser.getProjectCode()); |
||||
event.setProjectName(projectUser.getProjectName()); |
||||
event.setOwner(projectUser.getUserName()); |
||||
event.setProcessId(processInstance.getId()); |
||||
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||
event.setProcessName(processInstance.getName()); |
||||
event.setProcessType(processInstance.getCommandType()); |
||||
event.setProcessState(processInstance.getState()); |
||||
event.setRecovery(processInstance.getRecovery()); |
||||
event.setRunTimes(processInstance.getRunTimes()); |
||||
event.setProcessStartTime(processInstance.getStartTime()); |
||||
event.setProcessEndTime(processInstance.getEndTime()); |
||||
event.setProcessHost(processInstance.getHost()); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
public void publishProcessFailListenerEvent(ProcessInstance processInstance, |
||||
ProjectUser projectUser) { |
||||
ProcessFailListenerEvent event = new ProcessFailListenerEvent(); |
||||
event.setProjectCode(projectUser.getProjectCode()); |
||||
event.setProjectName(projectUser.getProjectName()); |
||||
event.setOwner(projectUser.getUserName()); |
||||
event.setProcessId(processInstance.getId()); |
||||
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||
event.setProcessName(processInstance.getName()); |
||||
event.setProcessType(processInstance.getCommandType()); |
||||
event.setProcessState(processInstance.getState()); |
||||
event.setRecovery(processInstance.getRecovery()); |
||||
event.setRunTimes(processInstance.getRunTimes()); |
||||
event.setProcessStartTime(processInstance.getStartTime()); |
||||
event.setProcessEndTime(processInstance.getEndTime()); |
||||
event.setProcessHost(processInstance.getHost()); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
public void publishTaskStartListenerEvent(ProcessInstance processInstance, |
||||
TaskInstance taskInstance, |
||||
ProjectUser projectUser) { |
||||
TaskStartListenerEvent event = new TaskStartListenerEvent(); |
||||
event.setProjectCode(projectUser.getProjectCode()); |
||||
event.setProjectName(projectUser.getProjectName()); |
||||
event.setOwner(projectUser.getUserName()); |
||||
event.setProcessId(processInstance.getId()); |
||||
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||
event.setProcessName(processInstance.getName()); |
||||
event.setTaskCode(taskInstance.getTaskCode()); |
||||
event.setTaskName(taskInstance.getName()); |
||||
event.setTaskType(taskInstance.getTaskType()); |
||||
event.setTaskState(taskInstance.getState()); |
||||
event.setTaskStartTime(taskInstance.getStartTime()); |
||||
event.setTaskEndTime(taskInstance.getEndTime()); |
||||
event.setTaskHost(taskInstance.getHost()); |
||||
event.setLogPath(taskInstance.getLogPath()); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
public void publishTaskEndListenerEvent(ProcessInstance processInstance, |
||||
TaskInstance taskInstance, |
||||
ProjectUser projectUser) { |
||||
TaskEndListenerEvent event = new TaskEndListenerEvent(); |
||||
event.setProjectCode(projectUser.getProjectCode()); |
||||
event.setProjectName(projectUser.getProjectName()); |
||||
event.setOwner(projectUser.getUserName()); |
||||
event.setProcessId(processInstance.getId()); |
||||
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||
event.setProcessName(processInstance.getName()); |
||||
event.setTaskCode(taskInstance.getTaskCode()); |
||||
event.setTaskName(taskInstance.getName()); |
||||
event.setTaskType(taskInstance.getTaskType()); |
||||
event.setTaskState(taskInstance.getState()); |
||||
event.setTaskStartTime(taskInstance.getStartTime()); |
||||
event.setTaskEndTime(taskInstance.getEndTime()); |
||||
event.setTaskHost(taskInstance.getHost()); |
||||
event.setLogPath(taskInstance.getLogPath()); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
public void publishTaskFailListenerEvent(ProcessInstance processInstance, |
||||
TaskInstance taskInstance) { |
||||
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); |
||||
|
||||
TaskFailListenerEvent event = new TaskFailListenerEvent(); |
||||
event.setProjectCode(projectUser.getProjectCode()); |
||||
event.setProjectName(projectUser.getProjectName()); |
||||
event.setOwner(projectUser.getUserName()); |
||||
event.setProcessId(processInstance.getId()); |
||||
event.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); |
||||
event.setProcessName(processInstance.getName()); |
||||
event.setTaskCode(taskInstance.getTaskCode()); |
||||
event.setTaskName(taskInstance.getName()); |
||||
event.setTaskType(taskInstance.getTaskType()); |
||||
event.setTaskState(taskInstance.getState()); |
||||
event.setTaskStartTime(taskInstance.getStartTime()); |
||||
event.setTaskEndTime(taskInstance.getEndTime()); |
||||
event.setTaskHost(taskInstance.getHost()); |
||||
event.setLogPath(taskInstance.getLogPath()); |
||||
this.saveEvent(event); |
||||
} |
||||
|
||||
private void saveEvent(AbstractListenerEvent event) { |
||||
if (!needSendGlobalListenerEvent()) { |
||||
return; |
||||
} |
||||
ListenerEvent listenerEvent = new ListenerEvent(); |
||||
String content = JSONUtils.toJsonString(event); |
||||
listenerEvent.setContent(content); |
||||
listenerEvent.setPostStatus(AlertStatus.WAIT_EXECUTION); |
||||
listenerEvent.setSign(generateSign(content)); |
||||
listenerEvent.setCreateTime(new Date()); |
||||
listenerEvent.setUpdateTime(new Date()); |
||||
listenerEvent.setEventType(event.getEventType()); |
||||
if (event.getEventType() == ListenerEventType.SERVER_DOWN) { |
||||
saveServerDownEvent(listenerEvent); |
||||
} else { |
||||
saveNormalEvent(listenerEvent); |
||||
} |
||||
} |
||||
|
||||
private void saveNormalEvent(ListenerEvent listenerEvent) { |
||||
int insert = listenerEventMapper.insert(listenerEvent); |
||||
if (insert < 1) { |
||||
log.error("insert listener event failed: {}", listenerEvent); |
||||
} |
||||
} |
||||
|
||||
private void saveServerDownEvent(ListenerEvent listenerEvent) { |
||||
Date crashAlarmSuppressionStartTime = Date.from( |
||||
LocalDateTime.now().plusMinutes(-crashAlarmSuppression).atZone(ZoneId.systemDefault()).toInstant()); |
||||
listenerEventMapper.insertServerDownEvent(listenerEvent, crashAlarmSuppressionStartTime); |
||||
} |
||||
|
||||
private String generateSign(String content) { |
||||
return DigestUtils.sha256Hex(content).toLowerCase(); |
||||
} |
||||
|
||||
private boolean needSendGlobalListenerEvent() { |
||||
List<AlertPluginInstance> globalPluginInstanceList = |
||||
alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList(); |
||||
return CollectionUtils.isNotEmpty(globalPluginInstanceList); |
||||
} |
||||
} |
@ -1,150 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.service.alert; |
||||
|
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
||||
import org.apache.dolphinscheduler.dao.entity.Project; |
||||
import org.apache.dolphinscheduler.dao.entity.ProjectUser; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.User; |
||||
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; |
||||
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import org.junit.jupiter.api.extension.ExtendWith; |
||||
import org.mockito.InjectMocks; |
||||
import org.mockito.Mock; |
||||
import org.mockito.Mockito; |
||||
import org.mockito.junit.jupiter.MockitoExtension; |
||||
|
||||
/** |
||||
* ProcessAlertManager Test |
||||
*/ |
||||
@ExtendWith(MockitoExtension.class) |
||||
public class ListenerEventAlertManagerTest { |
||||
|
||||
@InjectMocks |
||||
ListenerEventAlertManager listenerEventAlertManager; |
||||
|
||||
@Mock |
||||
AlertPluginInstanceMapper alertPluginInstanceMapper; |
||||
|
||||
@Mock |
||||
ListenerEventMapper listenerEventMapper; |
||||
|
||||
@Mock |
||||
ProcessService processService; |
||||
|
||||
@Test |
||||
public void sendServerDownListenerEventTest() { |
||||
String host = "127.0.0.1"; |
||||
String type = "WORKER"; |
||||
List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>(); |
||||
AlertPluginInstance instance = new AlertPluginInstance(1, "instanceParams", "instanceName"); |
||||
globalPluginInstanceList.add(instance); |
||||
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()) |
||||
.thenReturn(globalPluginInstanceList); |
||||
Mockito.doNothing().when(listenerEventMapper).insertServerDownEvent(any(), any()); |
||||
listenerEventAlertManager.publishServerDownListenerEvent(host, type); |
||||
} |
||||
|
||||
@Test |
||||
public void sendProcessDefinitionCreatedListenerEvent() { |
||||
User user = Mockito.mock(User.class); |
||||
ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class); |
||||
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>(); |
||||
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>(); |
||||
AlertPluginInstance instance = new AlertPluginInstance(1, "instanceParams", "instanceName"); |
||||
List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>(); |
||||
globalPluginInstanceList.add(instance); |
||||
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()) |
||||
.thenReturn(globalPluginInstanceList); |
||||
when(listenerEventMapper.insert(any())).thenReturn(1); |
||||
listenerEventAlertManager.publishProcessDefinitionCreatedListenerEvent(user, processDefinition, |
||||
taskDefinitionLogs, processTaskRelationLogs); |
||||
} |
||||
@Test |
||||
public void sendProcessDefinitionUpdatedListenerEvent() { |
||||
User user = new User(); |
||||
ProcessDefinition processDefinition = new ProcessDefinition(); |
||||
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>(); |
||||
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>(); |
||||
listenerEventAlertManager.publishProcessDefinitionUpdatedListenerEvent(user, processDefinition, |
||||
taskDefinitionLogs, processTaskRelationLogs); |
||||
} |
||||
|
||||
@Test |
||||
public void sendProcessDefinitionDeletedListenerEvent() { |
||||
User user = new User(); |
||||
Project project = new Project(); |
||||
ProcessDefinition processDefinition = new ProcessDefinition(); |
||||
listenerEventAlertManager.publishProcessDefinitionDeletedListenerEvent(user, project, processDefinition); |
||||
} |
||||
|
||||
@Test |
||||
public void sendProcessStartListenerEvent() { |
||||
ProcessInstance processInstance = new ProcessInstance(); |
||||
ProjectUser projectUser = new ProjectUser(); |
||||
listenerEventAlertManager.publishProcessStartListenerEvent(processInstance, projectUser); |
||||
} |
||||
@Test |
||||
public void sendProcessEndListenerEvent() { |
||||
ProcessInstance processInstance = new ProcessInstance(); |
||||
ProjectUser projectUser = new ProjectUser(); |
||||
listenerEventAlertManager.publishProcessEndListenerEvent(processInstance, projectUser); |
||||
} |
||||
@Test |
||||
public void sendProcessFailListenerEvent() { |
||||
ProcessInstance processInstance = new ProcessInstance(); |
||||
ProjectUser projectUser = new ProjectUser(); |
||||
listenerEventAlertManager.publishProcessFailListenerEvent(processInstance, projectUser); |
||||
} |
||||
@Test |
||||
public void sendTaskStartListenerEvent() { |
||||
ProcessInstance processInstance = Mockito.mock(ProcessInstance.class); |
||||
TaskInstance taskInstance = Mockito.mock(TaskInstance.class); |
||||
ProjectUser projectUser = Mockito.mock(ProjectUser.class); |
||||
listenerEventAlertManager.publishTaskStartListenerEvent(processInstance, taskInstance, projectUser); |
||||
} |
||||
@Test |
||||
public void sendTaskEndListenerEvent() { |
||||
ProcessInstance processInstance = Mockito.mock(ProcessInstance.class); |
||||
TaskInstance taskInstance = Mockito.mock(TaskInstance.class); |
||||
ProjectUser projectUser = Mockito.mock(ProjectUser.class); |
||||
listenerEventAlertManager.publishTaskEndListenerEvent(processInstance, taskInstance, projectUser); |
||||
} |
||||
@Test |
||||
public void sendTaskFailListenerEvent() { |
||||
ProcessInstance processInstance = Mockito.mock(ProcessInstance.class); |
||||
TaskInstance taskInstance = Mockito.mock(TaskInstance.class); |
||||
when(processService.queryProjectWithUserByProcessInstanceId(processInstance.getId())) |
||||
.thenReturn(new ProjectUser()); |
||||
listenerEventAlertManager.publishTaskFailListenerEvent(processInstance, taskInstance); |
||||
} |
||||
} |
Loading…
Reference in new issue