xiangzihao
6 months ago
committed by
GitHub
48 changed files with 38 additions and 1211 deletions
@ -1,22 +0,0 @@
|
||||
# Pigeon |
||||
|
||||
## Overview |
||||
|
||||
Pigeon is a task used to trigger remote tasks, acquire logs or status by calling remote WebSocket service. It is DolphinScheduler uses a remote WebSocket service to call tasks. |
||||
|
||||
## Create Task |
||||
|
||||
- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page. |
||||
- Drag from the toolbar <img src="../../../../img/pigeon.png" width="20"/> to the canvas to create a new Pigeon task. |
||||
|
||||
## Task Parameters |
||||
|
||||
[//]: # (TODO: use the commented anchor below once our website template supports this syntax) |
||||
[//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md#default-task-parameters) `Default Task Parameters` section for default parameters.) |
||||
|
||||
- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters. |
||||
|
||||
| **Parameter** | **Description** | |
||||
|------------------|---------------------------------------| |
||||
| Target task name | Target task name of this Pigeon node. | |
||||
|
@ -1,19 +0,0 @@
|
||||
# Pigeon |
||||
|
||||
Pigeon任务类型是通过调用远程websocket服务,实现远程任务的触发,状态、日志的获取,是 DolphinScheduler 通用远程 websocket 服务调用任务 |
||||
|
||||
## 创建任务 |
||||
|
||||
拖动工具栏中的<img src="../../../../img/pigeon.png" width="20"/>任务节点到画板中即能完成任务创建 |
||||
|
||||
## 任务参数 |
||||
|
||||
[//]: # (TODO: use the commented anchor below once our website template supports this syntax) |
||||
[//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。) |
||||
|
||||
- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。 |
||||
|
||||
| **任务参数** | **描述** | |
||||
|----------|-------------------| |
||||
| 目标任务名 | 输入Pigeon任务的目标任务名称 | |
||||
|
Before Width: | Height: | Size: 1.2 KiB |
@ -1,22 +0,0 @@
|
||||
Copyright (c) 2010-2020 Nathan Rajlich |
||||
|
||||
Permission is hereby granted, free of charge, to any person |
||||
obtaining a copy of this software and associated documentation |
||||
files (the "Software"), to deal in the Software without |
||||
restriction, including without limitation the rights to use, |
||||
copy, modify, merge, publish, distribute, sublicense, and/or sell |
||||
copies of the Software, and to permit persons to whom the |
||||
Software is furnished to do so, subject to the following |
||||
conditions: |
||||
|
||||
The above copyright notice and this permission notice shall be |
||||
included in all copies or substantial portions of the Software. |
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES |
||||
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
||||
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT |
||||
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
||||
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
||||
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR |
||||
OTHER DEALINGS IN THE SOFTWARE. |
@ -1,85 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
|
||||
<modelVersion>4.0.0</modelVersion> |
||||
<parent> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-task-plugin</artifactId> |
||||
<version>dev-SNAPSHOT</version> |
||||
</parent> |
||||
|
||||
<artifactId>dolphinscheduler-task-pigeon</artifactId> |
||||
<packaging>jar</packaging> |
||||
<dependencies> |
||||
|
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-task-api</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-spi</artifactId> |
||||
<scope>provided</scope> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.commons</groupId> |
||||
<artifactId>commons-collections4</artifactId> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.slf4j</groupId> |
||||
<artifactId>slf4j-api</artifactId> |
||||
</dependency> |
||||
|
||||
<!--https://github.com/dreamhead/moco/blob/master/moco-doc/usage.md#socket--> |
||||
<dependency> |
||||
<groupId>com.github.dreamhead</groupId> |
||||
<artifactId>moco-core</artifactId> |
||||
<version>1.2.0</version> |
||||
<scope>test</scope> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>com.github.dreamhead</groupId> |
||||
<artifactId>moco-runner</artifactId> |
||||
<version>1.2.0</version> |
||||
<scope>test</scope> |
||||
<exclusions> |
||||
<exclusion> |
||||
<groupId>commons-cli</groupId> |
||||
<artifactId>commons-cli</artifactId> |
||||
</exclusion> |
||||
</exclusions> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>org.java-websocket</groupId> |
||||
<artifactId>Java-WebSocket</artifactId> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>org.apache.httpcomponents</groupId> |
||||
<artifactId>httpclient</artifactId> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.httpcomponents</groupId> |
||||
<artifactId>httpcore</artifactId> |
||||
</dependency> |
||||
</dependencies> |
||||
</project> |
@ -1,86 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.pigeon; |
||||
|
||||
import org.apache.commons.lang3.StringUtils; |
||||
|
||||
import java.util.ResourceBundle; |
||||
|
||||
public class PigeonConfig { |
||||
|
||||
private static PigeonConfig cfg; |
||||
|
||||
private final String jobTriggerUrl; |
||||
private final String jobTriggerPostBody; |
||||
private final String jobStatusUrl; |
||||
private final String jobStatusPostBody; |
||||
|
||||
private final String jobLogsFetchUrl; |
||||
private final String jobCancelPostBody; |
||||
|
||||
public static synchronized PigeonConfig getInstance() { |
||||
if (cfg == null) { |
||||
cfg = new PigeonConfig(); |
||||
} |
||||
return cfg; |
||||
} |
||||
|
||||
private PigeonConfig() { |
||||
ResourceBundle bundle = |
||||
ResourceBundle.getBundle(PigeonConfig.class.getPackage().getName().replace(".", "/") + "/config"); |
||||
this.jobTriggerUrl = bundle.getString("job.trigger.url"); |
||||
this.jobStatusUrl = bundle.getString("job.status.url"); |
||||
this.jobTriggerPostBody = bundle.getString("job.trigger.post.body"); |
||||
this.jobStatusPostBody = bundle.getString("job.status.post.body"); |
||||
this.jobLogsFetchUrl = bundle.getString("job.logs.fetch.url"); |
||||
this.jobCancelPostBody = bundle.getString("job.cancel.post.body"); |
||||
} |
||||
|
||||
public String getJobCancelPostBody(int taskId) { |
||||
return String.format(jobCancelPostBody, taskId); |
||||
} |
||||
|
||||
public String getJobTriggerUrl(String tisHost) { |
||||
checkHost(tisHost); |
||||
return String.format(this.jobTriggerUrl, tisHost); |
||||
} |
||||
|
||||
public String getJobTriggerPostBody() { |
||||
return jobTriggerPostBody; |
||||
} |
||||
|
||||
public String getJobStatusPostBody(int taskId) { |
||||
return String.format(jobStatusPostBody, taskId); |
||||
} |
||||
|
||||
public String getJobLogsFetchUrl(String host, String jobName, int taskId) { |
||||
checkHost(host); |
||||
return String.format(jobLogsFetchUrl, host, jobName, taskId); |
||||
} |
||||
|
||||
public String getJobStatusUrl(String tisHost) { |
||||
checkHost(tisHost); |
||||
return String.format(this.jobStatusUrl, tisHost); |
||||
} |
||||
|
||||
private static void checkHost(String tisHost) { |
||||
if (StringUtils.isBlank(tisHost)) { |
||||
throw new IllegalArgumentException("param tisHost can not be null"); |
||||
} |
||||
} |
||||
} |
@ -1,62 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.pigeon; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; |
||||
|
||||
import org.apache.commons.lang3.StringUtils; |
||||
|
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
/** |
||||
* TIS parameter |
||||
*/ |
||||
@Slf4j |
||||
public class PigeonParameters extends AbstractParameters { |
||||
|
||||
/** |
||||
* Pigeon target job name |
||||
*/ |
||||
private String targetJobName; |
||||
|
||||
public String getTargetJobName() { |
||||
return targetJobName; |
||||
} |
||||
|
||||
public void setTargetJobName(String targetJobName) { |
||||
this.targetJobName = targetJobName; |
||||
} |
||||
|
||||
@Override |
||||
public boolean checkParameters() { |
||||
if (StringUtils.isBlank(this.targetJobName)) { |
||||
log.error("checkParameters faild targetJobName can not be null"); |
||||
return false; |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public List<ResourceInfo> getResourceFilesList() { |
||||
return Collections.emptyList(); |
||||
} |
||||
} |
@ -1,27 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.pigeon; |
||||
|
||||
public class PigeonParamsConstants { |
||||
|
||||
public static String NAME_TARGET_JOB_NAME = "targetJobName"; |
||||
public static String TARGET_JOB_NAME = NAME_TARGET_JOB_NAME; |
||||
|
||||
private PigeonParamsConstants() { |
||||
} |
||||
} |
@ -1,430 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.pigeon; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskException; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; |
||||
|
||||
import org.apache.commons.collections4.CollectionUtils; |
||||
import org.apache.commons.lang3.StringUtils; |
||||
import org.apache.http.HttpEntity; |
||||
import org.apache.http.StatusLine; |
||||
import org.apache.http.client.ClientProtocolException; |
||||
import org.apache.http.client.methods.CloseableHttpResponse; |
||||
import org.apache.http.client.methods.HttpPost; |
||||
import org.apache.http.entity.StringEntity; |
||||
import org.apache.http.impl.client.CloseableHttpClient; |
||||
import org.apache.http.impl.client.HttpClients; |
||||
import org.apache.http.util.EntityUtils; |
||||
|
||||
import java.net.HttpURLConnection; |
||||
import java.net.URI; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Objects; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.java_websocket.client.WebSocketClient; |
||||
import org.java_websocket.handshake.ServerHandshake; |
||||
|
||||
/** |
||||
* TIS DataX Task |
||||
**/ |
||||
@Slf4j |
||||
public class PigeonTask extends AbstractRemoteTask { |
||||
|
||||
public static final String KEY_POOL_VAR_PIGEON_HOST = "p_host"; |
||||
private final TaskExecutionContext taskExecutionContext; |
||||
|
||||
private PigeonParameters parameters; |
||||
private BizResult triggerResult; |
||||
private final PigeonConfig config; |
||||
|
||||
public PigeonTask(TaskExecutionContext taskExecutionContext) { |
||||
super(taskExecutionContext); |
||||
this.taskExecutionContext = taskExecutionContext; |
||||
this.config = PigeonConfig.getInstance(); |
||||
} |
||||
|
||||
@Override |
||||
public List<String> getApplicationIds() throws TaskException { |
||||
return Collections.emptyList(); |
||||
} |
||||
|
||||
@Override |
||||
public void init() throws TaskException { |
||||
super.init(); |
||||
parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PigeonParameters.class); |
||||
log.info("Initialize PIGEON task params {}", JSONUtils.toPrettyJsonString(parameters)); |
||||
if (parameters == null || !parameters.checkParameters()) { |
||||
throw new TaskException("datax task params is not valid"); |
||||
} |
||||
} |
||||
|
||||
// todo split handle to submit and track
|
||||
@Override |
||||
public void handle(TaskCallBack taskCallBack) throws TaskException { |
||||
// Trigger PIGEON DataX pipeline
|
||||
log.info("start execute PIGEON task"); |
||||
long startTime = System.currentTimeMillis(); |
||||
String targetJobName = this.parameters.getTargetJobName(); |
||||
String host = getHost(); |
||||
try { |
||||
final String triggerUrl = getTriggerUrl(); |
||||
final String getStatusUrl = config.getJobStatusUrl(host); |
||||
HttpPost post = new HttpPost(triggerUrl); |
||||
post.addHeader("appname", targetJobName); |
||||
addFormUrlencoded(post); |
||||
StringEntity entity = new StringEntity(config.getJobTriggerPostBody(), StandardCharsets.UTF_8); |
||||
post.setEntity(entity); |
||||
ExecResult execState = null; |
||||
int taskId; |
||||
WebSocketClient webSocket = null; |
||||
try ( |
||||
CloseableHttpClient client = HttpClients.createDefault(); |
||||
// trigger to start PIGEON dataX task
|
||||
CloseableHttpResponse response = client.execute(post)) { |
||||
triggerResult = processResponse(triggerUrl, response, BizResult.class); |
||||
if (!triggerResult.isSuccess()) { |
||||
List<String> errormsg = triggerResult.getErrormsg(); |
||||
StringBuffer errs = new StringBuffer(); |
||||
if (CollectionUtils.isNotEmpty(errormsg)) { |
||||
errs.append(",errs:").append(errormsg.stream().collect(Collectors.joining(","))); |
||||
} |
||||
throw new Exception("trigger PIGEON job faild taskName:" + targetJobName + errs.toString()); |
||||
} |
||||
taskId = triggerResult.getBizresult().getTaskid(); |
||||
|
||||
webSocket = receiveRealtimeLog(host, targetJobName, taskId); |
||||
|
||||
setAppIds(String.valueOf(taskId)); |
||||
|
||||
CloseableHttpResponse status = null; |
||||
|
||||
while (true) { |
||||
try { |
||||
post = new HttpPost(getStatusUrl); |
||||
entity = new StringEntity("{\n taskid: " + taskId + "\n, log: false }", StandardCharsets.UTF_8); |
||||
post.setEntity(entity); |
||||
status = client.execute(post); |
||||
StatusResult execStatus = processResponse(getStatusUrl, status, StatusResult.class); |
||||
Map bizresult = execStatus.getBizresult(); |
||||
Map s = (Map) bizresult.get("status"); |
||||
execState = ExecResult.parse((Integer) s.get("state")); |
||||
if (execState == ExecResult.SUCCESS || execState == ExecResult.FAILD) { |
||||
break; |
||||
} |
||||
Thread.sleep(3000); |
||||
} finally { |
||||
status.close(); |
||||
} |
||||
} |
||||
} finally { |
||||
if (webSocket != null) { |
||||
Thread.sleep(4000); |
||||
try { |
||||
webSocket.close(); |
||||
} catch (Throwable e) { |
||||
log.warn(e.getMessage(), e); |
||||
} |
||||
} |
||||
} |
||||
|
||||
long costTime = System.currentTimeMillis() - startTime; |
||||
log.info("PIGEON task: {},taskId:{} costTime : {} milliseconds, statusCode : {}", |
||||
targetJobName, taskId, costTime, (execState == ExecResult.SUCCESS) ? "'success'" : "'failure'"); |
||||
setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS |
||||
: TaskConstants.EXIT_CODE_FAILURE); |
||||
} catch (Exception e) { |
||||
log.error("execute PIGEON dataX faild,PIGEON task name:" + targetJobName, e); |
||||
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); |
||||
if (e instanceof InterruptedException) { |
||||
Thread.currentThread().interrupt(); |
||||
} |
||||
throw new TaskException("Execute pigeon task failed", e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void submitApplication() throws TaskException { |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public void trackApplicationStatus() throws TaskException { |
||||
|
||||
} |
||||
|
||||
private void addFormUrlencoded(HttpPost post) { |
||||
post.addHeader("content-type", "application/x-www-form-urlencoded"); |
||||
} |
||||
|
||||
@Override |
||||
public void cancelApplication() throws TaskException { |
||||
log.info("start to cancelApplication"); |
||||
Objects.requireNonNull(triggerResult, "triggerResult can not be null"); |
||||
log.info("start to cancelApplication taskId:{}", triggerResult.getTaskId()); |
||||
final String triggerUrl = getTriggerUrl(); |
||||
|
||||
StringEntity entity = |
||||
new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8); |
||||
|
||||
CancelResult cancelResult = null; |
||||
HttpPost post = new HttpPost(triggerUrl); |
||||
addFormUrlencoded(post); |
||||
post.setEntity(entity); |
||||
try ( |
||||
CloseableHttpClient client = HttpClients.createDefault(); |
||||
// trigger to start TIS dataX task
|
||||
CloseableHttpResponse response = client.execute(post)) { |
||||
cancelResult = processResponse(triggerUrl, response, CancelResult.class); |
||||
if (!cancelResult.isSuccess()) { |
||||
List<String> errormsg = triggerResult.getErrormsg(); |
||||
StringBuffer errs = new StringBuffer(); |
||||
if (CollectionUtils.isNotEmpty(errormsg)) { |
||||
errs.append(",errs:").append(errormsg.stream().collect(Collectors.joining(","))); |
||||
} |
||||
throw new TaskException("cancel PIGEON job faild taskId:" + triggerResult.getTaskId() + errs); |
||||
} |
||||
} catch (ClientProtocolException e) { |
||||
throw new TaskException("client protocol error", e); |
||||
} catch (Exception e) { |
||||
throw new TaskException("pigeon execute error", e); |
||||
} |
||||
} |
||||
|
||||
private String getTriggerUrl() { |
||||
final String tisHost = getHost(); |
||||
return config.getJobTriggerUrl(tisHost); |
||||
} |
||||
|
||||
private String getHost() { |
||||
final String host = taskExecutionContext.getDefinedParams().get(KEY_POOL_VAR_PIGEON_HOST); |
||||
if (StringUtils.isEmpty(host)) { |
||||
throw new IllegalStateException("global var '" + KEY_POOL_VAR_PIGEON_HOST + "' can not be empty"); |
||||
} |
||||
return host; |
||||
} |
||||
|
||||
private WebSocketClient receiveRealtimeLog(final String tisHost, String dataXName, int taskId) throws Exception { |
||||
final String applyURI = config.getJobLogsFetchUrl(tisHost, dataXName, taskId); |
||||
log.info("apply ws connection,uri:{}", applyURI); |
||||
WebSocketClient webSocketClient = new WebSocketClient(new URI(applyURI)) { |
||||
|
||||
@Override |
||||
public void onOpen(ServerHandshake handshakedata) { |
||||
log.info("start to receive remote execute log"); |
||||
} |
||||
|
||||
@Override |
||||
public void onMessage(String message) { |
||||
ExecLog execLog = JSONUtils.parseObject(message, ExecLog.class); |
||||
log.info(execLog.getMsg()); |
||||
} |
||||
|
||||
@Override |
||||
public void onClose(int code, String reason, boolean remote) { |
||||
log.info("stop to receive remote log,reason:{},taskId:{}", reason, taskId); |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Exception t) { |
||||
log.error(t.getMessage(), t); |
||||
} |
||||
}; |
||||
webSocketClient.connect(); |
||||
return webSocketClient; |
||||
} |
||||
|
||||
private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response, |
||||
Class<T> clazz) throws Exception { |
||||
StatusLine resStatus = response.getStatusLine(); |
||||
if (HttpURLConnection.HTTP_OK != resStatus.getStatusCode()) { |
||||
throw new IllegalStateException("request server " + applyUrl + " faild:" + resStatus.getReasonPhrase()); |
||||
} |
||||
HttpEntity entity = response.getEntity(); |
||||
String resp = EntityUtils.toString(entity, StandardCharsets.UTF_8); |
||||
T result = JSONUtils.parseObject(resp, clazz); |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public AbstractParameters getParameters() { |
||||
Objects.requireNonNull(this.parameters, "tisParameters can not be null"); |
||||
return this.parameters; |
||||
} |
||||
|
||||
private static class CancelResult extends AjaxResult<Object> { |
||||
|
||||
private Object bizresult; |
||||
|
||||
@Override |
||||
public Object getBizresult() { |
||||
return this.bizresult; |
||||
} |
||||
|
||||
public void setBizresult(Object bizresult) { |
||||
this.bizresult = bizresult; |
||||
} |
||||
} |
||||
|
||||
private static class BizResult extends AjaxResult<TriggerBuildResult> { |
||||
|
||||
private TriggerBuildResult bizresult; |
||||
|
||||
@Override |
||||
public TriggerBuildResult getBizresult() { |
||||
return this.bizresult; |
||||
} |
||||
|
||||
public int getTaskId() { |
||||
return bizresult.taskid; |
||||
} |
||||
|
||||
public void setBizresult(TriggerBuildResult bizresult) { |
||||
this.bizresult = bizresult; |
||||
} |
||||
} |
||||
|
||||
private static class StatusResult extends AjaxResult<Map> { |
||||
|
||||
private Map bizresult; |
||||
|
||||
@Override |
||||
public Map getBizresult() { |
||||
return this.bizresult; |
||||
} |
||||
|
||||
public void setBizresult(Map bizresult) { |
||||
this.bizresult = bizresult; |
||||
} |
||||
} |
||||
|
||||
private abstract static class AjaxResult<T> { |
||||
|
||||
private boolean success; |
||||
|
||||
private List<String> errormsg; |
||||
|
||||
private List<String> msg; |
||||
|
||||
public abstract T getBizresult(); |
||||
|
||||
public boolean isSuccess() { |
||||
return success; |
||||
} |
||||
|
||||
public void setSuccess(boolean success) { |
||||
this.success = success; |
||||
} |
||||
|
||||
public List<String> getErrormsg() { |
||||
return this.errormsg; |
||||
} |
||||
|
||||
public void setErrormsg(List<String> errormsg) { |
||||
this.errormsg = errormsg; |
||||
} |
||||
|
||||
public List<String> getMsg() { |
||||
return this.msg; |
||||
} |
||||
|
||||
public void setMsg(List<String> msg) { |
||||
this.msg = msg; |
||||
} |
||||
|
||||
} |
||||
|
||||
private static class TriggerBuildResult { |
||||
|
||||
private int taskid; |
||||
|
||||
public int getTaskid() { |
||||
return taskid; |
||||
} |
||||
|
||||
public void setTaskid(int taskid) { |
||||
this.taskid = taskid; |
||||
} |
||||
} |
||||
|
||||
private enum ExecResult { |
||||
|
||||
SUCCESS(1), FAILD(-1), DOING(2), ASYN_DOING(22), CANCEL(3); |
||||
|
||||
private final int value; |
||||
|
||||
public static ExecResult parse(int value) { |
||||
for (ExecResult r : values()) { |
||||
if (r.value == value) { |
||||
return r; |
||||
} |
||||
} |
||||
throw new IllegalStateException("vale:" + value + " is illegal"); |
||||
} |
||||
|
||||
private ExecResult(int value) { |
||||
this.value = value; |
||||
} |
||||
|
||||
public int getValue() { |
||||
return this.value; |
||||
} |
||||
} |
||||
|
||||
private static class ExecLog { |
||||
|
||||
private String logType; |
||||
private String msg; |
||||
private int taskId; |
||||
|
||||
public String getLogType() { |
||||
return logType; |
||||
} |
||||
|
||||
public void setLogType(String logType) { |
||||
this.logType = logType; |
||||
} |
||||
|
||||
public String getMsg() { |
||||
return msg; |
||||
} |
||||
|
||||
public void setMsg(String msg) { |
||||
this.msg = msg; |
||||
} |
||||
|
||||
public int getTaskId() { |
||||
return taskId; |
||||
} |
||||
|
||||
public void setTaskId(int taskId) { |
||||
this.taskId = taskId; |
||||
} |
||||
} |
||||
} |
@ -1,41 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.pigeon; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; |
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
@Slf4j |
||||
public class PigeonTaskChannel implements TaskChannel { |
||||
|
||||
@Override |
||||
public AbstractTask createTask(TaskExecutionContext taskRequest) { |
||||
return new PigeonTask(taskRequest); |
||||
} |
||||
|
||||
@Override |
||||
public AbstractParameters parseParameters(String taskParams) { |
||||
return JSONUtils.parseObject(taskParams, PigeonParameters.class); |
||||
} |
||||
|
||||
} |
@ -1,38 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.pigeon; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; |
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
|
||||
@AutoService(TaskChannelFactory.class) |
||||
public class PigeonTaskChannelFactory implements TaskChannelFactory { |
||||
|
||||
@Override |
||||
public TaskChannel create() { |
||||
return new PigeonTaskChannel(); |
||||
} |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return "PIGEON"; |
||||
} |
||||
|
||||
} |
@ -1,26 +0,0 @@
|
||||
# |
||||
# Licensed to the Apache Software Foundation (ASF) under one or more |
||||
# contributor license agreements. See the NOTICE file distributed with |
||||
# this work for additional information regarding copyright ownership. |
||||
# The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
# (the "License"); you may not use this file except in compliance with |
||||
# the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
# |
||||
|
||||
job.trigger.url=http://%s/tjs/coredefine/coredefine.ajax |
||||
job.trigger.post.body=action=datax_action&emethod=trigger_fullbuild_task |
||||
|
||||
job.cancel.post.body=action=core_action&event_submit_do_cancel_task=y&taskid=%s |
||||
|
||||
job.status.url=http://%s/tjs/config/config.ajax?action=collection_action&emethod=get_task_status |
||||
job.status.post.body={\n taskid: %s\n, log: false } |
||||
|
||||
job.logs.fetch.url=ws://%s/tjs/download/logfeedback?logtype=full&collection=%s&taskid=%s |
@ -1,142 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.pigeon; |
||||
|
||||
import static com.github.dreamhead.moco.Moco.file; |
||||
import static com.github.dreamhead.moco.MocoJsonRunner.jsonHttpServer; |
||||
import static com.github.dreamhead.moco.Runner.running; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; |
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; |
||||
|
||||
import org.apache.commons.io.IOUtils; |
||||
|
||||
import java.io.InputStream; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.Collections; |
||||
import java.util.Map; |
||||
import java.util.Objects; |
||||
import java.util.UUID; |
||||
|
||||
import org.junit.jupiter.api.Assertions; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.mockito.Mockito; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import com.github.dreamhead.moco.HttpServer; |
||||
|
||||
public class PigeonTaskTest { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PigeonTaskTest.class); |
||||
private PigeonTask pigeonTask; |
||||
|
||||
private TaskExecutionContext taskExecutionContext; |
||||
|
||||
@BeforeEach |
||||
public void before() throws Exception { |
||||
|
||||
String taskParams = "{\"targetJobName\":\"mysql_elastic\"}"; |
||||
|
||||
taskExecutionContext = Mockito.mock(TaskExecutionContext.class); |
||||
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(taskParams); |
||||
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); |
||||
Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(UUID.randomUUID().toString()); |
||||
Mockito.when(taskExecutionContext.getTenantCode()).thenReturn("root"); |
||||
Mockito.when(taskExecutionContext.getStartTime()).thenReturn(System.currentTimeMillis()); |
||||
Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000); |
||||
Mockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx"); |
||||
// Mockito.when(taskExecutionContext.getVarPool())
|
||||
// .thenReturn("[{\"direct\":\"IN\",\"prop\":\"" + TISTask.KEY_POOL_VAR_TIS_HOST +
|
||||
// "\",\"type\":\"VARCHAR\",\"value\":\"127.0.0.1:8080\"}]");
|
||||
Map<String, String> gloabParams = |
||||
Collections.singletonMap(PigeonTask.KEY_POOL_VAR_PIGEON_HOST, "127.0.0.1:8080"); |
||||
Mockito.when(taskExecutionContext.getDefinedParams()).thenReturn(gloabParams); |
||||
|
||||
pigeonTask = new PigeonTask(taskExecutionContext); |
||||
pigeonTask.init(); |
||||
|
||||
} |
||||
|
||||
@Test |
||||
public void testGetTISConfigParams() { |
||||
PigeonConfig cfg = PigeonConfig.getInstance(); |
||||
String tisHost = "127.0.0.1:8080"; |
||||
Assertions.assertEquals("http://127.0.0.1:8080/tjs/coredefine/coredefine.ajax", cfg.getJobTriggerUrl(tisHost)); |
||||
String jobName = "mysql_elastic"; |
||||
int taskId = 123; |
||||
Assertions.assertEquals( |
||||
"ws://" + tisHost + "/tjs/download/logfeedback?logtype=full&collection=mysql_elastic&taskid=" + taskId, |
||||
cfg.getJobLogsFetchUrl(tisHost, jobName, taskId)); |
||||
|
||||
Assertions.assertEquals("action=datax_action&emethod=trigger_fullbuild_task", cfg.getJobTriggerPostBody()); |
||||
|
||||
Assertions.assertEquals( |
||||
"http://127.0.0.1:8080/tjs/config/config.ajax?action=collection_action&emethod=get_task_status", |
||||
cfg.getJobStatusUrl(tisHost)); |
||||
|
||||
Assertions.assertEquals("{\n taskid: " + taskId + "\n, log: false }", cfg.getJobStatusPostBody(taskId)); |
||||
|
||||
Assertions.assertEquals("action=core_action&event_submit_do_cancel_task=y&taskid=" + taskId, |
||||
cfg.getJobCancelPostBody(taskId)); |
||||
} |
||||
|
||||
@Test |
||||
public void testInit() throws Exception { |
||||
try { |
||||
pigeonTask.init(); |
||||
} catch (Exception e) { |
||||
Assertions.fail(e.getMessage()); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void testHandle() throws Exception { |
||||
HttpServer server = jsonHttpServer(8080, |
||||
file("src/test/resources/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.json")); |
||||
|
||||
running(server, () -> { |
||||
pigeonTask.handle(null); |
||||
|
||||
Assertions.assertEquals(TaskExecutionStatus.SUCCESS, pigeonTask.getExitStatus()); |
||||
}); |
||||
} |
||||
|
||||
private String loadResContent(String resName) { |
||||
try (InputStream i = this.getClass().getResourceAsStream(resName)) { |
||||
Objects.requireNonNull(i, "resource " + resName + " relevant stream content can not be null"); |
||||
String content = IOUtils.toString(i, StandardCharsets.UTF_8); |
||||
|
||||
return content; |
||||
} catch (Exception e) { |
||||
throw new RuntimeException(e); |
||||
} |
||||
} |
||||
|
||||
// @Test
|
||||
// public void testCancelApplication()
|
||||
// throws Exception {
|
||||
// try {
|
||||
// tisTask.cancelApplication(true);
|
||||
// } catch (Exception e) {
|
||||
// Assertions.fail(e.getMessage());
|
||||
// }
|
||||
// }
|
||||
|
||||
} |
@ -1,60 +0,0 @@
|
||||
[ |
||||
{ |
||||
"description": "trigger task execute", |
||||
"request": { |
||||
"uri": "/tjs/coredefine/coredefine.ajax", |
||||
"method": "post", |
||||
"headers": { |
||||
"Content-Type": "application/x-www-form-urlencoded", |
||||
"appname": "mysql_elastic" |
||||
}, |
||||
"text": "action=datax_action&emethod=trigger_fullbuild_task" |
||||
}, |
||||
"response": { |
||||
"text": "{\n \"success\":true,\n \"errormsg\":[],\n \"msg\":[],\n \"bizresult\":{\"taskid\": \"1087\"}\n}" |
||||
} |
||||
}, |
||||
{ |
||||
"description": "Get task execute status", |
||||
"request": { |
||||
"uri": "/tjs/config/config.ajax", |
||||
"method": "post", |
||||
"headers": { |
||||
"Content-Type": "text/plain; charset=UTF-8" |
||||
}, |
||||
"queries": { |
||||
"action": "collection_action", |
||||
"emethod": "get_task_status" |
||||
}, |
||||
"text": "{\n taskid: 1087\n, log: false }" |
||||
}, |
||||
"response": { |
||||
"seq": [ |
||||
{ |
||||
"text": "{\n \"success\": true,\n \"errormsg\": [\n \"err1\"\n ],\n \"bizresult\": {\n \"status\": {\n \"state\": 2\n }\n }\n}" |
||||
}, |
||||
{ |
||||
"text": "{\n \"success\": true,\n \"errormsg\": [\n \"err1\"\n ],\n \"bizresult\": {\n \"status\": {\n \"state\": 1\n }\n }\n}" |
||||
} |
||||
] |
||||
} |
||||
}, |
||||
{ |
||||
"websocket": { |
||||
"uri": "/tjs/download/logfeedback", |
||||
"connected": "connected", |
||||
"sessions": [ |
||||
{ |
||||
"request": { |
||||
"text": "logtype=full&collection=mysql_elastic&taskid=1087" |
||||
}, |
||||
"response": { |
||||
"broadcast": { |
||||
"content": "{\n \"logType\": \"FULL\",\n \"msg\": \"message 1\",\n \"taskId\": \"1087\"\n}" |
||||
} |
||||
} |
||||
} |
||||
] |
||||
} |
||||
} |
||||
] |
Before Width: | Height: | Size: 1.2 KiB |
Before Width: | Height: | Size: 1.1 KiB |
@ -1,68 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
import { reactive } from 'vue' |
||||
import * as Fields from '../fields/index' |
||||
import type { IJsonItem, INodeData, ITaskData } from '../types' |
||||
|
||||
export function usePigeon({ |
||||
projectCode, |
||||
from = 0, |
||||
readonly, |
||||
data |
||||
}: { |
||||
projectCode: number |
||||
from?: number |
||||
readonly?: boolean |
||||
data?: ITaskData |
||||
}) { |
||||
const model = reactive({ |
||||
taskType: 'PIGEON', |
||||
name: '', |
||||
flag: 'YES', |
||||
description: '', |
||||
timeoutFlag: false, |
||||
environmentCode: null, |
||||
failRetryInterval: 1, |
||||
failRetryTimes: 0, |
||||
workerGroup: 'default', |
||||
delayTime: 0, |
||||
timeout: 30, |
||||
targetJobName: '', |
||||
timeoutNotifyStrategy: ['WARN'] |
||||
} as INodeData) |
||||
|
||||
return { |
||||
json: [ |
||||
Fields.useName(from), |
||||
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }), |
||||
Fields.useRunFlag(), |
||||
Fields.useCache(), |
||||
Fields.useDescription(), |
||||
Fields.useTaskPriority(), |
||||
Fields.useWorkerGroup(projectCode), |
||||
Fields.useEnvironmentName(model, !data?.id), |
||||
...Fields.useTaskGroup(model, projectCode), |
||||
...Fields.useFailed(), |
||||
Fields.useDelayTime(model), |
||||
...Fields.useTimeoutAlarm(model), |
||||
Fields.useTargetTaskName(), |
||||
Fields.usePreTasks() |
||||
] as IJsonItem[], |
||||
model |
||||
} |
||||
} |
Loading…
Reference in new issue