From 7aa8b518ac40b4e3e00e73555d8a4e7bf7ee228a Mon Sep 17 00:00:00 2001 From: baoliang Date: Tue, 11 May 2021 13:55:27 +0800 Subject: [PATCH] Merge remote-tracking branch 'upstream/dev' into json_split # Conflicts: # dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java # dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java --- .../ProcessDefinitionController.java | 1 - .../server/utils/DependentExecute.java | 35 +++++++++----- .../worker/task/procedure/ProcedureTask.java | 48 +++++++++---------- 3 files changed, 47 insertions(+), 37 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index 94a2dc986c..286ae3d789 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; -import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ReleaseState; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java index 90d55e0689..a7eba1f534 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java @@ -70,12 +70,13 @@ public class DependentExecute { /** * logger */ - private Logger logger = LoggerFactory.getLogger(DependentExecute.class); + private Logger logger = LoggerFactory.getLogger(DependentExecute.class); /** * constructor - * @param itemList item list - * @param relation relation + * + * @param itemList item list + * @param relation relation */ public DependentExecute(List itemList, DependentRelation relation) { this.dependItemList = itemList; @@ -84,6 +85,7 @@ public class DependentExecute { /** * get dependent item for one dependent item + * * @param dependentItem dependent item * @param currentTime current time * @return DependResult @@ -95,6 +97,7 @@ public class DependentExecute { /** * calculate dependent result for one dependent item. + * * @param dependentItem dependent item * @param dateIntervals date intervals * @return dateIntervals @@ -103,17 +106,17 @@ public class DependentExecute { List dateIntervals) { DependResult result = DependResult.FAILED; - for(DateInterval dateInterval : dateIntervals){ + for (DateInterval dateInterval : dateIntervals) { ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(), - dateInterval); - if(processInstance == null){ + dateInterval); + if (processInstance == null) { return DependResult.WAITING; } // need to check workflow for updates, so get all task and check the task state if (dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)) { result = dependResultByProcessInstance(processInstance); } else { - result = getDependTaskResult(dependentItem.getDepTasks(),processInstance); + result = getDependTaskResult(dependentItem.getDepTasks(), processInstance); } if (result != DependResult.SUCCESS) { break; @@ -124,6 +127,7 @@ public class DependentExecute { /** * depend type = depend_all + * * @return */ private DependResult dependResultByProcessInstance(ProcessInstance processInstance) { @@ -138,6 +142,7 @@ public class DependentExecute { /** * get depend task result + * * @param taskName * @param processInstance * @return @@ -173,14 +178,15 @@ public class DependentExecute { * find the last one process instance that : * 1. manual run and finish between the interval * 2. schedule run and schedule time between the interval - * @param definitionCode definition code - * @param dateInterval date interval + * + * @param definitionCode definition code + * @param dateInterval date interval * @return ProcessInstance */ private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval) { ProcessInstance runningProcess = processService.findLastRunningProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime()); - if(runningProcess != null){ + if (runningProcess != null) { return runningProcess; } @@ -200,6 +206,7 @@ public class DependentExecute { /** * get dependent result by task/process instance state + * * @param state state * @return DependResult */ @@ -216,6 +223,7 @@ public class DependentExecute { /** * get dependent result by task instance state when task instance is null + * * @param state state * @return DependResult */ @@ -232,6 +240,7 @@ public class DependentExecute { /** * judge depend item finished + * * @param currentTime current time * @return boolean */ @@ -245,6 +254,7 @@ public class DependentExecute { /** * get model depend result + * * @param currentTime current time * @return DependResult */ @@ -265,8 +275,9 @@ public class DependentExecute { /** * get dependent item result - * @param item item - * @param currentTime current time + * + * @param item item + * @param currentTime current time * @return DependResult */ private DependResult getDependResultForItem(DependentItem item, Date currentTime) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java index 154ef9d049..2166b1f068 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java @@ -59,7 +59,7 @@ import java.util.Map; import org.slf4j.Logger; /** - * procedure task + * procedure task */ public class ProcedureTask extends AbstractTask { @@ -77,7 +77,7 @@ public class ProcedureTask extends AbstractTask { * constructor * * @param taskExecutionContext taskExecutionContext - * @param logger logger + * @param logger logger */ public ProcedureTask(TaskExecutionContext taskExecutionContext, Logger logger) { super(taskExecutionContext, logger); @@ -118,7 +118,6 @@ public class ProcedureTask extends AbstractTask { // get jdbc connection connection = DatasourceUtil.getConnection(dbType, connectionParam); - // combining local and global parameters Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), @@ -146,14 +145,14 @@ public class ProcedureTask extends AbstractTask { logger.error("procedure task error", e); throw e; } finally { - close(stmt,connection); + close(stmt, connection); } } /** * print outParameter * - * @param stmt CallableStatement + * @param stmt CallableStatement * @param outParameterMap outParameterMap * @throws SQLException SQLException */ @@ -175,13 +174,13 @@ public class ProcedureTask extends AbstractTask { /** * get output parameter * - * @param stmt CallableStatement + * @param stmt CallableStatement * @param paramsMap paramsMap * @return outParameterMap * @throws Exception Exception */ private Map getOutParameterMap(CallableStatement stmt, Map paramsMap) throws Exception { - Map outParameterMap = new HashMap<>(); + Map outParameterMap = new HashMap<>(); if (procedureParameters.getLocalParametersMap() == null) { return outParameterMap; } @@ -195,7 +194,7 @@ public class ProcedureTask extends AbstractTask { int index = 1; for (Property property : userDefParamsList) { logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" - ,property.getProp(), + , property.getProp(), property.getDirect(), property.getType(), property.getValue()); @@ -203,9 +202,9 @@ public class ProcedureTask extends AbstractTask { if (property.getDirect().equals(Direct.IN)) { ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); } else if (property.getDirect().equals(Direct.OUT)) { - setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); + setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); property.setValue(paramsMap.get(property.getProp()).getValue()); - outParameterMap.put(index,property); + outParameterMap.put(index, property); } index++; } @@ -227,33 +226,34 @@ public class ProcedureTask extends AbstractTask { } /** - * close jdbc resource - * - * @param stmt stmt - * @param connection connection - */ + * close jdbc resource + * + * @param stmt stmt + * @param connection connection + */ private void close(PreparedStatement stmt, Connection connection) { if (stmt != null) { try { stmt.close(); } catch (SQLException e) { - logger.error("close prepared statement error : {}",e.getMessage(),e); + logger.error("close prepared statement error : {}", e.getMessage(), e); } } if (connection != null) { try { connection.close(); } catch (SQLException e) { - logger.error("close connection error : {}",e.getMessage(),e); + logger.error("close connection error : {}", e.getMessage(), e); } } } /** * get output parameter - * @param stmt stmt - * @param index index - * @param prop prop + * + * @param stmt stmt + * @param index index + * @param prop prop * @param dataType dataType * @throws SQLException SQLException */ @@ -299,13 +299,13 @@ public class ProcedureTask extends AbstractTask { /** * set out parameter * - * @param index index - * @param stmt stmt + * @param index index + * @param stmt stmt * @param dataType dataType - * @param value value + * @param value value * @throws Exception exception */ - private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception { + private void setOutParameter(int index, CallableStatement stmt, DataType dataType, String value) throws Exception { int sqlType; switch (dataType) { case VARCHAR: