zhuangchong
4 years ago
11 changed files with 527 additions and 29 deletions
@ -0,0 +1,67 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.alert.processor; |
||||
|
||||
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; |
||||
import org.apache.dolphinscheduler.alert.runner.AlertSender; |
||||
import org.apache.dolphinscheduler.common.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.dao.AlertDao; |
||||
import org.apache.dolphinscheduler.dao.PluginDao; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand; |
||||
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.remote.utils.JsonSerializer; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
/** |
||||
* alert request processor |
||||
*/ |
||||
public class AlertRequestProcessor implements NettyRequestProcessor { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class); |
||||
private AlertDao alertDao; |
||||
private PluginDao pluginDao; |
||||
private AlertPluginManager alertPluginManager; |
||||
|
||||
public AlertRequestProcessor(AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) { |
||||
this.alertDao = alertDao; |
||||
this.pluginDao = pluginDao; |
||||
this.alertPluginManager = alertPluginManager; |
||||
} |
||||
|
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
Preconditions.checkArgument(CommandType.ALERT_SEND_REQUEST == command.getType(), |
||||
String.format("invalid command type : %s", command.getType())); |
||||
|
||||
AlertSendRequestCommand alertSendRequestCommand = JsonSerializer.deserialize( |
||||
command.getBody(), AlertSendRequestCommand.class); |
||||
logger.info("received command : {}", alertSendRequestCommand); |
||||
|
||||
AlertSender alertSender = new AlertSender(alertDao, alertPluginManager, pluginDao); |
||||
AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertSendRequestCommand.getGroupId(), alertSendRequestCommand.getTitle(), alertSendRequestCommand.getContent()); |
||||
channel.writeAndFlush(alertSendResponseCommand.convert2Command(command.getOpaque())); |
||||
|
||||
} |
||||
} |
@ -1 +1 @@
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
} |
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG,
/**
* alert send request
*/
ALERT_SEND_REQUEST,
/**
* alert send response
*/
ALERT_SEND_RESPONSE;
} |
@ -0,0 +1,80 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command.alert; |
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.utils.JsonSerializer; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
public class AlertSendRequestCommand implements Serializable { |
||||
|
||||
private int groupId; |
||||
|
||||
private String title; |
||||
|
||||
private String content; |
||||
|
||||
public int getGroupId() { |
||||
return groupId; |
||||
} |
||||
|
||||
public void setGroupId(int groupId) { |
||||
this.groupId = groupId; |
||||
} |
||||
|
||||
public String getTitle() { |
||||
return title; |
||||
} |
||||
|
||||
public void setTitle(String title) { |
||||
this.title = title; |
||||
} |
||||
|
||||
public String getContent() { |
||||
return content; |
||||
} |
||||
|
||||
public void setContent(String content) { |
||||
this.content = content; |
||||
} |
||||
|
||||
public AlertSendRequestCommand(){ |
||||
|
||||
} |
||||
|
||||
public AlertSendRequestCommand(int groupId, String title, String content) { |
||||
this.groupId = groupId; |
||||
this.title = title; |
||||
this.content = content; |
||||
} |
||||
|
||||
/** |
||||
* package request command |
||||
* |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command() { |
||||
Command command = new Command(); |
||||
command.setType(CommandType.ALERT_SEND_REQUEST); |
||||
byte[] body = JsonSerializer.serialize(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
} |
@ -0,0 +1,76 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command.alert; |
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.utils.JsonSerializer; |
||||
import org.apache.dolphinscheduler.spi.alert.AlertResult; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.List; |
||||
|
||||
public class AlertSendResponseCommand implements Serializable { |
||||
|
||||
/** |
||||
* true:All alert are successful, |
||||
* false:As long as one alert fails |
||||
*/ |
||||
private boolean alertStatus; |
||||
|
||||
private List<AlertResult> alertResults; |
||||
|
||||
public boolean getAlertStatus() { |
||||
return alertStatus; |
||||
} |
||||
|
||||
public void setAlertStatus(boolean alertStatus) { |
||||
this.alertStatus = alertStatus; |
||||
} |
||||
|
||||
public List<AlertResult> getAlertResults() { |
||||
return alertResults; |
||||
} |
||||
|
||||
public void setAlertResults(List<AlertResult> alertResults) { |
||||
this.alertResults = alertResults; |
||||
} |
||||
|
||||
public AlertSendResponseCommand() { |
||||
|
||||
} |
||||
|
||||
public AlertSendResponseCommand(boolean alertStatus, List<AlertResult> alertResults) { |
||||
this.alertStatus = alertStatus; |
||||
this.alertResults = alertResults; |
||||
} |
||||
|
||||
/** |
||||
* package response command |
||||
* |
||||
* @param opaque request unique identification |
||||
* @return command |
||||
*/ |
||||
public Command convert2Command(long opaque) { |
||||
Command command = new Command(opaque); |
||||
command.setType(CommandType.ALERT_SEND_RESPONSE); |
||||
byte[] body = JsonSerializer.serialize(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
} |
@ -0,0 +1,96 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.service.alert; |
||||
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand; |
||||
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; |
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; |
||||
import org.apache.dolphinscheduler.remote.utils.Host; |
||||
import org.apache.dolphinscheduler.remote.utils.JsonSerializer; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
public class AlertClientService { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(AlertClientService.class); |
||||
|
||||
private final NettyClientConfig clientConfig; |
||||
|
||||
private final NettyRemotingClient client; |
||||
|
||||
private volatile boolean isRunning; |
||||
|
||||
/** |
||||
* request time out |
||||
*/ |
||||
private static final long ALERT_REQUEST_TIMEOUT = 10 * 1000L; |
||||
|
||||
/** |
||||
* alert client |
||||
*/ |
||||
public AlertClientService() { |
||||
this.clientConfig = new NettyClientConfig(); |
||||
this.client = new NettyRemotingClient(clientConfig); |
||||
this.isRunning = true; |
||||
} |
||||
|
||||
/** |
||||
* close |
||||
*/ |
||||
public void close() { |
||||
this.client.close(); |
||||
this.isRunning = false; |
||||
logger.info("alter client closed"); |
||||
} |
||||
|
||||
/** |
||||
* alert sync send data |
||||
* @param host host |
||||
* @param port port |
||||
* @param groupId groupId |
||||
* @param title title |
||||
* @param content content |
||||
* @return AlertSendResponseCommand |
||||
*/ |
||||
public AlertSendResponseCommand sendAlert(String host, int port, int groupId, String title, String content) { |
||||
logger.info("sync alert send, host : {}, port : {}, groupId : {}, title : {} ", host, port, groupId, title); |
||||
AlertSendRequestCommand request = new AlertSendRequestCommand(groupId, title, content); |
||||
final Host address = new Host(host, port); |
||||
try { |
||||
Command command = request.convert2Command(); |
||||
Command response = this.client.sendSync(address, command, ALERT_REQUEST_TIMEOUT); |
||||
if (response != null) { |
||||
AlertSendResponseCommand sendResponse = JsonSerializer.deserialize( |
||||
response.getBody(), AlertSendResponseCommand.class); |
||||
return sendResponse; |
||||
} |
||||
} catch (Exception e) { |
||||
logger.error("sync alert send error", e); |
||||
} finally { |
||||
this.client.closeChannel(address); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
public boolean isRunning() { |
||||
return isRunning; |
||||
} |
||||
} |
@ -0,0 +1,58 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.service.alert; |
||||
|
||||
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; |
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.Objects; |
||||
|
||||
/** |
||||
* alert client service test |
||||
*/ |
||||
public class AlertClientServiceTest { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(AlertClientServiceTest.class); |
||||
|
||||
|
||||
@Test |
||||
public void testSendAlert(){ |
||||
String host; |
||||
int port = 50501; |
||||
int groupId = 1; |
||||
String title = "test-title"; |
||||
String content = "test-content"; |
||||
AlertClientService alertClient = new AlertClientService(); |
||||
|
||||
// alter server does not exist
|
||||
host = "128.0.10.1"; |
||||
AlertSendResponseCommand alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content); |
||||
Assert.assertNull(alertSendResponseCommand); |
||||
|
||||
host = "127.0.0.1"; |
||||
AlertSendResponseCommand alertSendResponseCommand_1 = alertClient.sendAlert(host, port, groupId, title, content); |
||||
|
||||
if (Objects.nonNull(alertClient) && alertClient.isRunning()) { |
||||
alertClient.close(); |
||||
} |
||||
|
||||
} |
||||
} |
Loading…
Reference in new issue