Browse Source

[Feature-7576][Master] Optimize complement task's date (#7585)

* feature_7576

* feature_7576

* feature 7576

* feature 7576
3.0.0/version-upgrade
xiangzihao 3 years ago committed by GitHub
parent
commit
3d9d91ccc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 37
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  3. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -621,6 +621,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam()); Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
switch (runMode) { switch (runMode) {
case RUN_MODE_SERIAL: { case RUN_MODE_SERIAL: {
if (start.after(end)) {
logger.warn("The startDate {} is later than the endDate {}", start, end);
break;
}
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
@ -628,26 +632,45 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break; break;
} }
case RUN_MODE_PARALLEL: { case RUN_MODE_PARALLEL: {
if (start.after(end)) {
logger.warn("The startDate {} is later than the endDate {}", start, end);
break;
}
LinkedList<Date> listDate = new LinkedList<>(); LinkedList<Date> listDate = new LinkedList<>();
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode()); List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules)); listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules));
int listDateSize = listDate.size();
createCount = listDate.size(); createCount = listDate.size();
if (!CollectionUtils.isEmpty(listDate)) { if (!CollectionUtils.isEmpty(listDate)) {
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(listDate.size(), expectedParallelismNumber); createCount = Math.min(listDate.size(), expectedParallelismNumber);
if (listDateSize < createCount) {
createCount = listDateSize;
}
} }
logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
listDate.addLast(end); // Distribute the dates as evenly as possible across the commands.
int chunkSize = listDate.size() / createCount; // When the dates do not divide evenly, each of the first (listDateSize % createCount) commands takes one extra date.
int itemsPerCommand = (listDateSize / createCount);
int remainingItems = (listDateSize % createCount);
int startDateIndex = 0;
int endDateIndex = 0;
for (int i = 1; i <= createCount; i++) {
int extra = (i <= remainingItems) ? 1 : 0;
int singleCommandItems = (itemsPerCommand + extra);
for (int i = 0; i < createCount; i++) { if (i == 1) {
int rangeStart = i == 0 ? i : (i * chunkSize); endDateIndex += singleCommandItems - 1;
int rangeEnd = i == createCount - 1 ? listDate.size() - 1 } else {
: rangeStart + chunkSize; startDateIndex = endDateIndex + 1;
endDateIndex += singleCommandItems;
}
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart))); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(startDateIndex)));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd))); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command); processService.createCommand(command);
} }

37
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -32,6 +34,8 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -340,22 +344,35 @@ public class ExecutorServiceTest {
listDate.add(1); listDate.add(1);
listDate.add(2); listDate.add(2);
listDate.add(3); listDate.add(3);
listDate.add(4);
int listDateSize = listDate.size();
int createCount = Math.min(listDate.size(), expectedParallelismNumber); int createCount = Math.min(listDate.size(), expectedParallelismNumber);
logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
listDate.addLast(4); int itemsPerCommand = (listDateSize / createCount);
int chunkSize = listDate.size() / createCount; int remainingItems = (listDateSize % createCount);
for (int i = 0; i < createCount; i++) { int startDateIndex = 0;
int rangeStart = i == 0 ? i : (i * chunkSize); int endDateIndex = 0;
int rangeEnd = i == createCount - 1 ? listDate.size() - 1 : rangeStart + chunkSize;
logger.info("rangeStart:{}, rangeEnd:{}",rangeStart, rangeEnd); for (int i = 1; i <= createCount; i++) {
result.add(listDate.get(rangeStart) + "," + listDate.get(rangeEnd)); int extra = (i <= remainingItems) ? 1 : 0;
int singleCommandItems = (itemsPerCommand + extra);
if (i == 1) {
endDateIndex += singleCommandItems - 1;
} else {
startDateIndex = endDateIndex + 1;
endDateIndex += singleCommandItems;
} }
Assert.assertEquals("0,1", result.get(0)); logger.info("startDate:{}, endDate:{}", listDate.get(startDateIndex), listDate.get(endDateIndex));
Assert.assertEquals("1,2", result.get(1)); result.add(listDate.get(startDateIndex) + "," + listDate.get(endDateIndex));
Assert.assertEquals("2,4", result.get(2)); }
Assert.assertEquals("0,1", result.get(0));
Assert.assertEquals("2,3", result.get(1));
Assert.assertEquals("4,4", result.get(2));
} }
} }

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java

@ -191,9 +191,9 @@ public class CronUtils {
return result; return result;
} }
// support left closed and right open time interval (startDate <= N < endDate) // support left closed and right closed time interval (startDate <= N <= endDate)
Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS); Date from = new Date(startTime.getTime() - Constants.SECOND_TIME_MILLIS);
Date to = new Date(endTime.getTime() - Constants.SECOND_TIME_MILLIS); Date to = new Date(endTime.getTime() + Constants.SECOND_TIME_MILLIS);
List<Schedule> listSchedule = new ArrayList<>(); List<Schedule> listSchedule = new ArrayList<>();
listSchedule.addAll(schedules); listSchedule.addAll(schedules);

Loading…
Cancel
Save