diff --git a/.github/workflows/ci_e2e.yml b/.github/workflows/ci_e2e.yml index 82c81ef4e1..ebd53643eb 100644 --- a/.github/workflows/ci_e2e.yml +++ b/.github/workflows/ci_e2e.yml @@ -58,7 +58,9 @@ jobs: wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb sudo dpkg -i google-chrome*.deb sudo apt-get install -f -y - wget -N https://chromedriver.storage.googleapis.com/83.0.4103.39/chromedriver_linux64.zip + google-chrome -version + googleVersion=$(curl -s https://chromedriver.storage.googleapis.com/LATEST_RELEASE) + wget -N https://chromedriver.storage.googleapis.com/${googleVersion}/chromedriver_linux64.zip unzip chromedriver_linux64.zip sudo mv -f chromedriver /usr/local/share/chromedriver sudo ln -s /usr/local/share/chromedriver /usr/local/bin/chromedriver @@ -66,9 +68,7 @@ jobs: run: cd ./e2e && mvn -B clean test - name: Collect logs if: failure() - uses: actions/upload-artifact@v1 + uses: actions/upload-artifact@v2 with: name: dslogs - path: /var/lib/docker/volumes/docker-swarm_dolphinscheduler-logs/_data - - + path: ${{ github.workspace }}/docker/docker-swarm/dolphinscheduler-logs \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java new file mode 100644 index 0000000000..9cec2766f1 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +public enum Event { + ACK, + RESULT; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index d1ffc65f57..17c1e44fd3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -1 +1,110 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +public enum CommandType { + + /** + * remove task log request, + */ + REMOVE_TAK_LOG_REQUEST, + + /** + * remove task log response + */ + REMOVE_TAK_LOG_RESPONSE, + + /** + * roll view log request + */ + ROLL_VIEW_LOG_REQUEST, + + /** + * roll view log response + */ + ROLL_VIEW_LOG_RESPONSE, + + /** + * view whole log request + */ + VIEW_WHOLE_LOG_REQUEST, + + /** + * view whole log response + */ + VIEW_WHOLE_LOG_RESPONSE, + + /** + * get log bytes request + */ + GET_LOG_BYTES_REQUEST, + + /** + * get log bytes response + */ + GET_LOG_BYTES_RESPONSE, + + + WORKER_REQUEST, + MASTER_RESPONSE, + + /** + * execute task request + */ + TASK_EXECUTE_REQUEST, + + /** + * execute task ack + */ + TASK_EXECUTE_ACK, + + /** + * execute task response + */ + TASK_EXECUTE_RESPONSE, + + /** + * db task ack + */ + DB_TASK_ACK, + + /** + * db task response + */ + DB_TASK_RESPONSE, + + /** + * kill task + */ + TASK_KILL_REQUEST, + + /** + * kill task response + */ + TASK_KILL_RESPONSE, + + /** + * ping + */ + PING, + + /** + * pong + */ + PONG; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java new file mode 100644 index 0000000000..f37eb979fc --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; + +import java.io.Serializable; + +/** + * db task ack request command + */ +public class DBTaskAckCommand implements Serializable { + + private int taskInstanceId; + private int status; + + public DBTaskAckCommand(int status,int taskInstanceId) { + this.status = status; + this.taskInstanceId = taskInstanceId; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + /** + * package response command + * @return command + */ + public Command convert2Command(){ + Command command = new Command(); + command.setType(CommandType.DB_TASK_ACK); + byte[] body = FastJsonSerializer.serialize(this); + command.setBody(body); + return command; + } + + + @Override + public String toString() { + return "DBTaskAckCommand{" + + "taskInstanceId=" + taskInstanceId + + ", status=" + status + + '}'; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java new file mode 100644 index 0000000000..a64029822c --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; + +import java.io.Serializable; + +/** + * db task final result response command + */ +public class DBTaskResponseCommand implements Serializable { + + private int taskInstanceId; + private int status; + + public DBTaskResponseCommand(int status,int taskInstanceId) { + this.status = status; + this.taskInstanceId = taskInstanceId; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + /** + * package response command + * @return command + */ + public Command convert2Command(){ + Command command = new Command(); + command.setType(CommandType.DB_TASK_RESPONSE); + byte[] body = FastJsonSerializer.serialize(this); + command.setBody(body); + return command; + } + + @Override + public String toString() { + return "DBTaskResponseCommand{" + + "taskInstanceId=" + taskInstanceId + + ", status=" + status + + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 7bc37590aa..9ca9645f03 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -34,11 +31,9 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.dolphinscheduler.common.Constants.*; /** * task ack processor @@ -57,16 +52,9 @@ public class TaskAckProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; - - /** - * processService - */ - private ProcessService processService; - public TaskAckProcessor(){ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); - this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -92,19 +80,10 @@ public class TaskAckProcessor implements NettyRequestProcessor { workerAddress, taskAckCommand.getExecutePath(), taskAckCommand.getLogPath(), - taskAckCommand.getTaskInstanceId()); + taskAckCommand.getTaskInstanceId(), + channel); taskResponseService.addResponse(taskResponseEvent); - - while (Stopper.isRunning()){ - TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId()); - - if (taskInstance != null && ackStatus.typeIsRunning()){ - break; - } - ThreadUtils.sleep(SLEEP_TIME_MILLIS); - } - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 721b146d86..5982f6c4b7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -33,11 +30,9 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.dolphinscheduler.common.Constants.*; /** * task response processor @@ -56,15 +51,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor { */ private final TaskInstanceCacheManager taskInstanceCacheManager; - /** - * processService - */ - private ProcessService processService; - public TaskResponseProcessor(){ this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); - this.processService = SpringApplicationContext.getBean(ProcessService.class); } /** @@ -83,25 +72,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskInstanceCacheManager.cacheTaskInstance(responseCommand); - ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus()); - // TaskResponseEvent TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getProcessId(), responseCommand.getAppIds(), - responseCommand.getTaskInstanceId()); - + responseCommand.getTaskInstanceId(), + channel); taskResponseService.addResponse(taskResponseEvent); - - while (Stopper.isRunning()){ - TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); - - if (taskInstance != null && responseStatus.typeIsFinished()){ - break; - } - ThreadUtils.sleep(SLEEP_TIME_MILLIS); - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index 9e8813fd7f..494d95f6d8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.processor.queue; +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import java.util.Date; @@ -76,7 +78,18 @@ public class TaskResponseEvent { */ private Event event; - public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){ + /** + * channel + */ + private Channel channel; + + public static TaskResponseEvent newAck(ExecutionStatus state, + Date startTime, + String workerAddress, + String executePath, + String logPath, + int taskInstanceId, + Channel channel){ TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setStartTime(startTime); @@ -85,10 +98,16 @@ public class TaskResponseEvent { event.setLogPath(logPath); event.setTaskInstanceId(taskInstanceId); event.setEvent(Event.ACK); + event.setChannel(channel); return event; } - public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){ + public static TaskResponseEvent newResult(ExecutionStatus state, + Date endTime, + int processId, + String appIds, + int taskInstanceId, + Channel channel){ TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setEndTime(endTime); @@ -96,6 +115,7 @@ public class TaskResponseEvent { event.setAppIds(appIds); event.setTaskInstanceId(taskInstanceId); event.setEvent(Event.RESULT); + event.setChannel(channel); return event; } @@ -179,8 +199,11 @@ public class TaskResponseEvent { this.event = event; } - public enum Event{ - ACK, - RESULT; + public Channel getChannel() { + return channel; + } + + public void setChannel(Channel channel) { + this.channel = channel; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index b9772ca523..f365db70bb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -17,7 +17,13 @@ package org.apache.dolphinscheduler.server.master.processor.queue; +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; +import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,23 +127,48 @@ public class TaskResponseService { * @param taskResponseEvent taskResponseEvent */ private void persist(TaskResponseEvent taskResponseEvent){ - TaskResponseEvent.Event event = taskResponseEvent.getEvent(); + Event event = taskResponseEvent.getEvent(); + Channel channel = taskResponseEvent.getChannel(); switch (event){ case ACK: - processService.changeTaskState(taskResponseEvent.getState(), - taskResponseEvent.getStartTime(), - taskResponseEvent.getWorkerAddress(), - taskResponseEvent.getExecutePath(), - taskResponseEvent.getLogPath(), - taskResponseEvent.getTaskInstanceId()); + try { + TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); + if (taskInstance != null){ + processService.changeTaskState(taskResponseEvent.getState(), + taskResponseEvent.getStartTime(), + taskResponseEvent.getWorkerAddress(), + taskResponseEvent.getExecutePath(), + taskResponseEvent.getLogPath(), + taskResponseEvent.getTaskInstanceId()); + } + // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskAckCommand.convert2Command()); + }catch (Exception e){ + logger.error("worker ack master error",e); + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(),-1); + channel.writeAndFlush(taskAckCommand.convert2Command()); + } break; case RESULT: - processService.changeTaskState(taskResponseEvent.getState(), - taskResponseEvent.getEndTime(), - taskResponseEvent.getProcessId(), - taskResponseEvent.getAppIds(), - taskResponseEvent.getTaskInstanceId()); + try { + TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); + if (taskInstance != null){ + processService.changeTaskState(taskResponseEvent.getState(), + taskResponseEvent.getEndTime(), + taskResponseEvent.getProcessId(), + taskResponseEvent.getAppIds(), + taskResponseEvent.getTaskInstanceId()); + } + // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + }catch (Exception e){ + logger.error("worker response master error",e); + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),-1); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + } break; default: throw new IllegalArgumentException("invalid event type : " + event); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index f0833cb7e0..6895de3d4a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -22,9 +22,12 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor; +import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; +import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +73,9 @@ public class WorkerServer { @Autowired private SpringApplicationContext springApplicationContext; + @Autowired + private RetryReportTaskStatusThread retryReportTaskStatusThread; + /** * worker server startup * @@ -95,11 +101,16 @@ public class WorkerServer { this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); this.nettyRemotingServer.start(); // worker registry this.workerRegistry.registry(); + // retry report task status + this.retryReportTaskStatusThread.start(); + /** * register hooks, which are called before the process exits */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java new file mode 100644 index 0000000000..3639b8eba3 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.cache; + +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.remote.command.Command; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Responce Cache : cache worker send master result + */ +public class ResponceCache { + + private static final ResponceCache instance = new ResponceCache(); + + private ResponceCache(){} + + public static ResponceCache get(){ + return instance; + } + + private Map ackCache = new ConcurrentHashMap<>(); + private Map responseCache = new ConcurrentHashMap<>(); + + + /** + * cache response + * @param taskInstanceId taskInstanceId + * @param command command + * @param event event ACK/RESULT + */ + public void cache(Integer taskInstanceId, Command command, Event event){ + switch (event){ + case ACK: + ackCache.put(taskInstanceId,command); + break; + case RESULT: + responseCache.put(taskInstanceId,command); + break; + default: + throw new IllegalArgumentException("invalid event type : " + event); + } + } + + + /** + * remove ack cache + * @param taskInstanceId taskInstanceId + */ + public void removeAckCache(Integer taskInstanceId){ + ackCache.remove(taskInstanceId); + } + + /** + * remove reponse cache + * @param taskInstanceId taskInstanceId + */ + public void removeResponseCache(Integer taskInstanceId){ + responseCache.remove(taskInstanceId); + } + + /** + * getAckCache + * @return getAckCache + */ + public Map getAckCache(){ + return ackCache; + } + + /** + * getResponseCache + * @return getResponseCache + */ + public Map getResponseCache(){ + return responseCache; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java new file mode 100644 index 0000000000..ff0fd8e49e --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.remote.command.*; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * db task ack processor + */ +public class DBTaskAckProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class); + + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(), + String.format("invalid command type : %s", command.getType())); + + DBTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize( + command.getBody(), DBTaskAckCommand.class); + + if (taskAckCommand == null){ + return; + } + + if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ + ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId()); + } + } + + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java new file mode 100644 index 0000000000..126defdb67 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * db task response processor + */ +public class DBTaskResponseProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class); + + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(), + String.format("invalid command type : %s", command.getType())); + + DBTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize( + command.getBody(), DBTaskResponseCommand.class); + + if (taskResponseCommand == null){ + return; + } + + if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ + ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId()); + } + } + + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java index cbb8972a33..9762b10041 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java @@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.Host; +import java.util.Random; + /** * callback channel */ @@ -50,6 +52,12 @@ public class NettyRemoteChannel { this.opaque = opaque; } + public NettyRemoteChannel(Channel channel) { + this.channel = channel; + this.host = ChannelUtils.toAddress(channel); + this.opaque = -1; + } + public Channel getChannel() { return channel; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 6a23a9e77d..ca7d3c643e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -86,20 +86,19 @@ public class TaskCallbackService { * @return callback channel */ private NettyRemoteChannel getRemoteChannel(int taskInstanceId){ + Channel newChannel; NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId); - if(nettyRemoteChannel == null){ - throw new IllegalArgumentException("nettyRemoteChannel is empty, should call addRemoteChannel first"); - } - if(nettyRemoteChannel.isActive()){ - return nettyRemoteChannel; - } - Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost()); - if(newChannel != null){ - return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + if(nettyRemoteChannel != null){ + if(nettyRemoteChannel.isActive()){ + return nettyRemoteChannel; + } + newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost()); + if(newChannel != null){ + return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + } + } - logger.warn("original master : {} for task : {} is not reachable, random select master", - nettyRemoteChannel.getHost(), - taskInstanceId); + Set masterNodes = null; int ntries = 0; while (Stopper.isRunning()) { @@ -119,7 +118,7 @@ public class TaskCallbackService { for (String masterNode : masterNodes) { newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); if (newChannel != null) { - return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + return getRemoteChannel(newChannel,taskInstanceId); } } masterNodes = null; @@ -141,6 +140,12 @@ public class TaskCallbackService { return remoteChannel; } + private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId){ + NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel); + addRemoteChannel(taskInstanceId, remoteChannel); + return remoteChannel; + } + /** * remove callback channels * @param taskInstanceId taskInstanceId diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 4ca110f42b..5ecc2c7b55 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -22,6 +22,7 @@ import ch.qos.logback.classic.sift.SiftingAppender; import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.OSUtils; @@ -36,6 +37,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -101,12 +103,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); - try { - this.doAck(taskExecutionContext); - }catch (Exception e){ - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); - this.doAck(taskExecutionContext); - } + this.doAck(taskExecutionContext); // submit task workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService)); @@ -115,6 +112,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private void doAck(TaskExecutionContext taskExecutionContext){ // tell master that task is in executing TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),ackCommand.convert2Command(),Event.ACK); taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java new file mode 100644 index 0000000000..ea9bb03e16 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.thread.Stopper; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * Retry Report Task Status Thread + */ +@Component +public class RetryReportTaskStatusThread implements Runnable { + + private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class); + + /** + * every 5 minutes + */ + private static long RETRY_REPORT_TASK_STATUS_TIME = 5 * 60 * 1000L; + /** + * task callback service + */ + private final TaskCallbackService taskCallbackService; + + public void start(){ + Thread thread = new Thread(this,"RetryReportTaskStatusThread"); + thread.start(); + } + + public RetryReportTaskStatusThread(){ + this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); + } + + /** + * retry ack/response + */ + @Override + public void run() { + ResponceCache responceCache = ResponceCache.get(); + + while (Stopper.isRunning()){ + try { + if (!responceCache.getAckCache().isEmpty()){ + Map ackCache = responceCache.getAckCache(); + for (Map.Entry entry : ackCache.entrySet()){ + Integer taskInstanceId = entry.getKey(); + Command ackCommand = entry.getValue(); + taskCallbackService.sendAck(taskInstanceId,ackCommand); + } + } + + if (!responceCache.getResponseCache().isEmpty()){ + Map responseCache = responceCache.getResponseCache(); + for (Map.Entry entry : responseCache.entrySet()){ + Integer taskInstanceId = entry.getKey(); + Command responseCommand = entry.getValue(); + taskCallbackService.sendAck(taskInstanceId,responseCommand); + } + } + }catch (Exception e){ + logger.warn("retry report task status error", e); + } + + ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_TIME); + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 592060b0e8..e224896353 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.runner; import com.alibaba.fastjson.JSONObject; import org.apache.commons.collections.MapUtils; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; @@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; @@ -144,13 +146,10 @@ public class TaskExecuteThread implements Runnable { responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); } finally { - try { - taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); - }catch (Exception e){ - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); - taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); - } + taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT); + taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index dcba83271c..a22663e870 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -44,7 +44,7 @@ public class TaskResponseServiceTest { @Test public void testAdd(){ TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(), - "", "", "", 1); + "", "", "", 1,null); taskResponseService.addResponse(taskResponseEvent); Assert.assertTrue(taskResponseService.getEventQueue().size() == 1); try { @@ -58,7 +58,7 @@ public class TaskResponseServiceTest { @Test public void testStop(){ TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(), - "", "", "", 1); + "", "", "", 1,null); taskResponseService.addResponse(taskResponseEvent); taskResponseService.stop(); Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);