Browse Source

Merge remote-tracking branch 'upstream/dev' into json_split

# Conflicts:
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
pull/3/MERGE
baoliang 4 years ago
parent
commit
7aa8b518ac
  1. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 35
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
  3. 48
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.utils.RegexUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;

35
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

@ -70,12 +70,13 @@ public class DependentExecute {
/**
* logger
*/
private Logger logger = LoggerFactory.getLogger(DependentExecute.class);
private Logger logger = LoggerFactory.getLogger(DependentExecute.class);
/**
* constructor
* @param itemList item list
* @param relation relation
*
* @param itemList item list
* @param relation relation
*/
public DependentExecute(List<DependentItem> itemList, DependentRelation relation) {
this.dependItemList = itemList;
@ -84,6 +85,7 @@ public class DependentExecute {
/**
* get dependent item for one dependent item
*
* @param dependentItem dependent item
* @param currentTime current time
* @return DependResult
@ -95,6 +97,7 @@ public class DependentExecute {
/**
* calculate dependent result for one dependent item.
*
* @param dependentItem dependent item
* @param dateIntervals date intervals
* @return dateIntervals
@ -103,17 +106,17 @@ public class DependentExecute {
List<DateInterval> dateIntervals) {
DependResult result = DependResult.FAILED;
for(DateInterval dateInterval : dateIntervals){
for (DateInterval dateInterval : dateIntervals) {
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
dateInterval);
if(processInstance == null){
dateInterval);
if (processInstance == null) {
return DependResult.WAITING;
}
// need to check workflow for updates, so get all task and check the task state
if (dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)) {
result = dependResultByProcessInstance(processInstance);
} else {
result = getDependTaskResult(dependentItem.getDepTasks(),processInstance);
result = getDependTaskResult(dependentItem.getDepTasks(), processInstance);
}
if (result != DependResult.SUCCESS) {
break;
@ -124,6 +127,7 @@ public class DependentExecute {
/**
* depend type = depend_all
*
* @return
*/
private DependResult dependResultByProcessInstance(ProcessInstance processInstance) {
@ -138,6 +142,7 @@ public class DependentExecute {
/**
* get depend task result
*
* @param taskName
* @param processInstance
* @return
@ -173,14 +178,15 @@ public class DependentExecute {
* find the last one process instance that :
* 1. manual run and finish between the interval
* 2. schedule run and schedule time between the interval
* @param definitionCode definition code
* @param dateInterval date interval
*
* @param definitionCode definition code
* @param dateInterval date interval
* @return ProcessInstance
*/
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval) {
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime());
if(runningProcess != null){
if (runningProcess != null) {
return runningProcess;
}
@ -200,6 +206,7 @@ public class DependentExecute {
/**
* get dependent result by task/process instance state
*
* @param state state
* @return DependResult
*/
@ -216,6 +223,7 @@ public class DependentExecute {
/**
* get dependent result by task instance state when task instance is null
*
* @param state state
* @return DependResult
*/
@ -232,6 +240,7 @@ public class DependentExecute {
/**
* judge depend item finished
*
* @param currentTime current time
* @return boolean
*/
@ -245,6 +254,7 @@ public class DependentExecute {
/**
* get model depend result
*
* @param currentTime current time
* @return DependResult
*/
@ -265,8 +275,9 @@ public class DependentExecute {
/**
* get dependent item result
* @param item item
* @param currentTime current time
*
* @param item item
* @param currentTime current time
* @return DependResult
*/
private DependResult getDependResultForItem(DependentItem item, Date currentTime) {

48
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java

@ -59,7 +59,7 @@ import java.util.Map;
import org.slf4j.Logger;
/**
* procedure task
* procedure task
*/
public class ProcedureTask extends AbstractTask {
@ -77,7 +77,7 @@ public class ProcedureTask extends AbstractTask {
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
* @param logger logger
*/
public ProcedureTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
@ -118,7 +118,6 @@ public class ProcedureTask extends AbstractTask {
// get jdbc connection
connection = DatasourceUtil.getConnection(dbType, connectionParam);
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
@ -146,14 +145,14 @@ public class ProcedureTask extends AbstractTask {
logger.error("procedure task error", e);
throw e;
} finally {
close(stmt,connection);
close(stmt, connection);
}
}
/**
* print outParameter
*
* @param stmt CallableStatement
* @param stmt CallableStatement
* @param outParameterMap outParameterMap
* @throws SQLException SQLException
*/
@ -175,13 +174,13 @@ public class ProcedureTask extends AbstractTask {
/**
* get output parameter
*
* @param stmt CallableStatement
* @param stmt CallableStatement
* @param paramsMap paramsMap
* @return outParameterMap
* @throws Exception Exception
*/
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<String, Property> paramsMap) throws Exception {
Map<Integer,Property> outParameterMap = new HashMap<>();
Map<Integer, Property> outParameterMap = new HashMap<>();
if (procedureParameters.getLocalParametersMap() == null) {
return outParameterMap;
}
@ -195,7 +194,7 @@ public class ProcedureTask extends AbstractTask {
int index = 1;
for (Property property : userDefParamsList) {
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
,property.getProp(),
, property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
@ -203,9 +202,9 @@ public class ProcedureTask extends AbstractTask {
if (property.getDirect().equals(Direct.IN)) {
ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
} else if (property.getDirect().equals(Direct.OUT)) {
setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index,property);
outParameterMap.put(index, property);
}
index++;
}
@ -227,33 +226,34 @@ public class ProcedureTask extends AbstractTask {
}
/**
* close jdbc resource
*
* @param stmt stmt
* @param connection connection
*/
* close jdbc resource
*
* @param stmt stmt
* @param connection connection
*/
private void close(PreparedStatement stmt, Connection connection) {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
logger.error("close prepared statement error : {}",e.getMessage(),e);
logger.error("close prepared statement error : {}", e.getMessage(), e);
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
logger.error("close connection error : {}",e.getMessage(),e);
logger.error("close connection error : {}", e.getMessage(), e);
}
}
}
/**
* get output parameter
* @param stmt stmt
* @param index index
* @param prop prop
*
* @param stmt stmt
* @param index index
* @param prop prop
* @param dataType dataType
* @throws SQLException SQLException
*/
@ -299,13 +299,13 @@ public class ProcedureTask extends AbstractTask {
/**
* set out parameter
*
* @param index index
* @param stmt stmt
* @param index index
* @param stmt stmt
* @param dataType dataType
* @param value value
* @param value value
* @throws Exception exception
*/
private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception {
private void setOutParameter(int index, CallableStatement stmt, DataType dataType, String value) throws Exception {
int sqlType;
switch (dataType) {
case VARCHAR:

Loading…
Cancel
Save