diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index e917dda5f2..a5e426e55a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.builder; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; @@ -99,6 +100,7 @@ public class TaskExecutionContextBuilder { /** * build DataxTask related info + * * @param dataxTaskExecutionContext dataxTaskExecutionContext * @return TaskExecutionContextBuilder */ @@ -107,6 +109,17 @@ public class TaskExecutionContextBuilder { return this; } + /** + * build procedureTask related info + * + * @param procedureTaskExecutionContext + * @return + */ + public TaskExecutionContextBuilder buildProcedureTaskRelatedInfo(ProcedureTaskExecutionContext procedureTaskExecutionContext){ + taskExecutionContext.setProcedureTaskExecutionContext(procedureTaskExecutionContext); + return this; + } + /** * create diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/ProcedureTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/ProcedureTaskExecutionContext.java new file mode 100644 index 0000000000..d5fc97c8de --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/ProcedureTaskExecutionContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.entity; + +import java.io.Serializable; + +/** + * master/worker task transport + */ +public class ProcedureTaskExecutionContext implements Serializable{ + + /** + * connectionParams + */ + private String connectionParams; + + public String getConnectionParams() { + return connectionParams; + } + + public void setConnectionParams(String connectionParams) { + this.connectionParams = connectionParams; + } + + @Override + public String toString() { + return "ProcedureTaskExecutionContext{" + + "connectionParams='" + connectionParams + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 03ce0ee6bf..cdaa0f0d36 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -178,6 +178,11 @@ public class TaskExecutionContext implements Serializable{ */ private DataxTaskExecutionContext dataxTaskExecutionContext; + /** + * procedure TaskExecutionContext + */ + private ProcedureTaskExecutionContext procedureTaskExecutionContext; + public int getTaskInstanceId() { return taskInstanceId; } @@ -402,6 +407,14 @@ public class TaskExecutionContext implements Serializable{ this.dataxTaskExecutionContext = dataxTaskExecutionContext; } + public ProcedureTaskExecutionContext getProcedureTaskExecutionContext() { + return procedureTaskExecutionContext; + } + + public void setProcedureTaskExecutionContext(ProcedureTaskExecutionContext procedureTaskExecutionContext) { + this.procedureTaskExecutionContext = procedureTaskExecutionContext; + } + public Command toCommand(){ TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this)); @@ -445,6 +458,7 @@ public class TaskExecutionContext implements Serializable{ ", workerGroup='" + workerGroup + '\'' + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext + ", dataxTaskExecutionContext=" + dataxTaskExecutionContext + + ", procedureTaskExecutionContext=" + procedureTaskExecutionContext + '}'; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java index ce185a53cc..b54f39f32d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.EnumUtils; @@ -32,10 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; -import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.TaskPriority; +import org.apache.dolphinscheduler.server.entity.*; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; @@ -149,10 +147,13 @@ public class TaskUpdateQueueConsumer extends Thread{ SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); + ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext(); TaskType taskType = TaskType.valueOf(taskInstance.getTaskType()); + + TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); + // SQL task if (taskType == TaskType.SQL){ - TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class); int datasourceId = sqlParameters.getDatasource(); DataSource datasource = processService.findDataSourceById(datasourceId); @@ -175,17 +176,29 @@ public class TaskUpdateQueueConsumer extends Thread{ } + // DATAX task if (taskType == TaskType.DATAX){ } + // procedure task + if (taskType == TaskType.PROCEDURE){ + ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class); + int datasourceId = procedureParameters.getDatasource(); + DataSource datasource = processService.findDataSourceById(datasourceId); + procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); + } + + + return TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()) .buildSQLTaskRelatedInfo(sqlTaskExecutionContext) .buildDataxTaskRelatedInfo(dataxTaskExecutionContext) + .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext) .create(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index c1925e0adc..4ba0d489c2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -137,7 +137,7 @@ public class MasterSchedulerService extends Thread { } } } catch (Exception e){ - logger.error("master scheduler thread exception",e); + logger.error("master scheduler thread error",e); } finally{ zkMasterClient.releaseMutex(mutex); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java index 82cb6a216e..aa614efd53 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java @@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task.processdure; import com.alibaba.fastjson.JSONObject; import com.cronutils.utils.StringUtils; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.DataType; -import org.apache.dolphinscheduler.common.enums.Direct; -import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; @@ -30,12 +27,9 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; -import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import java.sql.*; @@ -56,11 +50,6 @@ public class ProcedureTask extends AbstractTask { */ private ProcedureParameters procedureParameters; - /** - * process service - */ - private ProcessService processService; - /** * base datasource */ @@ -90,8 +79,6 @@ public class ProcedureTask extends AbstractTask { if (!procedureParameters.checkParameters()) { throw new RuntimeException("procedure task params is not valid"); } - - this.processService = SpringApplicationContext.getBean(ProcessService.class); } @Override @@ -100,34 +87,20 @@ public class ProcedureTask extends AbstractTask { String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); - logger.info("processdure type : {}, datasource : {}, method : {} , localParams : {}", + logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}", procedureParameters.getType(), procedureParameters.getDatasource(), procedureParameters.getMethod(), procedureParameters.getLocalParams()); - DataSource dataSource = processService.findDataSourceById(procedureParameters.getDatasource()); - if (dataSource == null){ - logger.error("datasource not exists"); - exitStatusCode = -1; - throw new IllegalArgumentException("datasource not found"); - } - - logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", - dataSource.getName(), - dataSource.getType(), - dataSource.getNote(), - dataSource.getUserId(), - dataSource.getConnectionParams()); - Connection connection = null; CallableStatement stmt = null; try { // load class - DataSourceFactory.loadClass(dataSource.getType()); + DataSourceFactory.loadClass(DbType.valueOf(procedureParameters.getType())); // get datasource - baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(), - dataSource.getConnectionParams()); + baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(procedureParameters.getType()), + taskExecutionContext.getProcedureTaskExecutionContext().getConnectionParams()); // get jdbc connection connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), @@ -150,90 +123,148 @@ public class ProcedureTask extends AbstractTask { userDefParamsList = procedureParameters.getLocalParametersMap().values(); } - String method = ""; - // no parameters - if (CollectionUtils.isEmpty(userDefParamsList)){ - method = "{call " + procedureParameters.getMethod() + "}"; - }else { // exists parameters - int size = userDefParamsList.size(); - StringBuilder parameter = new StringBuilder(); - parameter.append("("); - for (int i = 0 ;i < size - 1; i++){ - parameter.append("?,"); - } - parameter.append("?)"); - method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}"; - } + String method = getCallMethod(userDefParamsList); logger.info("call method : {}",method); + // call method stmt = connection.prepareCall(method); - Boolean failed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED; - Boolean warnfailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED; - if(failed || warnfailed){ - stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout()); - } - Map outParameterMap = new HashMap<>(); - if (userDefParamsList != null && userDefParamsList.size() > 0){ - int index = 1; - for (Property property : userDefParamsList){ - logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" - ,property.getProp(), - property.getDirect(), - property.getType(), - property.getValue()); - // set parameters - if (property.getDirect().equals(Direct.IN)){ - ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); - }else if (property.getDirect().equals(Direct.OUT)){ - setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); - property.setValue(paramsMap.get(property.getProp()).getValue()); - outParameterMap.put(index,property); - } - index++; - } - } + // set timeout + setTimeout(stmt); + + // outParameterMap + Map outParameterMap = getOutParameterMap(stmt, paramsMap, userDefParamsList); stmt.executeUpdate(); /** * print the output parameters to the log */ - Iterator> iter = outParameterMap.entrySet().iterator(); - while (iter.hasNext()){ - Map.Entry en = iter.next(); - - int index = en.getKey(); - Property property = en.getValue(); - String prop = property.getProp(); - DataType dataType = property.getType(); - // get output parameter - getOutputParameter(stmt, index, prop, dataType); - } + printOutParameter(stmt, outParameterMap); - exitStatusCode = 0; + setExitStatusCode(Constants.EXIT_CODE_SUCCESS); }catch (Exception e){ - logger.error(e.getMessage(),e); - exitStatusCode = -1; - throw new RuntimeException(String.format("process interrupted. exit status code is %d",exitStatusCode)); + setExitStatusCode(Constants.EXIT_CODE_FAILURE); + logger.error("procedure task error",e); + throw e; } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - exitStatusCode = -1; - logger.error(e.getMessage(),e); - } + close(stmt,connection); + } + } + + /** + * get call method + * @param userDefParamsList userDefParamsList + * @return method + */ + private String getCallMethod(Collection userDefParamsList) { + String method;// no parameters + if (CollectionUtils.isEmpty(userDefParamsList)){ + method = "{call " + procedureParameters.getMethod() + "}"; + }else { // exists parameters + int size = userDefParamsList.size(); + StringBuilder parameter = new StringBuilder(); + parameter.append("("); + for (int i = 0 ;i < size - 1; i++){ + parameter.append("?,"); } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - exitStatusCode = -1; - logger.error(e.getMessage(), e); + parameter.append("?)"); + method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}"; + } + return method; + } + + /** + * print outParameter + * @param stmt CallableStatement + * @param outParameterMap outParameterMap + * @throws SQLException + */ + private void printOutParameter(CallableStatement stmt, + Map outParameterMap) throws SQLException { + Iterator> iter = outParameterMap.entrySet().iterator(); + while (iter.hasNext()){ + Map.Entry en = iter.next(); + + int index = en.getKey(); + Property property = en.getValue(); + String prop = property.getProp(); + DataType dataType = property.getType(); + // get output parameter + getOutputParameter(stmt, index, prop, dataType); + } + } + + /** + * get output parameter + * + * @param stmt CallableStatement + * @param paramsMap paramsMap + * @param userDefParamsList userDefParamsList + * @return outParameterMap + * @throws Exception + */ + private Map getOutParameterMap(CallableStatement stmt, + Map paramsMap, + Collection userDefParamsList) throws Exception { + Map outParameterMap = new HashMap<>(); + if (userDefParamsList != null && userDefParamsList.size() > 0){ + int index = 1; + for (Property property : userDefParamsList){ + logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" + ,property.getProp(), + property.getDirect(), + property.getType(), + property.getValue()); + // set parameters + if (property.getDirect().equals(Direct.IN)){ + ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); + }else if (property.getDirect().equals(Direct.OUT)){ + setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); + property.setValue(paramsMap.get(property.getProp()).getValue()); + outParameterMap.put(index,property); } + index++; + } + } + return outParameterMap; + } + + /** + * set timtou + * @param stmt CallableStatement + * @throws SQLException + */ + private void setTimeout(CallableStatement stmt) throws SQLException { + Boolean failed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED; + Boolean warnfailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED; + if(failed || warnfailed){ + stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout()); + } + } + + /** + * close jdbc resource + * + * @param stmt + * @param connection + */ + private void close(PreparedStatement stmt, + Connection connection){ + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + + } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + } } }