Browse Source

[fix][Service] BusinessTime should format with schedule timezone (#9714)

* BusinessTime should format with schedule timezone

* fix test error

* fix test error

* fix test error
3.0.0/version-upgrade
gaojun2048 3 years ago committed by GitHub
parent
commit
ebc4253d50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  2. 5
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  3. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  4. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
  5. 8
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java
  6. 16
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
  7. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  8. 4
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
  9. 28
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  10. 4
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

27
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@ -149,6 +150,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private ScheduleMapper scheduleMapper;
/**
* return top n SUCCESS process instance order by running time which started between startTime and endTime
*/
@ -472,7 +476,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout);
//
Map<String, String> commandParamMap = JSONUtils.toMap(processInstance.getCommandParam());
String timezoneId = null;
if (commandParamMap == null || StringUtils.isBlank(commandParamMap.get(Constants.SCHEDULE_TIMEZONE))) {
timezoneId = loginUser.getTimeZone();
} else {
timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
}
setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout, timezoneId);
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
if (taskDefinitionLogs.isEmpty()) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
@ -538,7 +552,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* update process instance attributes
*/
private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout) {
private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout, String timezone) {
Date schedule = processInstance.getScheduleTime();
if (scheduleTime != null) {
schedule = DateUtils.getScheduleDate(scheduleTime);
@ -546,7 +560,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.setScheduleTime(schedule);
List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class);
Map<String, String> globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule);
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone);
processInstance.setTimeout(timeout);
processInstance.setTenantCode(tenantCode);
processInstance.setGlobalParams(globalParams);
@ -672,9 +686,14 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
Map<String, String> commandParam = JSONUtils.toMap(processInstance.getCommandParam());
String timezone = null;
if (commandParam != null) {
timezone = commandParam.get(Constants.SCHEDULE_TIMEZONE);
}
Map<String, String> timeParams = BusinessTimeUtils
.getBusinessTime(processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
processInstance.getScheduleTime(), timezone);
String userDefinedParams = processInstance.getGlobalParams();
// global params
List<Property> globalParams = new ArrayList<>();

5
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -48,6 +48,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
@ -121,6 +122,10 @@ public class ProcessInstanceServiceTest {
@Mock
TaskPluginManager taskPluginManager;
@Mock
ScheduleMapper scheduleMapper;
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -817,4 +817,9 @@ public final class Constants {
public static final String LIMITS_CPU = "limitsCpu";
public static final String LIMITS_MEMORY = "limitsMemory";
public static final String K8S_LOCAL_TEST_CLUSTER = "ds_null_k8s";
/**
* schedule timezone
*/
public static final String SCHEDULE_TIMEZONE = "schedule_timezone";
}

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java

@ -88,7 +88,7 @@ public class ParameterUtils {
* @return curing user define parameters
*/
public static String curingGlobalParams(Map<String, String> globalParamMap, List<Property> globalParamList,
CommandType commandType, Date scheduleTime) {
CommandType commandType, Date scheduleTime, String timezone) {
if (globalParamList == null || globalParamList.isEmpty()) {
return null;
@ -101,7 +101,7 @@ public class ParameterUtils {
Map<String, String> allParamMap = new HashMap<>();
//If it is a complement, a complement time needs to be passed in, according to the task type
Map<String, String> timeParams = BusinessTimeUtils.
getBusinessTime(commandType, scheduleTime);
getBusinessTime(commandType, scheduleTime, timezone);
if (timeParams != null) {
allParamMap.putAll(timeParams);

8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java

@ -45,7 +45,7 @@ public class BusinessTimeUtils {
* @param runTime run time or schedule time
* @return business time
*/
public static Map<String, String> getBusinessTime(CommandType commandType, Date runTime) {
public static Map<String, String> getBusinessTime(CommandType commandType, Date runTime, String timezone) {
Date businessDate = runTime;
Map<String, String> result = new HashMap<>();
switch (commandType) {
@ -71,9 +71,9 @@ public class BusinessTimeUtils {
break;
}
Date businessCurrentDate = addDays(businessDate, 1);
result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE, null));
result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE, null));
result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME, null));
result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE, timezone));
result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE, timezone));
result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME, timezone));
return result;
}
}

