diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index c91bcfd217..ae570bc725 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; @@ -149,6 +150,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Autowired private TaskPluginManager taskPluginManager; + @Autowired + private ScheduleMapper scheduleMapper; + /** * return top n SUCCESS process instance order by running time which started between startTime and endTime */ @@ -472,7 +476,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processInstance.getName(), processInstance.getState().toString(), "update"); return result; } - setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout); + + // + Map commandParamMap = JSONUtils.toMap(processInstance.getCommandParam()); + String timezoneId = null; + if (commandParamMap == null || StringUtils.isBlank(commandParamMap.get(Constants.SCHEDULE_TIMEZONE))) { + timezoneId = loginUser.getTimeZone(); + } else { + timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE); + } + + setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout, timezoneId); List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); if (taskDefinitionLogs.isEmpty()) { putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); @@ -538,7 +552,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * update process instance attributes */ - private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout) { + private void setProcessInstance(ProcessInstance processInstance, String tenantCode, String scheduleTime, String globalParams, int timeout, String timezone) { Date schedule = processInstance.getScheduleTime(); if (scheduleTime != null) { schedule = DateUtils.getScheduleDate(scheduleTime); @@ -546,7 +560,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processInstance.setScheduleTime(schedule); List globalParamList = JSONUtils.toList(globalParams, Property.class); Map globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); - globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule); + globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone); processInstance.setTimeout(timeout); processInstance.setTenantCode(tenantCode); processInstance.setGlobalParams(globalParams); @@ -672,9 +686,14 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } + Map commandParam = JSONUtils.toMap(processInstance.getCommandParam()); + String timezone = null; + if (commandParam != null) { + timezone = commandParam.get(Constants.SCHEDULE_TIMEZONE); + } Map timeParams = BusinessTimeUtils .getBusinessTime(processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); + processInstance.getScheduleTime(), timezone); String userDefinedParams = processInstance.getGlobalParams(); // global params List globalParams = new ArrayList<>(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 53337524db..eb7977d860 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -48,6 +48,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; @@ -121,6 +122,10 @@ public class ProcessInstanceServiceTest { @Mock TaskPluginManager taskPluginManager; + @Mock + ScheduleMapper scheduleMapper; + + private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789," + "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 3d87af5ea8..e26a6f0036 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -817,4 +817,9 @@ public final class Constants { public static final String LIMITS_CPU = "limitsCpu"; public static final String LIMITS_MEMORY = "limitsMemory"; public static final String K8S_LOCAL_TEST_CLUSTER = "ds_null_k8s"; + + /** + * schedule timezone + */ + public static final String SCHEDULE_TIMEZONE = "schedule_timezone"; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java index 66a59f1668..a1241022a8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java @@ -88,7 +88,7 @@ public class ParameterUtils { * @return curing user define parameters */ public static String curingGlobalParams(Map globalParamMap, List globalParamList, - CommandType commandType, Date scheduleTime) { + CommandType commandType, Date scheduleTime, String timezone) { if (globalParamList == null || globalParamList.isEmpty()) { return null; @@ -101,7 +101,7 @@ public class ParameterUtils { Map allParamMap = new HashMap<>(); //If it is a complement, a complement time needs to be passed in, according to the task type Map timeParams = BusinessTimeUtils. - getBusinessTime(commandType, scheduleTime); + getBusinessTime(commandType, scheduleTime, timezone); if (timeParams != null) { allParamMap.putAll(timeParams); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java index 23de2d1753..0c41d11558 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java @@ -45,7 +45,7 @@ public class BusinessTimeUtils { * @param runTime run time or schedule time * @return business time */ - public static Map getBusinessTime(CommandType commandType, Date runTime) { + public static Map getBusinessTime(CommandType commandType, Date runTime, String timezone) { Date businessDate = runTime; Map result = new HashMap<>(); switch (commandType) { @@ -71,9 +71,9 @@ public class BusinessTimeUtils { break; } Date businessCurrentDate = addDays(businessDate, 1); - result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE, null)); - result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE, null)); - result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME, null)); + result.put(Constants.PARAMETER_CURRENT_DATE, format(businessCurrentDate, PARAMETER_FORMAT_DATE, timezone)); + result.put(Constants.PARAMETER_BUSINESS_DATE, format(businessDate, PARAMETER_FORMAT_DATE, timezone)); + result.put(Constants.PARAMETER_DATETIME, format(businessCurrentDate, PARAMETER_FORMAT_TIME, timezone)); return result; } } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java index aff76fb785..01a1eab9a7 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java @@ -100,22 +100,22 @@ public class ParameterUtilsTest { Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00"); //test globalParamList is null - String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); + String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); Assert.assertNull(result); - Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null)); - Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime)); + Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null)); + Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null)); //test globalParamList is not null Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam"); globalParamList.add(property); - String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); + String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList)); - String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null); + String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null); Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList)); - String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); + String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList)); //test var $ startsWith @@ -130,7 +130,7 @@ public class ParameterUtilsTest { globalParamList.add(property3); globalParamList.add(property4); - String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); + String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList)); Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, ""); @@ -150,7 +150,7 @@ public class ParameterUtilsTest { } } - String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); + String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); Assert.assertTrue(result6.contains("20191220")); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index c22d5811c3..979de3feb4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -948,7 +948,7 @@ public class WorkflowExecuteThread { processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE))); processService.updateProcessInstance(processInstance); } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java index c3fa0fc547..904896bd9f 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java @@ -43,7 +43,7 @@ public class ParamsTest { // start process Map timeParams = BusinessTimeUtils .getBusinessTime(CommandType.START_PROCESS, - new Date()); + new Date(), null); command = ParameterUtils.convertParameterPlaceholders(command, timeParams); @@ -57,7 +57,7 @@ public class ParamsTest { // complement data timeParams = BusinessTimeUtils .getBusinessTime(CommandType.COMPLEMENT_DATA, - calendar.getTime()); + calendar.getTime(), null); command = ParameterUtils.convertParameterPlaceholders(command, timeParams); logger.info("complement data : {}",command); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index c899a7baf5..bbf1e2b788 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -399,6 +399,13 @@ public class ProcessServiceImpl implements ProcessService { public int createCommand(Command command) { int result = 0; if (command != null) { + // add command timezone + Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode()); + Map commandParams = JSONUtils.toMap(command.getCommandParam()); + if (commandParams != null && schedule != null) { + commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId()); + command.setCommandParam(JSONUtils.toJsonString(commandParams)); + } result = commandMapper.insert(command); } return result; @@ -771,11 +778,17 @@ public class ProcessServiceImpl implements ProcessService { setGlobalParamIfCommanded(processDefinition, cmdParam); // curing global params + Map commandParamMap = JSONUtils.toMap(command.getCommandParam()); + String timezoneId = null; + if (commandParamMap != null) { + timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE); + } + processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime())); + processInstance.getScheduleTime(), timezoneId)); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); @@ -909,12 +922,15 @@ public class ProcessServiceImpl implements ProcessService { setGlobalParamIfCommanded(processDefinition, cmdParam); } + // time zone + String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE); + // Recalculate global parameters after rerun. processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), commandTypeIfComplement, - processInstance.getScheduleTime())); + processInstance.getScheduleTime(), timezoneId)); processInstance.setProcessDefinition(processDefinition); } //reset command parameter @@ -1092,10 +1108,14 @@ public class ProcessServiceImpl implements ProcessService { && Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(complementDate.get(0)); } + + // time zone + String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE); + processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId)); } /** @@ -1181,7 +1201,7 @@ public class ProcessServiceImpl implements ProcessService { private String joinVarPool(String parentValPool, String subValPool) { List parentValPools = Lists.newArrayList(JSONUtils.toList(parentValPool, Property.class)); parentValPools = parentValPools.stream().filter(valPool -> valPool.getDirect() == Direct.OUT).collect(Collectors.toList()); - + List subValPools = Lists.newArrayList(JSONUtils.toList(subValPool, Property.class)); Set parentValPoolKeys = parentValPools.stream().map(Property::getProp).collect(toSet()); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index e8abd0692e..22575681e1 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; @@ -155,6 +156,9 @@ public class ProcessServiceTest { @Mock private DqComparisonTypeMapper dqComparisonTypeMapper; + @Mock + private ScheduleMapper scheduleMapper; + @Test public void testCreateSubCommand() { ProcessInstance parentInstance = new ProcessInstance();