From 9d0c816cee102edbba2ac080f483c8a73a0b7b30 Mon Sep 17 00:00:00 2001 From: ruanwenjun <861923274@qq.com> Date: Tue, 25 May 2021 04:53:56 +0800 Subject: [PATCH] [Bug][worker] task throw ConcurrentModifiedException #5528 (#5530) * [Bug][worker] task throw ConcurrentModifiedException #5528 * fix code smell --- .../server/worker/task/AbstractTask.java | 7 +++++- .../worker/task/sqoop/SqoopTaskTest.java | 25 ++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 7454f49ae5..45b94d2628 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import java.util.List; +import java.util.StringJoiner; import org.slf4j.Logger; @@ -130,7 +131,11 @@ public abstract class AbstractTask { if (logs.contains(FINALIZE_SESSION_MARKER.toString())) { logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); } else { - logger.info(" -> {}", String.join("\n\t", logs)); + // note: if the logs is a SynchronizedList and will be modified concurrently, + // we should must use foreach to iterate the element, otherwise will throw a ConcurrentModifiedException(#issue 5528) + StringJoiner joiner = new StringJoiner("\n\t"); + logs.forEach(joiner::add); + logger.info(" -> {}", joiner); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index 5787907d60..222c35593a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -25,7 +25,10 @@ import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGe import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.List; import org.junit.Assert; import org.junit.Before; @@ -37,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; - /** * sqoop task test */ @@ -201,4 +203,25 @@ public class SqoopTaskTest { } } + @Test + public void testLogHandler() throws InterruptedException { + List list = Collections.synchronizedList(new ArrayList<>()); + Thread thread1 = new Thread(() -> { + for (int i = 0; i < 10; i++) { + list.add("test add log"); + } + }); + Thread thread2 = new Thread(() -> { + for (int i = 0; i < 10; i++) { + sqoopTask.logHandle(list); + } + }); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + // if no exception throw, assert true + Assert.assertTrue(true); + } + }