From f8a44ff7192f7dc998f7568b229b48a2bf7ee769 Mon Sep 17 00:00:00 2001 From: Gallardot Date: Thu, 21 Dec 2023 18:54:44 +0800 Subject: [PATCH] [Bug][Master] send ACK event timeout (#15346) --- README.md | 5 ++--- README_zh_CN.md | 4 ++-- .../master/event/TaskResultEventHandler.java | 22 ++++++++++++++----- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index d30b55d95d..d6ce0e2f9a 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) ![codecov](https://codecov.io/gh/apache/dolphinscheduler/branch/dev/graph/badge.svg) [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=apache-dolphinscheduler&metric=alert_status)](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler) -[![Twitter Follow](https://img.shields.io/twitter/follow/dolphinschedule.svg?style=social&label=Follow)](https://twitter.com/dolphinschedule) +[![Twitter Follow](https://img.shields.io/twitter/follow/dolphinschedule.svg?style=social&label=Follow)](https://twitter.com/dolphinschedule) [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://s.apache.org/dolphinscheduler-slack) [![CN doc](https://img.shields.io/badge/文档-中文版-blue.svg)](README_zh_CN.md) @@ -65,7 +65,7 @@ find the good first issue in [here](https://github.com/apache/dolphinscheduler/c Welcome to join the Apache DolphinScheduler community by: - Join the [DolphinScheduler Slack](https://s.apache.org/dolphinscheduler-slack) to keep in touch with the community -- Follow the [DolphinScheduler Twitter](https://twitter.com/dolphinschedule) and get the latest news +- Follow the [DolphinScheduler Twitter](https://twitter.com/dolphinschedule) and get the latest news - Subscribe DolphinScheduler mail list, users@dolphinscheduler.apache.org for user and dev@dolphinscheduler.apache.org for developer # Landscapes @@ -75,5 +75,4 @@ Welcome to join the Apache DolphinScheduler community by:   

DolphinScheduler enriches the CNCF CLOUD NATIVE Landscape. -

diff --git a/README_zh_CN.md b/README_zh_CN.md index 2e955811e2..dcadcc6427 100644 --- a/README_zh_CN.md +++ b/README_zh_CN.md @@ -3,7 +3,7 @@ [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) [![codecov](https://codecov.io/gh/apache/dolphinscheduler/branch/dev/graph/badge.svg)]() [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=apache-dolphinscheduler&metric=alert_status)](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler) -[![Twitter Follow](https://img.shields.io/twitter/follow/dolphinschedule.svg?style=social&label=Follow)](https://twitter.com/dolphinschedule) +[![Twitter Follow](https://img.shields.io/twitter/follow/dolphinschedule.svg?style=social&label=Follow)](https://twitter.com/dolphinschedule) [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://s.apache.org/dolphinscheduler-slack) [![EN doc](https://img.shields.io/badge/document-English-blue.svg)](README.md) @@ -61,7 +61,7 @@ DolphinScheduler 的主要特性如下: 欢迎通过以方式加入社区: - 加入 [DolphinScheduler Slack](https://s.apache.org/dolphinscheduler-slack) -- 关注 [DolphinScheduler Twitter](https://twitter.com/dolphinschedule) 来获取最新消息 +- 关注 [DolphinScheduler Twitter](https://twitter.com/dolphinschedule) 来获取最新消息 - 订阅 DolphinScheduler 邮件列表, 用户订阅 users@dolphinscheduler.apache.org 开发者请订阅 dev@dolphinscheduler.apache.org # Landscapes diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java index 854812264f..f3d3a7480a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java @@ -35,10 +35,13 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component +@Slf4j public class TaskResultEventHandler implements TaskEventHandler { @Autowired @@ -99,11 +102,13 @@ public class TaskResultEventHandler implements TaskEventHandler { taskInstance.setVarPool(taskEvent.getVarPool()); processService.changeOutParam(taskInstance); taskInstanceDao.updateById(taskInstance); - sendAckToWorker(taskEvent); } catch (Exception ex) { TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex); } + + sendAckToWorker(taskEvent); + TaskStateEvent stateEvent = TaskStateEvent.builder() .processInstanceId(taskEvent.getProcessInstanceId()) .taskInstanceId(taskEvent.getTaskInstanceId()) @@ -115,11 +120,16 @@ public class TaskResultEventHandler implements TaskEventHandler { } public void sendAckToWorker(TaskEvent taskEvent) { - ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = - SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class); - instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck( - TaskInstanceExecutionFinishEventAck.success(taskEvent.getTaskInstanceId())); + try { + ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = + SingletonJdkDynamicRpcClientProxyFactory + .getProxyClient(taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class); + instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck( + TaskInstanceExecutionFinishEventAck.success(taskEvent.getTaskInstanceId())); + } catch (Exception e) { + // master ignore the exception, worker will retry to send this TaskEventType.RESULT event again. + log.warn("send ack to worker error, taskInstanceId: {}", taskEvent.getTaskInstanceId(), e); + } } @Override