From a4ee351a3af1b05bafab48699f846e2b3ac226eb Mon Sep 17 00:00:00 2001 From: vanilla111 <1115690319@qq.com> Date: Wed, 19 Aug 2020 11:36:30 +0800 Subject: [PATCH] delay execution of tasks and improve some designs (#3427) --- .../api/dto/TaskCountDto.java | 24 +- .../dolphinscheduler/common/Constants.java | 17 +- .../common/enums/ExecutionStatus.java | 109 +++-- .../common/enums/TaskStateType.java | 10 +- .../common/model/TaskNode.java | 50 +- .../common/utils/DateUtils.java | 444 +++++++++++++++++- .../common/enums/ExecutionStatusTest.java | 32 ++ .../dao/entity/TaskInstance.java | 127 +++-- .../remote/codec/NettyDecoder.java | 1 + .../builder/TaskExecutionContextBuilder.java | 2 + .../server/entity/TaskExecutionContext.java | 171 ++++--- .../runner/ConditionsTaskExecThread.java | 1 + .../runner/DependentTaskExecThread.java | 1 + .../runner/MasterBaseTaskExecThread.java | 21 +- .../master/runner/MasterExecThread.java | 54 ++- .../master/runner/MasterTaskExecThread.java | 17 +- .../processor/TaskExecuteProcessor.java | 39 +- .../worker/runner/TaskExecuteThread.java | 98 +++- .../worker/task/AbstractCommandExecutor.java | 22 +- .../TaskPriorityQueueConsumerTest.java | 3 +- .../dispatch/ExecutorDispatcherTest.java | 3 +- .../executor/NettyExecutorManagerTest.java | 3 +- .../host/RoundRobinHostManagerTest.java | 3 +- .../queue/TaskResponseServiceTest.java | 4 +- .../master/registry/MasterRegistryTest.java | 4 +- .../worker/runner/TaskExecuteThreadTest.java | 171 +++++++ .../service/process/ProcessService.java | 98 +++- pom.xml | 2 + sql/dolphinscheduler-postgre.sql | 4 +- sql/dolphinscheduler_mysql.sql | 2 + .../mysql/dolphinscheduler_ddl.sql | 1 - .../mysql/dolphinscheduler_ddl.sql | 58 +++ .../mysql/dolphinscheduler_dml.sql | 16 + .../postgresql/dolphinscheduler_ddl.sql | 52 ++ .../postgresql/dolphinscheduler_dml.sql | 16 + 35 files changed, 1383 insertions(+), 297 deletions(-) create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java create mode 100644 sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql create mode 100644 sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_dml.sql create mode 100644 sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql create mode 100644 sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_dml.sql diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java index fa7588f2ed..35aaaf34dd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java @@ -42,9 +42,10 @@ public class TaskCountDto { countTaskDtos(taskInstanceStateCounts); } - private void countTaskDtos(List taskInstanceStateCounts){ + private void countTaskDtos(List taskInstanceStateCounts) { int submittedSuccess = 0; - int runningExeution = 0; + int runningExecution = 0; + int delayExecution = 0; int readyPause = 0; int pause = 0; int readyStop = 0; @@ -55,15 +56,18 @@ public class TaskCountDto { int kill = 0; int waittingThread = 0; - for(ExecuteStatusCount taskInstanceStateCount : taskInstanceStateCounts){ + for (ExecuteStatusCount taskInstanceStateCount : taskInstanceStateCounts) { ExecutionStatus status = taskInstanceStateCount.getExecutionStatus(); totalCount += taskInstanceStateCount.getCount(); - switch (status){ + switch (status) { case SUBMITTED_SUCCESS: submittedSuccess += taskInstanceStateCount.getCount(); break; case RUNNING_EXECUTION: - runningExeution += taskInstanceStateCount.getCount(); + runningExecution += taskInstanceStateCount.getCount(); + break; + case DELAY_EXECUTION: + delayExecution += taskInstanceStateCount.getCount(); break; case READY_PAUSE: readyPause += taskInstanceStateCount.getCount(); @@ -93,13 +97,14 @@ public class TaskCountDto { waittingThread += taskInstanceStateCount.getCount(); break; - default: - break; + default: + break; } } this.taskCountDtos = new ArrayList<>(); this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.SUBMITTED_SUCCESS, submittedSuccess)); - this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.RUNNING_EXECUTION, runningExeution)); + this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.RUNNING_EXECUTION, runningExecution)); + this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.DELAY_EXECUTION, delayExecution)); this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_PAUSE, readyPause)); this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.PAUSE, pause)); this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_STOP, readyStop)); @@ -111,8 +116,7 @@ public class TaskCountDto { this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.WAITTING_THREAD, waittingThread)); } - - public List getTaskCountDtos(){ + public List getTaskCountDtos() { return taskCountDtos; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 072a67f44f..cee83e73bc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -138,7 +138,7 @@ public final class Constants { /** * python home */ - public static final String PYTHON_HOME="PYTHON_HOME"; + public static final String PYTHON_HOME = "PYTHON_HOME"; /** * resource.view.suffixs @@ -366,7 +366,6 @@ public final class Constants { public static final double DEFAULT_WORKER_RESERVED_MEMORY = OSUtils.totalMemorySize() / 10; - /** * default log cache rows num,output when reach the number */ @@ -752,7 +751,7 @@ public final class Constants { /** - * preview schedule execute count + * preview schedule execute count */ public static final int PREVIEW_SCHEDULE_EXECUTE_COUNT = 5; @@ -832,6 +831,7 @@ public final class Constants { public static final int[] NOT_TERMINATED_STATES = new int[]{ ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), ExecutionStatus.READY_PAUSE.ordinal(), ExecutionStatus.READY_STOP.ordinal(), ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), @@ -852,18 +852,17 @@ public final class Constants { /** * data total */ - public static final String COUNT = "count"; + public static final String COUNT = "count"; /** * page size */ - public static final String PAGE_SIZE = "pageSize"; + public static final String PAGE_SIZE = "pageSize"; /** * current page no */ - public static final String PAGE_NUMBER = "pageNo"; - + public static final String PAGE_NUMBER = "pageNo"; /** @@ -966,11 +965,11 @@ public final class Constants { /** * authorize writable perm */ - public static final int AUTHORIZE_WRITABLE_PERM=7; + public static final int AUTHORIZE_WRITABLE_PERM = 7; /** * authorize readable perm */ - public static final int AUTHORIZE_READABLE_PERM=4; + public static final int AUTHORIZE_READABLE_PERM = 4; /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java index 6ea02ef096..f6ac2cf5ab 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java @@ -14,16 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.enums; +import java.util.HashMap; import com.baomidou.mybatisplus.annotation.EnumValue; -import java.util.HashMap; - /** * running status for workflow and task nodes - * */ public enum ExecutionStatus { @@ -41,6 +40,7 @@ public enum ExecutionStatus { * 9 kill * 10 waiting thread * 11 waiting depend node complete + * 12 delay execution */ SUBMITTED_SUCCESS(0, "submit success"), RUNNING_EXECUTION(1, "running"), @@ -53,9 +53,10 @@ public enum ExecutionStatus { NEED_FAULT_TOLERANCE(8, "need fault tolerance"), KILL(9, "kill"), WAITTING_THREAD(10, "waiting thread"), - WAITTING_DEPEND(11, "waiting depend node complete"); + WAITTING_DEPEND(11, "waiting depend node complete"), + DELAY_EXECUTION(12, "delay execution"); - ExecutionStatus(int code, String descp){ + ExecutionStatus(int code, String descp) { this.code = code; this.descp = descp; } @@ -64,77 +65,85 @@ public enum ExecutionStatus { private final int code; private final String descp; - private static HashMap EXECUTION_STATUS_MAP=new HashMap<>(); + private static HashMap EXECUTION_STATUS_MAP = new HashMap<>(); static { - for (ExecutionStatus executionStatus:ExecutionStatus.values()){ - EXECUTION_STATUS_MAP.put(executionStatus.code,executionStatus); - } + for (ExecutionStatus executionStatus : ExecutionStatus.values()) { + EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus); + } } - /** - * status is success - * @return status - */ - public boolean typeIsSuccess(){ - return this == SUCCESS; - } - - /** - * status is failure - * @return status - */ - public boolean typeIsFailure(){ - return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL; - } - - /** - * status is finished - * @return status - */ - public boolean typeIsFinished(){ - - return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() - || typeIsStop(); - } + /** + * status is success + * + * @return status + */ + public boolean typeIsSuccess() { + return this == SUCCESS; + } + + /** + * status is failure + * + * @return status + */ + public boolean typeIsFailure() { + return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL; + } + + /** + * status is finished + * + * @return status + */ + public boolean typeIsFinished() { + return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() + || typeIsStop(); + } /** * status is waiting thread + * * @return status */ - public boolean typeIsWaitingThread(){ - return this == WAITTING_THREAD; - } + public boolean typeIsWaitingThread() { + return this == WAITTING_THREAD; + } /** * status is pause + * * @return status */ - public boolean typeIsPause(){ - return this == PAUSE; - } + public boolean typeIsPause() { + return this == PAUSE; + } + /** * status is pause + * * @return status */ - public boolean typeIsStop(){ + public boolean typeIsStop() { return this == STOP; } /** * status is running + * * @return status */ - public boolean typeIsRunning(){ - return this == RUNNING_EXECUTION || this == WAITTING_DEPEND; - } + public boolean typeIsRunning() { + return this == RUNNING_EXECUTION || this == WAITTING_DEPEND || this == DELAY_EXECUTION; + } /** * status is cancel + * * @return status */ - public boolean typeIsCancel(){ - return this == KILL || this == STOP ; + public boolean typeIsCancel() { + return this == KILL || this == STOP; } public int getCode() { @@ -145,10 +154,10 @@ public enum ExecutionStatus { return descp; } - public static ExecutionStatus of(int status){ - if(EXECUTION_STATUS_MAP.containsKey(status)){ - return EXECUTION_STATUS_MAP.get(status); - } + public static ExecutionStatus of(int status) { + if (EXECUTION_STATUS_MAP.containsKey(status)) { + return EXECUTION_STATUS_MAP.get(status); + } throw new IllegalArgumentException("invalid status : " + status); } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java index 11ab8560b7..36766a7f4d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java @@ -31,12 +31,13 @@ public enum TaskStateType { /** * convert task state to execute status integer array ; + * * @param taskStateType task state type * @return result of execution status */ - public static int[] convert2ExecutStatusIntArray(TaskStateType taskStateType){ + public static int[] convert2ExecutStatusIntArray(TaskStateType taskStateType) { - switch (taskStateType){ + switch (taskStateType) { case SUCCESS: return new int[]{ExecutionStatus.SUCCESS.ordinal()}; case FAILED: @@ -51,14 +52,15 @@ public enum TaskStateType { case RUNNING: return new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), ExecutionStatus.READY_PAUSE.ordinal(), ExecutionStatus.READY_STOP.ordinal()}; case WAITTING: return new int[]{ ExecutionStatus.SUBMITTED_SUCCESS.ordinal() }; - default: - break; + default: + break; } return new int[0]; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index f794396457..cd3e573b16 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -136,6 +136,11 @@ public class TaskNode { @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) private String timeout; + /** + * delay execution time. + */ + private int delayTime; + public String getId() { return id; } @@ -310,24 +315,25 @@ public class TaskNode { @Override public String toString() { - return "TaskNode{" + - "id='" + id + '\'' + - ", name='" + name + '\'' + - ", desc='" + desc + '\'' + - ", type='" + type + '\'' + - ", runFlag='" + runFlag + '\'' + - ", loc='" + loc + '\'' + - ", maxRetryTimes=" + maxRetryTimes + - ", retryInterval=" + retryInterval + - ", params='" + params + '\'' + - ", preTasks='" + preTasks + '\'' + - ", extras='" + extras + '\'' + - ", depList=" + depList + - ", dependence='" + dependence + '\'' + - ", taskInstancePriority=" + taskInstancePriority + - ", timeout='" + timeout + '\'' + - ", workerGroup='" + workerGroup + '\'' + - '}'; + return "TaskNode{" + + "id='" + id + '\'' + + ", name='" + name + '\'' + + ", desc='" + desc + '\'' + + ", type='" + type + '\'' + + ", runFlag='" + runFlag + '\'' + + ", loc='" + loc + '\'' + + ", maxRetryTimes=" + maxRetryTimes + + ", retryInterval=" + retryInterval + + ", params='" + params + '\'' + + ", preTasks='" + preTasks + '\'' + + ", extras='" + extras + '\'' + + ", depList=" + depList + + ", dependence='" + dependence + '\'' + + ", taskInstancePriority=" + taskInstancePriority + + ", timeout='" + timeout + '\'' + + ", workerGroup='" + workerGroup + '\'' + + ", delayTime=" + delayTime + + '}'; } public String getWorkerGroup() { @@ -353,4 +359,12 @@ public class TaskNode { public void setWorkerGroupId(Integer workerGroupId) { this.workerGroupId = workerGroupId; } + + public int getDelayTime() { + return delayTime; + } + + public void setDelayTime(int delayTime) { + this.delayTime = delayTime; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java index 1033816e8e..6cd1d5867e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java @@ -1 +1,443 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Calendar; import java.util.Date; /** * date utils */ public class DateUtils { private static final Logger logger = LoggerFactory.getLogger(DateUtils.class); /** * date to local datetime * * @param date date * @return local datetime */ private static LocalDateTime date2LocalDateTime(Date date) { return LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()); } /** * local datetime to date * * @param localDateTime local datetime * @return date */ private static Date localDateTime2Date(LocalDateTime localDateTime) { Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant(); return Date.from(instant); } /** * get current date str * * @return date string */ public static String getCurrentTime() { return getCurrentTime(Constants.YYYY_MM_DD_HH_MM_SS); } /** * get the date string in the specified format of the current time * * @param format date format * @return date string */ public static String getCurrentTime(String format) { return LocalDateTime.now().format(DateTimeFormatter.ofPattern(format)); } /** * get the formatted date string * * @param date date * @param format e.g. yyyy-MM-dd HH:mm:ss * @return date string */ public static String format(Date date, String format) { return format(date2LocalDateTime(date), format); } /** * get the formatted date string * * @param localDateTime local data time * @param format yyyy-MM-dd HH:mm:ss * @return date string */ public static String format(LocalDateTime localDateTime, String format) { return localDateTime.format(DateTimeFormatter.ofPattern(format)); } /** * convert time to yyyy-MM-dd HH:mm:ss format * * @param date date * @return date string */ public static String dateToString(Date date) { return format(date, Constants.YYYY_MM_DD_HH_MM_SS); } /** * convert string to date and time * * @param date date * @param format format * @return date */ public static Date parse(String date, String format) { try { LocalDateTime ldt = LocalDateTime.parse(date, DateTimeFormatter.ofPattern(format)); return localDateTime2Date(ldt); } catch (Exception e) { logger.error("error while parse date:" + date, e); } return null; } /** * convert date str to yyyy-MM-dd HH:mm:ss format * * @param str date string * @return yyyy-MM-dd HH:mm:ss format */ public static Date stringToDate(String str) { return parse(str, Constants.YYYY_MM_DD_HH_MM_SS); } /** * get seconds between two dates * * @param d1 date1 * @param d2 date2 * @return differ seconds */ public static long differSec(Date d1, Date d2) { if(d1 == null || d2 == null){ return 0; } return (long) Math.ceil(differMs(d1, d2) / 1000.0); } /** * get ms between two dates * * @param d1 date1 * @param d2 date2 * @return differ ms */ public static long differMs(Date d1, Date d2) { return Math.abs(d1.getTime() - d2.getTime()); } /** * get hours between two dates * * @param d1 date1 * @param d2 date2 * @return differ hours */ public static long diffHours(Date d1, Date d2) { return (long) Math.ceil(diffMin(d1, d2) / 60.0); } /** * get minutes between two dates * * @param d1 date1 * @param d2 date2 * @return differ minutes */ public static long diffMin(Date d1, Date d2) { return (long) Math.ceil(differSec(d1, d2) / 60.0); } /** * get the date of the specified date in the days before and after * * @param date date * @param day day * @return the date of the specified date in the days before and after */ public static Date getSomeDay(Date date, int day) { Calendar calendar = Calendar.getInstance(); calendar.setTime(date); calendar.add(Calendar.DATE, day); return calendar.getTime(); } /** * get the hour of day. * * @param date date * @return hour of day */ public static int getHourIndex(Date date) { Calendar calendar = Calendar.getInstance(); calendar.setTime(date); return calendar.get(Calendar.HOUR_OF_DAY); } /** * compare two dates * * @param future future date * @param old old date * @return true if future time greater than old time */ public static boolean compare(Date future, Date old) { return future.getTime() > old.getTime(); } /** * convert schedule string to date * * @param schedule schedule * @return convert schedule string to date */ public static Date getScheduleDate(String schedule) { return stringToDate(schedule); } /** * format time to readable * * @param ms ms * @return format time */ public static String format2Readable(long ms) { long days = ms / (1000 * 60 * 60 * 24); long hours = (ms % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60); long minutes = (ms % (1000 * 60 * 60)) / (1000 * 60); long seconds = (ms % (1000 * 60)) / 1000; return String.format("%02d %02d:%02d:%02d", days, hours, minutes, seconds); } /** * get monday * * note: Set the first day of the week to Monday, the default is Sunday * @param date date * @return get monday */ public static Date getMonday(Date date) { Calendar cal = Calendar.getInstance(); cal.setTime(date); cal.setFirstDayOfWeek(Calendar.MONDAY); cal.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY); return cal.getTime(); } /** * get sunday * * note: Set the first day of the week to Monday, the default is Sunday * @param date date * @return get sunday */ public static Date getSunday(Date date) { Calendar cal = Calendar.getInstance(); cal.setTime(date); cal.setFirstDayOfWeek(Calendar.MONDAY); cal.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY); return cal.getTime(); } /** * get first day of month * * @param date date * @return first day of month * */ public static Date getFirstDayOfMonth(Date date) { Calendar cal = Calendar.getInstance(); cal.setTime(date); cal.set(Calendar.DAY_OF_MONTH, 1); return cal.getTime(); } /** * get some hour of day * * @param date date * @param offsetHour hours * @return some hour of day * */ public static Date getSomeHourOfDay(Date date, int offsetHour) { Calendar cal = Calendar.getInstance(); cal.setTime(date); cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) + offsetHour); cal.set(Calendar.MINUTE, 0); cal.set(Calendar.SECOND, 0); cal.set(Calendar.MILLISECOND, 0); return cal.getTime(); } /** * get last day of month * * @param date date * @return get last day of month */ public static Date getLastDayOfMonth(Date date) { Calendar cal = Calendar.getInstance(); cal.setTime(date); cal.add(Calendar.MONTH, 1); cal.set(Calendar.DAY_OF_MONTH, 1); cal.add(Calendar.DAY_OF_MONTH, -1); return cal.getTime(); } /** * return YYYY-MM-DD 00:00:00 * * @param inputDay date * @return start day */ public static Date getStartOfDay(Date inputDay) { Calendar cal = Calendar.getInstance(); cal.setTime(inputDay); cal.set(Calendar.HOUR_OF_DAY, 0); cal.set(Calendar.MINUTE, 0); cal.set(Calendar.SECOND, 0); cal.set(Calendar.MILLISECOND, 0); return cal.getTime(); } /** * return YYYY-MM-DD 23:59:59 * * @param inputDay day * @return end of day */ public static Date getEndOfDay(Date inputDay) { Calendar cal = Calendar.getInstance(); cal.setTime(inputDay); cal.set(Calendar.HOUR_OF_DAY, 23); cal.set(Calendar.MINUTE, 59); cal.set(Calendar.SECOND, 59); cal.set(Calendar.MILLISECOND, 999); return cal.getTime(); } /** * return YYYY-MM-DD 00:00:00 * * @param inputDay day * @return start of hour */ public static Date getStartOfHour(Date inputDay) { Calendar cal = Calendar.getInstance(); cal.setTime(inputDay); cal.set(Calendar.MINUTE, 0); cal.set(Calendar.SECOND, 0); cal.set(Calendar.MILLISECOND, 0); return cal.getTime(); } /** * return YYYY-MM-DD 23:59:59 * * @param inputDay day * @return end of hour */ public static Date getEndOfHour(Date inputDay) { Calendar cal = Calendar.getInstance(); cal.setTime(inputDay); cal.set(Calendar.MINUTE, 59); cal.set(Calendar.SECOND, 59); cal.set(Calendar.MILLISECOND, 999); return cal.getTime(); } /** * get current date * @return current date */ public static Date getCurrentDate() { return DateUtils.parse(DateUtils.getCurrentTime(), Constants.YYYY_MM_DD_HH_MM_SS); } /** * get date * @param date date * @param calendarField calendarField * @param amount amount * @return date */ public static Date add(final Date date, final int calendarField, final int amount) { if (date == null) { throw new IllegalArgumentException("The date must not be null"); } final Calendar c = Calendar.getInstance(); c.setTime(date); c.add(calendarField, amount); return c.getTime(); } } \ No newline at end of file +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import org.apache.dolphinscheduler.common.Constants; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Calendar; +import java.util.Date; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * date utils + */ +public class DateUtils { + + private static final Logger logger = LoggerFactory.getLogger(DateUtils.class); + + /** + * date to local datetime + * + * @param date date + * @return local datetime + */ + private static LocalDateTime date2LocalDateTime(Date date) { + return LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()); + } + + /** + * local datetime to date + * + * @param localDateTime local datetime + * @return date + */ + private static Date localDateTime2Date(LocalDateTime localDateTime) { + Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant(); + return Date.from(instant); + } + + /** + * get current date str + * + * @return date string + */ + public static String getCurrentTime() { + return getCurrentTime(Constants.YYYY_MM_DD_HH_MM_SS); + } + + /** + * get the date string in the specified format of the current time + * + * @param format date format + * @return date string + */ + public static String getCurrentTime(String format) { + return LocalDateTime.now().format(DateTimeFormatter.ofPattern(format)); + } + + /** + * get the formatted date string + * + * @param date date + * @param format e.g. yyyy-MM-dd HH:mm:ss + * @return date string + */ + public static String format(Date date, String format) { + return format(date2LocalDateTime(date), format); + } + + /** + * get the formatted date string + * + * @param localDateTime local data time + * @param format yyyy-MM-dd HH:mm:ss + * @return date string + */ + public static String format(LocalDateTime localDateTime, String format) { + return localDateTime.format(DateTimeFormatter.ofPattern(format)); + } + + /** + * convert time to yyyy-MM-dd HH:mm:ss format + * + * @param date date + * @return date string + */ + public static String dateToString(Date date) { + return format(date, Constants.YYYY_MM_DD_HH_MM_SS); + } + + /** + * convert string to date and time + * + * @param date date + * @param format format + * @return date + */ + public static Date parse(String date, String format) { + try { + LocalDateTime ldt = LocalDateTime.parse(date, DateTimeFormatter.ofPattern(format)); + return localDateTime2Date(ldt); + } catch (Exception e) { + logger.error("error while parse date:" + date, e); + } + return null; + } + + /** + * convert date str to yyyy-MM-dd HH:mm:ss format + * + * @param str date string + * @return yyyy-MM-dd HH:mm:ss format + */ + public static Date stringToDate(String str) { + return parse(str, Constants.YYYY_MM_DD_HH_MM_SS); + } + + /** + * get seconds between two dates + * + * @param d1 date1 + * @param d2 date2 + * @return differ seconds + */ + public static long differSec(Date d1, Date d2) { + if (d1 == null || d2 == null) { + return 0; + } + return (long) Math.ceil(differMs(d1, d2) / 1000.0); + } + + /** + * get ms between two dates + * + * @param d1 date1 + * @param d2 date2 + * @return differ ms + */ + public static long differMs(Date d1, Date d2) { + return Math.abs(d1.getTime() - d2.getTime()); + } + + /** + * get hours between two dates + * + * @param d1 date1 + * @param d2 date2 + * @return differ hours + */ + public static long diffHours(Date d1, Date d2) { + return (long) Math.ceil(diffMin(d1, d2) / 60.0); + } + + /** + * get minutes between two dates + * + * @param d1 date1 + * @param d2 date2 + * @return differ minutes + */ + public static long diffMin(Date d1, Date d2) { + return (long) Math.ceil(differSec(d1, d2) / 60.0); + } + + /** + * get the date of the specified date in the days before and after + * + * @param date date + * @param day day + * @return the date of the specified date in the days before and after + */ + public static Date getSomeDay(Date date, int day) { + Calendar calendar = Calendar.getInstance(); + calendar.setTime(date); + calendar.add(Calendar.DATE, day); + return calendar.getTime(); + } + + /** + * get the hour of day. + * + * @param date date + * @return hour of day + */ + public static int getHourIndex(Date date) { + Calendar calendar = Calendar.getInstance(); + calendar.setTime(date); + return calendar.get(Calendar.HOUR_OF_DAY); + } + + /** + * compare two dates + * + * @param future future date + * @param old old date + * @return true if future time greater than old time + */ + public static boolean compare(Date future, Date old) { + return future.getTime() > old.getTime(); + } + + /** + * convert schedule string to date + * + * @param schedule schedule + * @return convert schedule string to date + */ + public static Date getScheduleDate(String schedule) { + return stringToDate(schedule); + } + + /** + * format time to readable + * + * @param ms ms + * @return format time + */ + public static String format2Readable(long ms) { + + long days = ms / (1000 * 60 * 60 * 24); + long hours = (ms % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60); + long minutes = (ms % (1000 * 60 * 60)) / (1000 * 60); + long seconds = (ms % (1000 * 60)) / 1000; + + return String.format("%02d %02d:%02d:%02d", days, hours, minutes, seconds); + + } + + /** + * get monday + *

+ * note: Set the first day of the week to Monday, the default is Sunday + * + * @param date date + * @return get monday + */ + public static Date getMonday(Date date) { + Calendar cal = Calendar.getInstance(); + + cal.setTime(date); + + cal.setFirstDayOfWeek(Calendar.MONDAY); + cal.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY); + + return cal.getTime(); + } + + /** + * get sunday + *

+ * note: Set the first day of the week to Monday, the default is Sunday + * + * @param date date + * @return get sunday + */ + public static Date getSunday(Date date) { + Calendar cal = Calendar.getInstance(); + cal.setTime(date); + + cal.setFirstDayOfWeek(Calendar.MONDAY); + cal.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY); + + return cal.getTime(); + } + + /** + * get first day of month + * + * @param date date + * @return first day of month + */ + public static Date getFirstDayOfMonth(Date date) { + Calendar cal = Calendar.getInstance(); + + cal.setTime(date); + cal.set(Calendar.DAY_OF_MONTH, 1); + + return cal.getTime(); + } + + /** + * get some hour of day + * + * @param date date + * @param offsetHour hours + * @return some hour of day + */ + public static Date getSomeHourOfDay(Date date, int offsetHour) { + Calendar cal = Calendar.getInstance(); + + cal.setTime(date); + cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) + offsetHour); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + return cal.getTime(); + } + + /** + * get last day of month + * + * @param date date + * @return get last day of month + */ + public static Date getLastDayOfMonth(Date date) { + Calendar cal = Calendar.getInstance(); + + cal.setTime(date); + + cal.add(Calendar.MONTH, 1); + cal.set(Calendar.DAY_OF_MONTH, 1); + cal.add(Calendar.DAY_OF_MONTH, -1); + + return cal.getTime(); + } + + /** + * return YYYY-MM-DD 00:00:00 + * + * @param inputDay date + * @return start day + */ + public static Date getStartOfDay(Date inputDay) { + Calendar cal = Calendar.getInstance(); + cal.setTime(inputDay); + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + return cal.getTime(); + } + + /** + * return YYYY-MM-DD 23:59:59 + * + * @param inputDay day + * @return end of day + */ + public static Date getEndOfDay(Date inputDay) { + Calendar cal = Calendar.getInstance(); + cal.setTime(inputDay); + cal.set(Calendar.HOUR_OF_DAY, 23); + cal.set(Calendar.MINUTE, 59); + cal.set(Calendar.SECOND, 59); + cal.set(Calendar.MILLISECOND, 999); + return cal.getTime(); + } + + /** + * return YYYY-MM-DD 00:00:00 + * + * @param inputDay day + * @return start of hour + */ + public static Date getStartOfHour(Date inputDay) { + Calendar cal = Calendar.getInstance(); + cal.setTime(inputDay); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + return cal.getTime(); + } + + /** + * return YYYY-MM-DD 23:59:59 + * + * @param inputDay day + * @return end of hour + */ + public static Date getEndOfHour(Date inputDay) { + Calendar cal = Calendar.getInstance(); + cal.setTime(inputDay); + cal.set(Calendar.MINUTE, 59); + cal.set(Calendar.SECOND, 59); + cal.set(Calendar.MILLISECOND, 999); + return cal.getTime(); + } + + /** + * get current date + * + * @return current date + */ + public static Date getCurrentDate() { + return DateUtils.parse(DateUtils.getCurrentTime(), + Constants.YYYY_MM_DD_HH_MM_SS); + } + + /** + * get date + * + * @param date date + * @param calendarField calendarField + * @param amount amount + * @return date + */ + public static Date add(final Date date, final int calendarField, final int amount) { + if (date == null) { + throw new IllegalArgumentException("The date must not be null"); + } + final Calendar c = Calendar.getInstance(); + c.setTime(date); + c.add(calendarField, amount); + return c.getTime(); + } + + /** + * starting from the current time, get how many seconds are left before the target time. + * targetTime = baseTime + intervalSeconds + * + * @param baseTime base time + * @param intervalSeconds a period of time + * @return the number of seconds + */ + public static long getRemainTime(Date baseTime, long intervalSeconds) { + if (baseTime == null) { + return 0; + } + long usedTime = (System.currentTimeMillis() - baseTime.getTime()) / 1000; + return intervalSeconds - usedTime; + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java new file mode 100644 index 0000000000..6d4be78aef --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +import junit.framework.TestCase; + +/** + * execution status test. + */ +public class ExecutionStatusTest extends TestCase { + + public void testTypeIsRunning() { + assertTrue(ExecutionStatus.RUNNING_EXECUTION.typeIsRunning()); + assertTrue(ExecutionStatus.WAITTING_DEPEND.typeIsRunning()); + assertTrue(ExecutionStatus.DELAY_EXECUTION.typeIsRunning()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index b82da62b02..9688200b2c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -42,7 +42,7 @@ public class TaskInstance implements Serializable { /** * id */ - @TableId(value="id", type=IdType.AUTO) + @TableId(value = "id", type = IdType.AUTO) private int id; /** @@ -51,7 +51,6 @@ public class TaskInstance implements Serializable { private String name; - /** * task type */ @@ -83,22 +82,28 @@ public class TaskInstance implements Serializable { */ private ExecutionStatus state; + /** + * task first submit time. + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date firstSubmitTime; + /** * task submit time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date submitTime; /** * task start time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date startTime; /** * task end time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date endTime; /** @@ -214,11 +219,14 @@ public class TaskInstance implements Serializable { @TableField(exist = false) - private Map resources; - + private Map resources; + /** + * delay execution time. + */ + private int delayTime; - public void init(String host,Date startTime,String executePath){ + public void init(String host, Date startTime, String executePath) { this.host = host; this.startTime = startTime; this.executePath = executePath; @@ -297,6 +305,14 @@ public class TaskInstance implements Serializable { this.state = state; } + public Date getFirstSubmitTime() { + return firstSubmitTime; + } + + public void setFirstSubmitTime(Date firstSubmitTime) { + this.firstSubmitTime = firstSubmitTime; + } + public Date getSubmitTime() { return submitTime; } @@ -361,7 +377,7 @@ public class TaskInstance implements Serializable { this.retryTimes = retryTimes; } - public Boolean isTaskSuccess(){ + public Boolean isTaskSuccess() { return this.state == ExecutionStatus.SUCCESS; } @@ -400,6 +416,7 @@ public class TaskInstance implements Serializable { public void setFlag(Flag flag) { this.flag = flag; } + public String getProcessInstanceName() { return processInstanceName; } @@ -464,33 +481,33 @@ public class TaskInstance implements Serializable { this.resources = resources; } - public boolean isSubProcess(){ + public boolean isSubProcess() { return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType)); } - public boolean isDependTask(){ + public boolean isDependTask() { return TaskType.DEPENDENT.equals(TaskType.valueOf(this.taskType)); } - public boolean isConditionsTask(){ + public boolean isConditionsTask() { return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType)); } - /** * determine if you can try again + * * @return can try result */ public boolean taskCanRetry() { - if(this.isSubProcess()){ + if (this.isSubProcess()) { return false; } - if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){ + if (this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) { return true; - }else { + } else { return (this.getState().typeIsFailure() - && this.getRetryTimes() < this.getMaxRetryTimes()); + && this.getRetryTimes() < this.getMaxRetryTimes()); } } @@ -526,40 +543,50 @@ public class TaskInstance implements Serializable { this.dependentResult = dependentResult; } + public int getDelayTime() { + return delayTime; + } + + public void setDelayTime(int delayTime) { + this.delayTime = delayTime; + } + @Override public String toString() { - return "TaskInstance{" + - "id=" + id + - ", name='" + name + '\'' + - ", taskType='" + taskType + '\'' + - ", processDefinitionId=" + processDefinitionId + - ", processInstanceId=" + processInstanceId + - ", processInstanceName='" + processInstanceName + '\'' + - ", taskJson='" + taskJson + '\'' + - ", state=" + state + - ", submitTime=" + submitTime + - ", startTime=" + startTime + - ", endTime=" + endTime + - ", host='" + host + '\'' + - ", executePath='" + executePath + '\'' + - ", logPath='" + logPath + '\'' + - ", retryTimes=" + retryTimes + - ", alertFlag=" + alertFlag + - ", processInstance=" + processInstance + - ", processDefine=" + processDefine + - ", pid=" + pid + - ", appLink='" + appLink + '\'' + - ", flag=" + flag + - ", dependency='" + dependency + '\'' + - ", duration=" + duration + - ", maxRetryTimes=" + maxRetryTimes + - ", retryInterval=" + retryInterval + - ", taskInstancePriority=" + taskInstancePriority + - ", processInstancePriority=" + processInstancePriority + - ", dependentResult='" + dependentResult + '\'' + - ", workerGroup='" + workerGroup + '\'' + - ", executorId=" + executorId + - ", executorName='" + executorName + '\'' + - '}'; + return "TaskInstance{" + + "id=" + id + + ", name='" + name + '\'' + + ", taskType='" + taskType + '\'' + + ", processDefinitionId=" + processDefinitionId + + ", processInstanceId=" + processInstanceId + + ", processInstanceName='" + processInstanceName + '\'' + + ", taskJson='" + taskJson + '\'' + + ", state=" + state + + ", firstSubmitTime=" + firstSubmitTime + + ", submitTime=" + submitTime + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", host='" + host + '\'' + + ", executePath='" + executePath + '\'' + + ", logPath='" + logPath + '\'' + + ", retryTimes=" + retryTimes + + ", alertFlag=" + alertFlag + + ", processInstance=" + processInstance + + ", processDefine=" + processDefine + + ", pid=" + pid + + ", appLink='" + appLink + '\'' + + ", flag=" + flag + + ", dependency='" + dependency + '\'' + + ", duration=" + duration + + ", maxRetryTimes=" + maxRetryTimes + + ", retryInterval=" + retryInterval + + ", taskInstancePriority=" + taskInstancePriority + + ", processInstancePriority=" + processInstancePriority + + ", dependentResult='" + dependentResult + '\'' + + ", workerGroup='" + workerGroup + '\'' + + ", executorId=" + executorId + + ", executorName='" + executorName + '\'' + + ", delayTime=" + delayTime + + '}'; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java index a69022214d..179ae1bef8 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java @@ -75,6 +75,7 @@ public class NettyDecoder extends ReplayingDecoder { out.add(packet); // checkpoint(State.MAGIC); + break; default: logger.warn("unknown decoder state {}", state()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 535c274989..74b0635145 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -40,6 +40,7 @@ public class TaskExecutionContextBuilder { public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance){ taskExecutionContext.setTaskInstanceId(taskInstance.getId()); taskExecutionContext.setTaskName(taskInstance.getName()); + taskExecutionContext.setFirstSubmitTime(taskInstance.getFirstSubmitTime()); taskExecutionContext.setStartTime(taskInstance.getStartTime()); taskExecutionContext.setTaskType(taskInstance.getTaskType()); taskExecutionContext.setLogPath(taskInstance.getLogPath()); @@ -48,6 +49,7 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup()); taskExecutionContext.setHost(taskInstance.getHost()); taskExecutionContext.setResources(taskInstance.getResources()); + taskExecutionContext.setDelayTime(taskInstance.getDelayTime()); return this; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 81488fb134..1589c365c2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.entity; -import com.fasterxml.jackson.annotation.JsonFormat; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; @@ -26,30 +26,38 @@ import java.io.Serializable; import java.util.Date; import java.util.Map; +import com.fasterxml.jackson.annotation.JsonFormat; + /** - * master/worker task transport + * master/worker task transport */ -public class TaskExecutionContext implements Serializable{ +public class TaskExecutionContext implements Serializable { /** - * task id + * task id */ private int taskInstanceId; /** - * task name + * task name */ private String taskName; /** - * task start time + * task first submit time. */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date firstSubmitTime; + + /** + * task start time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date startTime; /** - * task type + * task type */ private String taskType; @@ -57,9 +65,9 @@ public class TaskExecutionContext implements Serializable{ * host */ private String host; - + /** - * task execute path + * task execute path */ private String executePath; @@ -69,7 +77,7 @@ public class TaskExecutionContext implements Serializable{ private String logPath; /** - * task json + * task json */ private String taskJson; @@ -84,53 +92,53 @@ public class TaskExecutionContext implements Serializable{ private String appIds; /** - * process instance id + * process instance id */ private int processInstanceId; /** - * process instance schedule time + * process instance schedule time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date scheduleTime; /** - * process instance global parameters + * process instance global parameters */ private String globalParams; /** - * execute user id + * execute user id */ private int executorId; /** - * command type if complement + * command type if complement */ private int cmdTypeIfComplement; /** - * tenant code + * tenant code */ private String tenantCode; /** - * task queue + * task queue */ private String queue; /** - * process define id + * process define id */ private int processDefineId; /** - * project id + * project id */ private int projectId; @@ -140,12 +148,12 @@ public class TaskExecutionContext implements Serializable{ private String taskParams; /** - * envFile + * envFile */ private String envFile; /** - * definedParams + * definedParams */ private Map definedParams; @@ -155,7 +163,7 @@ public class TaskExecutionContext implements Serializable{ private String taskAppId; /** - * task timeout strategy + * task timeout strategy */ private int taskTimeoutStrategy; @@ -169,18 +177,28 @@ public class TaskExecutionContext implements Serializable{ */ private String workerGroup; + /** + * delay execution time. + */ + private int delayTime; + + /** + * current execution status + */ + private ExecutionStatus currentExecutionStatus; + /** * resources full name and tenant code */ - private Map resources; + private Map resources; /** - * sql TaskExecutionContext + * sql TaskExecutionContext */ private SQLTaskExecutionContext sqlTaskExecutionContext; /** - * datax TaskExecutionContext + * datax TaskExecutionContext */ private DataxTaskExecutionContext dataxTaskExecutionContext; @@ -195,7 +213,7 @@ public class TaskExecutionContext implements Serializable{ private SqoopTaskExecutionContext sqoopTaskExecutionContext; /** - * procedure TaskExecutionContext + * procedure TaskExecutionContext */ private ProcedureTaskExecutionContext procedureTaskExecutionContext; @@ -215,6 +233,14 @@ public class TaskExecutionContext implements Serializable{ this.taskName = taskName; } + public Date getFirstSubmitTime() { + return firstSubmitTime; + } + + public void setFirstSubmitTime(Date firstSubmitTime) { + this.firstSubmitTime = firstSubmitTime; + } + public Date getStartTime() { return startTime; } @@ -407,6 +433,22 @@ public class TaskExecutionContext implements Serializable{ this.workerGroup = workerGroup; } + public int getDelayTime() { + return delayTime; + } + + public void setDelayTime(int delayTime) { + this.delayTime = delayTime; + } + + public ExecutionStatus getCurrentExecutionStatus() { + return currentExecutionStatus; + } + + public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) { + this.currentExecutionStatus = currentExecutionStatus; + } + public SQLTaskExecutionContext getSqlTaskExecutionContext() { return sqlTaskExecutionContext; } @@ -431,7 +473,7 @@ public class TaskExecutionContext implements Serializable{ this.procedureTaskExecutionContext = procedureTaskExecutionContext; } - public Command toCommand(){ + public Command toCommand() { TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); requestCommand.setTaskExecutionContext(JsonSerializer.serializeToString(this)); return requestCommand.convert2Command(); @@ -463,39 +505,42 @@ public class TaskExecutionContext implements Serializable{ @Override public String toString() { - return "TaskExecutionContext{" + - "taskInstanceId=" + taskInstanceId + - ", taskName='" + taskName + '\'' + - ", startTime=" + startTime + - ", taskType='" + taskType + '\'' + - ", host='" + host + '\'' + - ", executePath='" + executePath + '\'' + - ", logPath='" + logPath + '\'' + - ", taskJson='" + taskJson + '\'' + - ", processId=" + processId + - ", appIds='" + appIds + '\'' + - ", processInstanceId=" + processInstanceId + - ", scheduleTime=" + scheduleTime + - ", globalParams='" + globalParams + '\'' + - ", executorId=" + executorId + - ", cmdTypeIfComplement=" + cmdTypeIfComplement + - ", tenantCode='" + tenantCode + '\'' + - ", queue='" + queue + '\'' + - ", processDefineId=" + processDefineId + - ", projectId=" + projectId + - ", taskParams='" + taskParams + '\'' + - ", envFile='" + envFile + '\'' + - ", definedParams=" + definedParams + - ", taskAppId='" + taskAppId + '\'' + - ", taskTimeoutStrategy=" + taskTimeoutStrategy + - ", taskTimeout=" + taskTimeout + - ", workerGroup='" + workerGroup + '\'' + - ", resources=" + resources + - ", sqlTaskExecutionContext=" + sqlTaskExecutionContext + - ", dataxTaskExecutionContext=" + dataxTaskExecutionContext + - ", dependenceTaskExecutionContext=" + dependenceTaskExecutionContext + - ", sqoopTaskExecutionContext=" + sqoopTaskExecutionContext + - ", procedureTaskExecutionContext=" + procedureTaskExecutionContext + - '}'; + return "TaskExecutionContext{" + + "taskInstanceId=" + taskInstanceId + + ", taskName='" + taskName + '\'' + + ", currentExecutionStatus=" + currentExecutionStatus + + ", firstSubmitTime=" + firstSubmitTime + + ", startTime=" + startTime + + ", taskType='" + taskType + '\'' + + ", host='" + host + '\'' + + ", executePath='" + executePath + '\'' + + ", logPath='" + logPath + '\'' + + ", taskJson='" + taskJson + '\'' + + ", processId=" + processId + + ", appIds='" + appIds + '\'' + + ", processInstanceId=" + processInstanceId + + ", scheduleTime=" + scheduleTime + + ", globalParams='" + globalParams + '\'' + + ", executorId=" + executorId + + ", cmdTypeIfComplement=" + cmdTypeIfComplement + + ", tenantCode='" + tenantCode + '\'' + + ", queue='" + queue + '\'' + + ", processDefineId=" + processDefineId + + ", projectId=" + projectId + + ", taskParams='" + taskParams + '\'' + + ", envFile='" + envFile + '\'' + + ", definedParams=" + definedParams + + ", taskAppId='" + taskAppId + '\'' + + ", taskTimeoutStrategy=" + taskTimeoutStrategy + + ", taskTimeout=" + taskTimeout + + ", workerGroup='" + workerGroup + '\'' + + ", delayTime=" + delayTime + + ", resources=" + resources + + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext + + ", dataxTaskExecutionContext=" + dataxTaskExecutionContext + + ", dependenceTaskExecutionContext=" + dependenceTaskExecutionContext + + ", sqoopTaskExecutionContext=" + sqoopTaskExecutionContext + + ", procedureTaskExecutionContext=" + procedureTaskExecutionContext + + '}'; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java index a410f99fc1..6878d5a8b1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java @@ -60,6 +60,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { */ public ConditionsTaskExecThread(TaskInstance taskInstance) { super(taskInstance); + taskInstance.setStartTime(new Date()); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java index 183f1aac42..cfe885cbfa 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java @@ -64,6 +64,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { */ public DependentTaskExecThread(TaskInstance taskInstance) { super(taskInstance); + taskInstance.setStartTime(new Date()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index aac201c403..cf5359d579 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -16,11 +16,11 @@ */ package org.apache.dolphinscheduler.server.master.runner; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.sift.SiftingAppender; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -30,11 +30,14 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; + +import java.util.concurrent.Callable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.dolphinscheduler.common.Constants.*; -import java.util.concurrent.Callable; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.sift.SiftingAppender; /** @@ -171,9 +174,10 @@ public class MasterBaseTaskExecThread implements Callable { logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); return true; } - // task cannot submit when running - if(taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION){ - logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName())); + // task cannot be submitted because its execution state is RUNNING or DELAY. + if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION + || taskInstance.getState() == ExecutionStatus.DELAY_EXECUTION) { + logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName()); return true; } logger.info("task ready to submit: {}", taskInstance); @@ -272,5 +276,4 @@ public class MasterBaseTaskExecThread implements Callable { return logPath; } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 5be6050f87..788b30638e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -16,10 +16,21 @@ */ package org.apache.dolphinscheduler.server.master.runner; -import com.google.common.collect.Lists; -import org.apache.commons.io.FileUtils; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVERY_START_NODE_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_START_NODE_NAMES; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; + import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -27,7 +38,13 @@ import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -37,17 +54,26 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import static org.apache.dolphinscheduler.common.Constants.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; /** * master exec thread,split dag @@ -470,9 +496,6 @@ public class MasterExecThread implements Runnable { // task instance whether alert taskInstance.setAlertFlag(Flag.NO); - // task instance start time - taskInstance.setStartTime(new Date()); - // task instance flag taskInstance.setFlag(Flag.YES); @@ -501,6 +524,8 @@ public class MasterExecThread implements Runnable { taskInstance.setWorkerGroup(taskWorkerGroup); } + // delay execution time + taskInstance.setDelayTime(taskNode.getDelayTime()); } return taskInstance; } @@ -719,9 +744,10 @@ public class MasterExecThread implements Runnable { * @return ExecutionStatus */ private ExecutionStatus runningState(ExecutionStatus state){ - if(state == ExecutionStatus.READY_STOP || - state == ExecutionStatus.READY_PAUSE || - state == ExecutionStatus.WAITTING_THREAD){ + if (state == ExecutionStatus.READY_STOP + || state == ExecutionStatus.READY_PAUSE + || state == ExecutionStatus.WAITTING_THREAD + || state == ExecutionStatus.DELAY_EXECUTION) { // if the running task is not completed, the state remains unchanged return state; }else{ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index a2dd7cea8d..72ee0fcb89 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -39,7 +41,6 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; import java.util.Set; -import org.apache.dolphinscheduler.common.utils.*; /** @@ -150,7 +151,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { break; } if(checkTimeout){ - long remainTime = getRemaintime(taskTimeoutParameter.getInterval() * 60L); + long remainTime = DateUtils.getRemainTime(taskInstance.getStartTime(), taskTimeoutParameter.getInterval() * 60L); if (remainTime < 0) { logger.warn("task id: {} execution time out",taskInstance.getId()); // process define @@ -256,16 +257,4 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); return taskNode.getTaskTimeoutParameter(); } - - - /** - * get remain time?s? - * - * @return remain time - */ - private long getRemaintime(long timeoutSeconds) { - Date startTime = taskInstance.getStartTime(); - long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000; - return timeoutSeconds - usedTime; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 2ce4515279..2dc9bb2ce9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -18,15 +18,17 @@ package org.apache.dolphinscheduler.server.worker.processor; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.sift.SiftingAppender; -import com.github.rholder.retry.RetryException; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -38,14 +40,21 @@ import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Date; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.rholder.retry.RetryException; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.sift.SiftingAppender; +import io.netty.channel.Channel; + /** * worker request processor */ @@ -97,7 +106,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { return; } - taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort()); + taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort()); // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, @@ -122,7 +131,15 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); - // tell master that task is in executing + if (DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L) > 0) { + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); + taskExecutionContext.setStartTime(null); + } else { + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + taskExecutionContext.setStartTime(new Date()); + } + + // tell master the status of this task (RUNNING_EXECUTION or DELAY_EXECUTION) final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command(); try { @@ -167,10 +184,10 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) { TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ackCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode()); + ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); ackCommand.setHost(taskExecutionContext.getHost()); - ackCommand.setStartTime(new Date()); + ackCommand.setStartTime(taskExecutionContext.getStartTime()); if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ ackCommand.setExecutePath(null); }else{ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 26494bc77b..3ba49451c7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -16,25 +16,20 @@ */ package org.apache.dolphinscheduler.server.worker.runner; -import java.io.File; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.Set; - -import org.apache.commons.collections.MapUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.RetryerUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; @@ -43,9 +38,23 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import org.apache.commons.collections.MapUtils; + +import java.io.File; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.rholder.retry.RetryException; + /** * task scheduler thread @@ -105,6 +114,15 @@ public class TaskExecuteThread implements Runnable { // task node TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); + delayExecutionIfNeeded(); + if (taskExecutionContext.getStartTime() == null) { + taskExecutionContext.setStartTime(new Date()); + } + if (taskExecutionContext.getCurrentExecutionStatus() != ExecutionStatus.RUNNING_EXECUTION) { + changeTaskExecutionStatusToRunning(); + } + logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId()); + // copy hdfs/minio file to local downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), @@ -138,7 +156,7 @@ public class TaskExecuteThread implements Runnable { responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); - }catch (Exception e){ + } catch (Exception e) { logger.error("task scheduler failure", e); kill(); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); @@ -147,9 +165,10 @@ public class TaskExecuteThread implements Runnable { responseCommand.setAppIds(task.getAppIds()); } finally { try { + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(responseCommand.getStatus())); taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); - }catch (Exception e){ + } catch (Exception e) { ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); } @@ -256,4 +275,59 @@ public class TaskExecuteThread implements Runnable { } } } + + /** + * delay execution if needed. + */ + private void delayExecutionIfNeeded() { + long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getDelayTime() * 60L); + logger.info("delay execution time: {} s", remainTime < 0 ? 0 : remainTime); + if (remainTime > 0) { + try { + Thread.sleep(remainTime * Constants.SLEEP_TIME_MILLIS); + } catch (Exception e) { + logger.error("delay task execution failure, the task will be executed directly. process instance id:{}, task instance id:{}", + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); + } + } + } + + /** + * send an ack to change the status of the task. + */ + private void changeTaskExecutionStatusToRunning() { + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + Command ackCommand = buildAckCommand().convert2Command(); + try { + RetryerUtils.retryCall(() -> { + taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + return Boolean.TRUE; + }); + } catch (ExecutionException | RetryException e) { + logger.error(e.getMessage(), e); + } + } + + /** + * build ack command. + * + * @return TaskExecuteAckCommand + */ + private TaskExecuteAckCommand buildAckCommand() { + TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); + ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode()); + ackCommand.setStartTime(taskExecutionContext.getStartTime()); + ackCommand.setLogPath(taskExecutionContext.getLogPath()); + ackCommand.setHost(taskExecutionContext.getHost()); + if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) + || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) { + ackCommand.setExecutePath(null); + } else { + ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); + } + return ackCommand; + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 3459e35b72..a6f0f1a29e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -16,12 +16,14 @@ */ package org.apache.dolphinscheduler.server.worker.task; -import com.sun.jna.platform.win32.Kernel32; -import com.sun.jna.platform.win32.WinNT; +import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; @@ -32,9 +34,12 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.slf4j.Logger; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -47,8 +52,10 @@ import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; -import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS; +import org.slf4j.Logger; + +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.WinNT; /** * abstract command executor @@ -474,8 +481,7 @@ public abstract class AbstractCommandExecutor { * @return remain time */ private long getRemaintime() { - long usedTime = (System.currentTimeMillis() - taskExecutionContext.getStartTime().getTime()) / 1000; - long remainTime = taskExecutionContext.getTaskTimeout() - usedTime; + long remainTime = DateUtils.getRemainTime(taskExecutionContext.getStartTime(), taskExecutionContext.getTaskTimeout()); if (remainTime < 0) { throw new RuntimeException("task execution time out"); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 997c129573..32881b5681 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -55,7 +55,8 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class}) + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class, + CuratorZookeeperClient.class}) public class TaskPriorityQueueConsumerTest { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java index 98231bee06..de1f0517bd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Test; @@ -46,7 +47,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, WorkerRegistry.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, WorkerConfig.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class}) public class ExecutorDispatcherTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java index a1c6b71437..f7d98baed1 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Assert; @@ -52,7 +53,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class, - ZookeeperNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class, + ZookeeperNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class, CuratorZookeeperClient.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, SpringApplicationContext.class, NettyExecutorManager.class}) public class NettyExecutorManagerTest { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java index c38c9d4ae8..21780635ac 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Assert; @@ -43,7 +44,7 @@ import org.springframework.test.context.junit4.SpringRunner; */ @RunWith(SpringRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class, ZookeeperRegistryCenter.class, WorkerConfig.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class}) public class RoundRobinHostManagerTest { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index f19e2b4b64..a2b1b4ecc2 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.server.registry.DependencyConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Assert; @@ -35,7 +36,8 @@ import java.util.Date; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, TaskResponseService.class, ZookeeperRegistryCenter.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class}) + ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class, + CuratorZookeeperClient.class}) public class TaskResponseServiceTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java index 18c8b496d7..ea822cbfb1 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Assert; @@ -38,7 +39,8 @@ import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEP * master registry test */ @RunWith(SpringRunner.class) -@ContextConfiguration(classes={SpringZKServer.class, MasterRegistry.class,ZookeeperRegistryCenter.class, MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) +@ContextConfiguration(classes = {SpringZKServer.class, MasterRegistry.class, ZookeeperRegistryCenter.class, + MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class}) public class MasterRegistryTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java new file mode 100644 index 0000000000..2e7e531f30 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.TaskManager; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.Date; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * test task execute thread. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class}) +public class TaskExecuteThreadTest { + + private TaskExecutionContext taskExecutionContext; + + private TaskCallbackService taskCallbackService; + + private Command ackCommand; + + private Command responseCommand; + + private Logger taskLogger; + + private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; + + @Before + public void before() { + // init task execution context, logger + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setProcessId(12345); + taskExecutionContext.setProcessDefineId(1); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskType(""); + taskExecutionContext.setFirstSubmitTime(new Date()); + taskExecutionContext.setDelayTime(0); + taskExecutionContext.setLogPath("/tmp/test.log"); + taskExecutionContext.setHost("localhost"); + taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4"); + + ackCommand = new TaskExecuteAckCommand().convert2Command(); + responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()).convert2Command(); + + taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId( + LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId() + )); + + taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl(); + taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + + taskCallbackService = PowerMockito.mock(TaskCallbackService.class); + PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + PowerMockito.doNothing().when(taskCallbackService).sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand); + + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(taskExecutionContextCacheManager); + + PowerMockito.mockStatic(TaskManager.class); + PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger)) + .thenReturn(new SimpleTask(taskExecutionContext, taskLogger)); + + PowerMockito.mockStatic(JSONUtils.class); + PowerMockito.when(JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class)) + .thenReturn(new TaskNode()); + + PowerMockito.mockStatic(CommonUtils.class); + PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile"); + } + + @Test + public void testNormalExecution() { + taskExecutionContext.setTaskType("SQL"); + taskExecutionContext.setStartTime(new Date()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger); + taskExecuteThread.run(); + + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus()); + } + + @Test + public void testDelayExecution() { + taskExecutionContext.setTaskType("PYTHON"); + taskExecutionContext.setStartTime(null); + taskExecutionContext.setDelayTime(1); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger); + taskExecuteThread.run(); + + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus()); + } + + private class SimpleTask extends AbstractTask { + + protected SimpleTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + // pid + this.processId = taskExecutionContext.getProcessId(); + } + + @Override + public AbstractParameters getParameters() { + return null; + } + + @Override + public void init() { + + } + + @Override + public void handle() throws Exception { + + } + + @Override + public void after() { + + } + + @Override + public ExecutionStatus getExitStatus() { + return ExecutionStatus.SUCCESS; + } + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 7df56c6cdd..6f642672cd 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -16,31 +16,89 @@ */ package org.apache.dolphinscheduler.service.process; -import com.cronutils.model.Cron; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.lang.ArrayUtils; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; + +import static java.util.stream.Collectors.toSet; + import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.CycleEnum; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.CycleDependency; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ErrorCommand; +import org.apache.dolphinscheduler.dao.entity.ProcessData; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.UdfFunc; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; +import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; +import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import java.util.*; -import java.util.stream.Collectors; -import static java.util.stream.Collectors.toSet; -import static org.apache.dolphinscheduler.common.Constants.*; + +import com.cronutils.model.Cron; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * process relative dao that some mappers in this. @@ -52,6 +110,7 @@ public class ProcessService { private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), ExecutionStatus.READY_PAUSE.ordinal(), ExecutionStatus.READY_STOP.ordinal()}; @@ -1001,8 +1060,9 @@ public class ProcessService { if(taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE){ taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 ); } + taskInstance.setSubmitTime(null); + taskInstance.setStartTime(null); taskInstance.setEndTime(null); - taskInstance.setStartTime(new Date()); taskInstance.setFlag(Flag.YES); taskInstance.setHost(null); taskInstance.setId(0); @@ -1012,7 +1072,12 @@ public class ProcessService { taskInstance.setExecutorId(processInstance.getExecutorId()); taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority()); taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState)); - taskInstance.setSubmitTime(new Date()); + if (taskInstance.getSubmitTime() == null) { + taskInstance.setSubmitTime(new Date()); + } + if (taskInstance.getFirstSubmitTime() == null) { + taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime()); + } boolean saveResult = saveTaskInstance(taskInstance); if(!saveResult){ return null; @@ -1062,10 +1127,11 @@ public class ProcessService { public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){ ExecutionStatus state = taskInstance.getState(); if( - // running or killed + // running, delayed or killed // the task already exists in task queue // return state state == ExecutionStatus.RUNNING_EXECUTION + || state == ExecutionStatus.DELAY_EXECUTION || state == ExecutionStatus.KILL || checkTaskExistsInTaskQueue(taskInstance) ){ @@ -1588,7 +1654,7 @@ public class ProcessService { */ public List getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception { List cycleDependencyList = new ArrayList(); - if(ArrayUtils.isEmpty(ids)){ + if (ids == null || ids.length == 0) { logger.warn("ids[] is empty!is invalid!"); return cycleDependencyList; } @@ -1772,7 +1838,7 @@ public class ProcessService { public List listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType){ List resultList = new ArrayList(); - if (!ArrayUtils.isEmpty(needChecks)) { + if (Objects.nonNull(needChecks) && needChecks.length > 0) { Set originResSet = new HashSet(Arrays.asList(needChecks)); switch (authorizationType){ diff --git a/pom.xml b/pom.xml index 2937a814fc..49e00ba1b0 100644 --- a/pom.xml +++ b/pom.xml @@ -792,6 +792,7 @@ **/common/utils/RetryerUtilsTest.java **/common/plugin/FilePluginManagerTest **/common/plugin/PluginClassLoaderTest + **/common/enums/ExecutionStatusTest **/dao/mapper/AccessTokenMapperTest.java **/dao/mapper/AlertGroupMapperTest.java **/dao/mapper/CommandMapperTest.java @@ -842,6 +843,7 @@ **/server/worker/task/sqoop/SqoopTaskTest.java **/server/worker/EnvFileTest.java + **/server/worker/runner/TaskExecuteThreadTest.java **/service/quartz/cron/CronUtilsTest.java **/service/zk/DefaultEnsembleProviderTest.java **/service/zk/ZKServerTest.java diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql index 4a947d134d..3a0b1843be 100644 --- a/sql/dolphinscheduler-postgre.sql +++ b/sql/dolphinscheduler-postgre.sql @@ -566,8 +566,10 @@ CREATE TABLE t_ds_task_instance ( retry_interval int DEFAULT NULL , max_retry_times int DEFAULT NULL , task_instance_priority int DEFAULT NULL , - worker_group varchar(64), + worker_group varchar(64), executor_id int DEFAULT NULL , + first_submit_time timestamp DEFAULT NULL , + delay_time int DEFAULT '0' , PRIMARY KEY (id) ) ; diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index f4ad9a3cb8..bb3dbb095a 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -707,6 +707,8 @@ CREATE TABLE `t_ds_task_instance` ( `task_instance_priority` int(11) DEFAULT NULL COMMENT 'task instance priority:0 Highest,1 High,2 Medium,3 Low,4 Lowest', `worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id', `executor_id` int(11) DEFAULT NULL, + `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time', + `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time', PRIMARY KEY (`id`), KEY `process_instance_id` (`process_instance_id`) USING BTREE, KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE, diff --git a/sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_ddl.sql index 40e7a3f4dc..a24c9bc928 100644 --- a/sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_ddl.sql @@ -297,4 +297,3 @@ delimiter ; CALL ac_dolphin_T_t_ds_user_A_state; DROP PROCEDURE ac_dolphin_T_t_ds_user_A_state; - diff --git a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..9eaf3f8d50 --- /dev/null +++ b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY','')); +-- uc_dolphin_T_t_ds_task_instance_A_first_submit_time +drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_first_submit_time; +delimiter d// +CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_first_submit_time() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='first_submit_time') + THEN + ALTER TABLE t_ds_task_instance ADD `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time'; + END IF; + END; + +d// + +delimiter ; +CALL uc_dolphin_T_t_ds_task_instance_A_first_submit_time(); +DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_first_submit_time; + +-- uc_dolphin_T_t_ds_task_instance_A_delay_time +drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time; +delimiter d// +CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='delay_time') + THEN + ALTER TABLE t_ds_task_instance ADD `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time'; + END IF; + END; + +d// + +delimiter ; +CALL uc_dolphin_T_t_ds_task_instance_A_delay_time(); +DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time; + diff --git a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_dml.sql b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..4a14f326b9 --- /dev/null +++ b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..9a65824238 --- /dev/null +++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +-- uc_dolphin_T_t_ds_task_instance_A_first_submit_time +delimiter d// +CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_first_submit_time() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND COLUMN_NAME ='first_submit_time') + THEN + ALTER TABLE t_ds_task_instance ADD COLUMN first_submit_time timestamp DEFAULT NULL; + END IF; +END; +$$ LANGUAGE plpgsql; +d// + +delimiter ; +SELECT uc_dolphin_T_t_ds_task_instance_A_first_submit_time(); +DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_first_submit_time(); + + +-- uc_dolphin_T_t_ds_task_instance_A_delay_time +delimiter d// +CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_delay_time() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND COLUMN_NAME ='delay_time') + THEN + ALTER TABLE t_ds_task_instance ADD COLUMN delay_time int DEFAULT '0'; + END IF; +END; +$$ LANGUAGE plpgsql; +d// + +delimiter ; +SELECT uc_dolphin_T_t_ds_task_instance_A_delay_time(); +DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time(); \ No newline at end of file diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_dml.sql b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..4a14f326b9 --- /dev/null +++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/