|
|
|
@ -18,146 +18,172 @@
|
|
|
|
|
package org.apache.dolphinscheduler.plugin.alert.http; |
|
|
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.alert.api.AlertResult; |
|
|
|
|
import org.apache.dolphinscheduler.alert.api.HttpServiceRetryStrategy; |
|
|
|
|
import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaderContentType; |
|
|
|
|
import org.apache.dolphinscheduler.common.model.OkHttpRequestHeaders; |
|
|
|
|
import org.apache.dolphinscheduler.common.model.OkHttpResponse; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.OkHttpUtils; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
|
import org.apache.http.HttpEntity; |
|
|
|
|
import org.apache.http.client.config.RequestConfig; |
|
|
|
|
import org.apache.http.client.methods.CloseableHttpResponse; |
|
|
|
|
import org.apache.http.client.methods.HttpGet; |
|
|
|
|
import org.apache.http.client.methods.HttpPost; |
|
|
|
|
import org.apache.http.client.methods.HttpRequestBase; |
|
|
|
|
import org.apache.http.entity.StringEntity; |
|
|
|
|
import org.apache.http.impl.client.CloseableHttpClient; |
|
|
|
|
import org.apache.http.impl.client.HttpClients; |
|
|
|
|
import org.apache.http.util.EntityUtils; |
|
|
|
|
import org.apache.http.HttpStatus; |
|
|
|
|
|
|
|
|
|
import java.io.UnsupportedEncodingException; |
|
|
|
|
import java.net.MalformedURLException; |
|
|
|
|
import java.net.URI; |
|
|
|
|
import java.net.URISyntaxException; |
|
|
|
|
import java.net.URL; |
|
|
|
|
import java.net.URLEncoder; |
|
|
|
|
import java.nio.charset.StandardCharsets; |
|
|
|
|
import java.util.Collections; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.Map; |
|
|
|
|
|
|
|
|
|
import lombok.SneakyThrows; |
|
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.node.ObjectNode; |
|
|
|
|
|
|
|
|
|
@Slf4j |
|
|
|
|
public final class HttpSender { |
|
|
|
|
|
|
|
|
|
private static final String URL_SPLICE_CHAR = "?"; |
|
|
|
|
/** |
|
|
|
|
* request type post |
|
|
|
|
*/ |
|
|
|
|
private static final String REQUEST_TYPE_POST = "POST"; |
|
|
|
|
/** |
|
|
|
|
* request type get |
|
|
|
|
*/ |
|
|
|
|
private static final String REQUEST_TYPE_GET = "GET"; |
|
|
|
|
private final String headerParams; |
|
|
|
|
private final String bodyParams; |
|
|
|
|
private final String contentField; |
|
|
|
|
private final String requestType; |
|
|
|
|
private final int timeout; |
|
|
|
|
private Map<String, String> headerParams; |
|
|
|
|
private OkHttpRequestHeaderContentType contentType; |
|
|
|
|
private Map<String, String> bodyParams; |
|
|
|
|
private HttpRequestMethod requestType; |
|
|
|
|
private int timeout; |
|
|
|
|
private String url; |
|
|
|
|
private HttpRequestBase httpRequest; |
|
|
|
|
|
|
|
|
|
public HttpSender(Map<String, String> paramsMap) { |
|
|
|
|
paramsValidator(paramsMap); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void paramsValidator(Map<String, String> paramsMap) { |
|
|
|
|
url = paramsMap.get(HttpAlertConstants.NAME_URL); |
|
|
|
|
headerParams = paramsMap.get(HttpAlertConstants.NAME_HEADER_PARAMS); |
|
|
|
|
bodyParams = paramsMap.get(HttpAlertConstants.NAME_BODY_PARAMS); |
|
|
|
|
contentField = paramsMap.get(HttpAlertConstants.NAME_CONTENT_FIELD); |
|
|
|
|
requestType = paramsMap.get(HttpAlertConstants.NAME_REQUEST_TYPE); |
|
|
|
|
if (StringUtils.isBlank(url)) { |
|
|
|
|
throw new IllegalArgumentException("url can not be null"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
String headerParamsString = paramsMap.get(HttpAlertConstants.NAME_HEADER_PARAMS); |
|
|
|
|
if (StringUtils.isNotBlank(headerParamsString)) { |
|
|
|
|
headerParams = JSONUtils.toMap(headerParamsString); |
|
|
|
|
if (headerParams == null) { |
|
|
|
|
throw new IllegalArgumentException("headerParams is not a valid json"); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
headerParams = new HashMap<>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
String bodyParamsString = paramsMap.get(HttpAlertConstants.NAME_BODY_PARAMS); |
|
|
|
|
if (StringUtils.isNotBlank(bodyParamsString)) { |
|
|
|
|
bodyParams = JSONUtils.toMap(bodyParamsString); |
|
|
|
|
if (bodyParams == null) { |
|
|
|
|
throw new IllegalArgumentException("bodyParams is not a valid json"); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
bodyParams = new HashMap<>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
requestType = HttpRequestMethod.valueOf(paramsMap.get(HttpAlertConstants.NAME_REQUEST_TYPE)); |
|
|
|
|
} catch (IllegalArgumentException e) { |
|
|
|
|
throw new IllegalArgumentException("requestType is not a valid value"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
contentType = OkHttpRequestHeaderContentType.fromValue(paramsMap.get(HttpAlertConstants.NAME_CONTENT_TYPE)); |
|
|
|
|
if (contentType == null) { |
|
|
|
|
throw new IllegalArgumentException("contentType is not a valid value"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
timeout = StringUtils.isNotBlank(paramsMap.get(HttpAlertConstants.NAME_TIMEOUT)) |
|
|
|
|
? Integer.parseInt(paramsMap.get(HttpAlertConstants.NAME_TIMEOUT)) |
|
|
|
|
: HttpAlertConstants.DEFAULT_TIMEOUT; |
|
|
|
|
: HttpAlertConstants.DEFAULT_TIMEOUT * 1000; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public AlertResult send(String msg) { |
|
|
|
|
|
|
|
|
|
AlertResult alertResult = new AlertResult(); |
|
|
|
|
OkHttpResponse okHttpResponse; |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
createHttpRequest(msg); |
|
|
|
|
} catch (MalformedURLException e) { |
|
|
|
|
throw new RuntimeException(e); |
|
|
|
|
} catch (URISyntaxException e) { |
|
|
|
|
okHttpResponse = sendHttpRequest(msg); |
|
|
|
|
} catch (RuntimeException e) { |
|
|
|
|
throw new RuntimeException(e); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (httpRequest == null) { |
|
|
|
|
alertResult.setSuccess(false); |
|
|
|
|
alertResult.setMessage("Request types are not supported"); |
|
|
|
|
return alertResult; |
|
|
|
|
} |
|
|
|
|
validateResponse(okHttpResponse, alertResult); |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
String resp = this.getResponseString(httpRequest); |
|
|
|
|
alertResult.setSuccess(true); |
|
|
|
|
alertResult.setMessage(resp); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("send http alert msg exception : {}", e.getMessage()); |
|
|
|
|
return alertResult; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void validateResponse(OkHttpResponse okHttpResponse, AlertResult alertResult) { |
|
|
|
|
if (okHttpResponse.getStatusCode() != HttpStatus.SC_OK) { |
|
|
|
|
alertResult.setSuccess(false); |
|
|
|
|
alertResult.setMessage( |
|
|
|
|
String.format("Send http request alert failed: %s", e.getMessage())); |
|
|
|
|
alertResult |
|
|
|
|
.setMessage(String.format("send http alert failed, response body: %s", okHttpResponse.getBody())); |
|
|
|
|
} else { |
|
|
|
|
alertResult.setSuccess(true); |
|
|
|
|
alertResult |
|
|
|
|
.setMessage(String.format("send http alert success, response body: %s", okHttpResponse.getBody())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return alertResult; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public String getResponseString(HttpRequestBase httpRequest) throws Exception { |
|
|
|
|
private OkHttpResponse sendHttpRequest(String msg) throws RuntimeException { |
|
|
|
|
switch (requestType) { |
|
|
|
|
case POST: |
|
|
|
|
setMsgInHeader(msg); |
|
|
|
|
setMsgInRequestBody(msg); |
|
|
|
|
return sendPostRequest(); |
|
|
|
|
case GET: |
|
|
|
|
setMsgInUrl(msg); |
|
|
|
|
setMsgInHeader(msg); |
|
|
|
|
return sendGetRequest(); |
|
|
|
|
case PUT: |
|
|
|
|
setMsgInHeader(msg); |
|
|
|
|
setMsgInRequestBody(msg); |
|
|
|
|
return sendPutRequest(); |
|
|
|
|
default: |
|
|
|
|
throw new RuntimeException(String.format("http request method %s not supported", |
|
|
|
|
requestType)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
RequestConfig requestConfig = RequestConfig.custom() |
|
|
|
|
.setConnectTimeout(timeout * 1000) |
|
|
|
|
.setConnectionRequestTimeout(timeout * 1000) |
|
|
|
|
.setSocketTimeout(timeout * 1000) |
|
|
|
|
.build(); |
|
|
|
|
CloseableHttpClient httpClient = HttpClients.custom() |
|
|
|
|
.setDefaultRequestConfig(requestConfig) |
|
|
|
|
.setRetryHandler(HttpServiceRetryStrategy.retryStrategy).build(); |
|
|
|
|
@SneakyThrows |
|
|
|
|
private OkHttpResponse sendGetRequest() { |
|
|
|
|
OkHttpRequestHeaders okHttpRequestHeaders = new OkHttpRequestHeaders(); |
|
|
|
|
okHttpRequestHeaders.setHeaders(headerParams); |
|
|
|
|
okHttpRequestHeaders.setOkHttpRequestHeaderContentType(contentType); |
|
|
|
|
Map<String, Object> requestParams = new HashMap<>(); |
|
|
|
|
log.info("sending http alert get request, url: {}, header: {}, requestParams: {}, contentType: {}", |
|
|
|
|
url, headerParams, requestParams, contentType.getValue()); |
|
|
|
|
return OkHttpUtils.get(url, okHttpRequestHeaders, |
|
|
|
|
requestParams, timeout, timeout, timeout); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
CloseableHttpResponse response = httpClient.execute(httpRequest); |
|
|
|
|
HttpEntity entity = response.getEntity(); |
|
|
|
|
return EntityUtils.toString(entity, StandardCharsets.UTF_8); |
|
|
|
|
@SneakyThrows |
|
|
|
|
private OkHttpResponse sendPostRequest() { |
|
|
|
|
OkHttpRequestHeaders okHttpRequestHeaders = new OkHttpRequestHeaders(); |
|
|
|
|
okHttpRequestHeaders.setHeaders(headerParams); |
|
|
|
|
okHttpRequestHeaders.setOkHttpRequestHeaderContentType(contentType); |
|
|
|
|
Map<String, Object> requestBody = Collections.unmodifiableMap(bodyParams); |
|
|
|
|
log.info("sending http alert post request, url: {}, header: {}, requestBody: {}, contentType: {}", |
|
|
|
|
url, headerParams, requestBody, contentType.getValue()); |
|
|
|
|
return OkHttpUtils.post(url, okHttpRequestHeaders, null, |
|
|
|
|
requestBody, timeout, timeout, timeout); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void createHttpRequest(String msg) throws MalformedURLException, URISyntaxException { |
|
|
|
|
if (REQUEST_TYPE_POST.equalsIgnoreCase(requestType)) { |
|
|
|
|
httpRequest = new HttpPost(url); |
|
|
|
|
setHeader(); |
|
|
|
|
// POST request add param in request body
|
|
|
|
|
setMsgInRequestBody(msg); |
|
|
|
|
} else if (REQUEST_TYPE_GET.equalsIgnoreCase(requestType)) { |
|
|
|
|
// GET request add param in url
|
|
|
|
|
setMsgInUrl(msg); |
|
|
|
|
URL unencodeUrl = new URL(url); |
|
|
|
|
URI uri = new URI(unencodeUrl.getProtocol(), unencodeUrl.getAuthority(), unencodeUrl.getPath(), |
|
|
|
|
unencodeUrl.getQuery(), null); |
|
|
|
|
|
|
|
|
|
httpRequest = new HttpGet(uri); |
|
|
|
|
setHeader(); |
|
|
|
|
} |
|
|
|
|
@SneakyThrows |
|
|
|
|
private OkHttpResponse sendPutRequest() { |
|
|
|
|
OkHttpRequestHeaders okHttpRequestHeaders = new OkHttpRequestHeaders(); |
|
|
|
|
okHttpRequestHeaders.setHeaders(headerParams); |
|
|
|
|
okHttpRequestHeaders.setOkHttpRequestHeaderContentType(contentType); |
|
|
|
|
Map<String, Object> requestBody = Collections.unmodifiableMap(bodyParams); |
|
|
|
|
log.info("sending http alert put request, url: {}, header: {}, requestBody: {}, contentType: {}", |
|
|
|
|
url, headerParams, requestBody, contentType.getValue()); |
|
|
|
|
return OkHttpUtils.put(url, okHttpRequestHeaders, |
|
|
|
|
requestBody, timeout, timeout, timeout); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* add msg param in url |
|
|
|
|
*/ |
|
|
|
|
private void setMsgInUrl(String msg) { |
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotBlank(contentField)) { |
|
|
|
|
String type = "&"; |
|
|
|
|
// check splice char is & or ?
|
|
|
|
|
if (!url.contains(URL_SPLICE_CHAR)) { |
|
|
|
|
type = URL_SPLICE_CHAR; |
|
|
|
|
} |
|
|
|
|
if (url.contains(HttpAlertConstants.MSG_PARAMS)) { |
|
|
|
|
try { |
|
|
|
|
url = String.format("%s%s%s=%s", url, type, contentField, |
|
|
|
|
url = url.replace(HttpAlertConstants.MSG_PARAMS, |
|
|
|
|
URLEncoder.encode(msg, StandardCharsets.UTF_8.name())); |
|
|
|
|
} catch (UnsupportedEncodingException e) { |
|
|
|
|
throw new RuntimeException(e); |
|
|
|
@ -168,37 +194,31 @@ public final class HttpSender {
|
|
|
|
|
/** |
|
|
|
|
* set header params |
|
|
|
|
*/ |
|
|
|
|
private void setHeader() { |
|
|
|
|
|
|
|
|
|
if (httpRequest == null) { |
|
|
|
|
private void setMsgInHeader(String msg) { |
|
|
|
|
if (msg == null) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
HashMap<String, Object> map = JSONUtils.parseObject(headerParams, HashMap.class); |
|
|
|
|
for (Map.Entry<String, Object> entry : map.entrySet()) { |
|
|
|
|
httpRequest.setHeader(entry.getKey(), String.valueOf(entry.getValue())); |
|
|
|
|
} |
|
|
|
|
headerParams.forEach((key, value) -> { |
|
|
|
|
if (value.contains(HttpAlertConstants.MSG_PARAMS)) { |
|
|
|
|
headerParams.put(key, value.replace(HttpAlertConstants.MSG_PARAMS, msg)); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* set body params |
|
|
|
|
*/ |
|
|
|
|
private void setMsgInRequestBody(String msg) { |
|
|
|
|
try { |
|
|
|
|
ObjectNode objectNode = JSONUtils.createObjectNode(); |
|
|
|
|
if (StringUtils.isNotBlank(bodyParams)) { |
|
|
|
|
objectNode = JSONUtils.parseObject(bodyParams); |
|
|
|
|
} |
|
|
|
|
// set msg content field
|
|
|
|
|
objectNode.put(contentField, msg); |
|
|
|
|
StringEntity entity = new StringEntity(JSONUtils.toJsonString(objectNode), StandardCharsets.UTF_8); |
|
|
|
|
((HttpPost) httpRequest).setEntity(entity); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
log.error("send http alert msg exception : {}", e.getMessage()); |
|
|
|
|
if (bodyParams == null) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public String getRequestUrl() { |
|
|
|
|
return httpRequest.getURI().toString(); |
|
|
|
|
bodyParams.forEach((key, value) -> { |
|
|
|
|
if (value.contains(HttpAlertConstants.MSG_PARAMS)) { |
|
|
|
|
bodyParams.put(key, value.replace(HttpAlertConstants.MSG_PARAMS, msg)); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|