Browse Source

[Bug][worker] task throw ConcurrentModifiedException #5528 (#5530)

* [Bug][worker] task throw ConcurrentModifiedException #5528

* fix code smell
pull/3/MERGE
ruanwenjun 3 years ago committed by GitHub
parent
commit
9d0c816cee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  2. 25
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.List;
import java.util.StringJoiner;
import org.slf4j.Logger;
@ -130,7 +131,11 @@ public abstract class AbstractTask {
if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
} else {
logger.info(" -> {}", String.join("\n\t", logs));
// note: if the logs is a SynchronizedList and will be modified concurrently,
// we should must use foreach to iterate the element, otherwise will throw a ConcurrentModifiedException(#issue 5528)
StringJoiner joiner = new StringJoiner("\n\t");
logs.forEach(joiner::add);
logger.info(" -> {}", joiner);
}
}

25
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java

@ -25,7 +25,10 @@ import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGe
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
@ -37,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
/**
* sqoop task test
*/
@ -201,4 +203,25 @@ public class SqoopTaskTest {
}
}
@Test
public void testLogHandler() throws InterruptedException {
List<String> list = Collections.synchronizedList(new ArrayList<>());
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
list.add("test add log");
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
sqoopTask.logHandle(list);
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
// if no exception throw, assert true
Assert.assertTrue(true);
}
}

Loading…
Cancel
Save