|
|
@ -27,14 +27,21 @@ public class PageReadListener<T> implements ReadListener<T> { |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private final Consumer<List<T>> consumer; |
|
|
|
private final Consumer<List<T>> consumer; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final int pageSize; |
|
|
|
|
|
|
|
|
|
|
|
public PageReadListener(Consumer<List<T>> consumer) { |
|
|
|
public PageReadListener(Consumer<List<T>> consumer) { |
|
|
|
|
|
|
|
this(consumer, BATCH_COUNT); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public PageReadListener(Consumer<List<T>> consumer,int pageSize) { |
|
|
|
this.consumer = consumer; |
|
|
|
this.consumer = consumer; |
|
|
|
|
|
|
|
this.pageSize = pageSize; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void invoke(T data, AnalysisContext context) { |
|
|
|
public void invoke(T data, AnalysisContext context) { |
|
|
|
cachedDataList.add(data); |
|
|
|
cachedDataList.add(data); |
|
|
|
if (cachedDataList.size() >= BATCH_COUNT) { |
|
|
|
if (cachedDataList.size() >= pageSize) { |
|
|
|
consumer.accept(cachedDataList); |
|
|
|
consumer.accept(cachedDataList); |
|
|
|
cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT); |
|
|
|
cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT); |
|
|
|
} |
|
|
|
} |
|
|
|