16
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java

@ -100,22 +100,22 @@ public class ParameterUtilsTest {
Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00");
//test globalParamList is null
String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertNull(result);
Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null));
Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime));
Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null));
Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null));
//test globalParamList is not null
Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam");
globalParamList.add(property);
String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList));
String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null);
String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null);
Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList));
String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList));
//test var $ startsWith
@ -130,7 +130,7 @@ public class ParameterUtilsTest {
globalParamList.add(property3);
globalParamList.add(property4);
String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList));
Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, "");
@ -150,7 +150,7 @@ public class ParameterUtilsTest {
}
}
String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertTrue(result6.contains("20191220"));
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -948,7 +948,7 @@ public class WorkflowExecuteThread {
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
processService.updateProcessInstance(processInstance);
}
}

4
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java

@ -43,7 +43,7 @@ public class ParamsTest {
// start process
Map<String,String> timeParams = BusinessTimeUtils
.getBusinessTime(CommandType.START_PROCESS,
new Date());
new Date(), null);
command = ParameterUtils.convertParameterPlaceholders(command, timeParams);
@ -57,7 +57,7 @@ public class ParamsTest {
// complement data
timeParams = BusinessTimeUtils
.getBusinessTime(CommandType.COMPLEMENT_DATA,
calendar.getTime());
calendar.getTime(), null);
command = ParameterUtils.convertParameterPlaceholders(command, timeParams);
logger.info("complement data : {}",command);

28
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -399,6 +399,13 @@ public class ProcessServiceImpl implements ProcessService {
public int createCommand(Command command) {
int result = 0;
if (command != null) {
// add command timezone
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
Map<String, String> commandParams = JSONUtils.toMap(command.getCommandParam());
if (commandParams != null && schedule != null) {
commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
command.setCommandParam(JSONUtils.toJsonString(commandParams));
}
result = commandMapper.insert(command);
}
return result;
@ -771,11 +778,17 @@ public class ProcessServiceImpl implements ProcessService {
setGlobalParamIfCommanded(processDefinition, cmdParam);
// curing global params
Map<String, String> commandParamMap = JSONUtils.toMap(command.getCommandParam());
String timezoneId = null;
if (commandParamMap != null) {
timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
}
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime()));
processInstance.getScheduleTime(), timezoneId));
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
@ -909,12 +922,15 @@ public class ProcessServiceImpl implements ProcessService {
setGlobalParamIfCommanded(processDefinition, cmdParam);
}
// time zone
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime()));
processInstance.getScheduleTime(), timezoneId));
processInstance.setProcessDefinition(processDefinition);
}
//reset command parameter
@ -1092,10 +1108,14 @@ public class ProcessServiceImpl implements ProcessService {
&& Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementDate.get(0));
}
// time zone
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId));
}
/**
@ -1181,7 +1201,7 @@ public class ProcessServiceImpl implements ProcessService {
private String joinVarPool(String parentValPool, String subValPool) {
List<Property> parentValPools = Lists.newArrayList(JSONUtils.toList(parentValPool, Property.class));
parentValPools = parentValPools.stream().filter(valPool -> valPool.getDirect() == Direct.OUT).collect(Collectors.toList());
List<Property> subValPools = Lists.newArrayList(JSONUtils.toList(subValPool, Property.class));
Set<String> parentValPoolKeys = parentValPools.stream().map(Property::getProp).collect(toSet());

4
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
@ -155,6 +156,9 @@ public class ProcessServiceTest {
@Mock
private DqComparisonTypeMapper dqComparisonTypeMapper;
@Mock
private ScheduleMapper scheduleMapper;
@Test
public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance();

Loading…
Cancel
Save