Browse Source

ProcedureTask process test modify (#2159)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* Encapsulate the parameters required by sqltask

* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization

* AbstractTask modify

* ProcedureTask optimization

* MasterSchedulerService modify

* TaskUpdateQueueConsumer modify

* test

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
pull/2/head
qiaozhanwei 4 years ago committed by GitHub
parent
commit
607ec3d174
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  2. 46
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/ProcedureTaskExecutionContext.java
  3. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  4. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
  5. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  6. 229
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.builder;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@ -99,6 +100,7 @@ public class TaskExecutionContextBuilder {
/**
* build DataxTask related info
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @return TaskExecutionContextBuilder
*/
@ -107,6 +109,17 @@ public class TaskExecutionContextBuilder {
return this;
}
/**
* build procedureTask related info
*
* @param procedureTaskExecutionContext
* @return
*/
public TaskExecutionContextBuilder buildProcedureTaskRelatedInfo(ProcedureTaskExecutionContext procedureTaskExecutionContext){
taskExecutionContext.setProcedureTaskExecutionContext(procedureTaskExecutionContext);
return this;
}
/**
* create

46
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/ProcedureTaskExecutionContext.java

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
import java.io.Serializable;
/**
* master/worker task transport
*/
public class ProcedureTaskExecutionContext implements Serializable{
/**
* connectionParams
*/
private String connectionParams;
public String getConnectionParams() {
return connectionParams;
}
public void setConnectionParams(String connectionParams) {
this.connectionParams = connectionParams;
}
@Override
public String toString() {
return "ProcedureTaskExecutionContext{" +
"connectionParams='" + connectionParams + '\'' +
'}';
}
}

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -178,6 +178,11 @@ public class TaskExecutionContext implements Serializable{
*/
private DataxTaskExecutionContext dataxTaskExecutionContext;
/**
* procedure TaskExecutionContext
*/
private ProcedureTaskExecutionContext procedureTaskExecutionContext;
public int getTaskInstanceId() {
return taskInstanceId;
}
@ -402,6 +407,14 @@ public class TaskExecutionContext implements Serializable{
this.dataxTaskExecutionContext = dataxTaskExecutionContext;
}
public ProcedureTaskExecutionContext getProcedureTaskExecutionContext() {
return procedureTaskExecutionContext;
}
public void setProcedureTaskExecutionContext(ProcedureTaskExecutionContext procedureTaskExecutionContext) {
this.procedureTaskExecutionContext = procedureTaskExecutionContext;
}
public Command toCommand(){
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
@ -445,6 +458,7 @@ public class TaskExecutionContext implements Serializable{
", workerGroup='" + workerGroup + '\'' +
", sqlTaskExecutionContext=" + sqlTaskExecutionContext +
", dataxTaskExecutionContext=" + dataxTaskExecutionContext +
", procedureTaskExecutionContext=" + procedureTaskExecutionContext +
'}';
}
}

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
@ -32,10 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskPriority;
import org.apache.dolphinscheduler.server.entity.*;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@ -149,10 +147,13 @@ public class TaskUpdateQueueConsumer extends Thread{
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext();
TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
// SQL task
if (taskType == TaskType.SQL){
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
int datasourceId = sqlParameters.getDatasource();
DataSource datasource = processService.findDataSourceById(datasourceId);
@ -175,17 +176,29 @@ public class TaskUpdateQueueConsumer extends Thread{
}
// DATAX task
if (taskType == TaskType.DATAX){
}
// procedure task
if (taskType == TaskType.PROCEDURE){
ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class);
int datasourceId = procedureParameters.getDatasource();
DataSource datasource = processService.findDataSourceById(datasourceId);
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
}
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
.buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
.buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
.create();
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -137,7 +137,7 @@ public class MasterSchedulerService extends Thread {
}
}
} catch (Exception e){
logger.error("master scheduler thread exception",e);
logger.error("master scheduler thread error",e);
} finally{
zkMasterClient.releaseMutex(mutex);
}

229
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java

@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task.processdure;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.utils.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
@ -30,12 +27,9 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.sql.*;
@ -56,11 +50,6 @@ public class ProcedureTask extends AbstractTask {
*/
private ProcedureParameters procedureParameters;
/**
* process service
*/
private ProcessService processService;
/**
* base datasource
*/
@ -90,8 +79,6 @@ public class ProcedureTask extends AbstractTask {
if (!procedureParameters.checkParameters()) {
throw new RuntimeException("procedure task params is not valid");
}
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
@Override
@ -100,34 +87,20 @@ public class ProcedureTask extends AbstractTask {
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
logger.info("processdure type : {}, datasource : {}, method : {} , localParams : {}",
logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
procedureParameters.getType(),
procedureParameters.getDatasource(),
procedureParameters.getMethod(),
procedureParameters.getLocalParams());
DataSource dataSource = processService.findDataSourceById(procedureParameters.getDatasource());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
throw new IllegalArgumentException("datasource not found");
}
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
dataSource.getNote(),
dataSource.getUserId(),
dataSource.getConnectionParams());
Connection connection = null;
CallableStatement stmt = null;
try {
// load class
DataSourceFactory.loadClass(dataSource.getType());
DataSourceFactory.loadClass(DbType.valueOf(procedureParameters.getType()));
// get datasource
baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(procedureParameters.getType()),
taskExecutionContext.getProcedureTaskExecutionContext().getConnectionParams());
// get jdbc connection
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
@ -150,90 +123,148 @@ public class ProcedureTask extends AbstractTask {
userDefParamsList = procedureParameters.getLocalParametersMap().values();
}
String method = "";
// no parameters
if (CollectionUtils.isEmpty(userDefParamsList)){
method = "{call " + procedureParameters.getMethod() + "}";
}else { // exists parameters
int size = userDefParamsList.size();
StringBuilder parameter = new StringBuilder();
parameter.append("(");
for (int i = 0 ;i < size - 1; i++){
parameter.append("?,");
}
parameter.append("?)");
method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
}
String method = getCallMethod(userDefParamsList);
logger.info("call method : {}",method);
// call method
stmt = connection.prepareCall(method);
Boolean failed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED;
Boolean warnfailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
if(failed || warnfailed){
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
}
Map<Integer,Property> outParameterMap = new HashMap<>();
if (userDefParamsList != null && userDefParamsList.size() > 0){
int index = 1;
for (Property property : userDefParamsList){
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
,property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)){
ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
}else if (property.getDirect().equals(Direct.OUT)){
setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index,property);
}
index++;
}
}
// set timeout
setTimeout(stmt);
// outParameterMap
Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, paramsMap, userDefParamsList);
stmt.executeUpdate();
/**
* print the output parameters to the log
*/
Iterator<Map.Entry<Integer, Property>> iter = outParameterMap.entrySet().iterator();
while (iter.hasNext()){
Map.Entry<Integer, Property> en = iter.next();
int index = en.getKey();
Property property = en.getValue();
String prop = property.getProp();
DataType dataType = property.getType();
// get output parameter
getOutputParameter(stmt, index, prop, dataType);
}
printOutParameter(stmt, outParameterMap);
exitStatusCode = 0;
setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
}catch (Exception e){
logger.error(e.getMessage(),e);
exitStatusCode = -1;
throw new RuntimeException(String.format("process interrupted. exit status code is %d",exitStatusCode));
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
logger.error("procedure task error",e);
throw e;
}
finally {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
exitStatusCode = -1;
logger.error(e.getMessage(),e);
}
close(stmt,connection);
}
}
/**
* get call method
* @param userDefParamsList userDefParamsList
* @return method
*/
private String getCallMethod(Collection<Property> userDefParamsList) {
String method;// no parameters
if (CollectionUtils.isEmpty(userDefParamsList)){
method = "{call " + procedureParameters.getMethod() + "}";
}else { // exists parameters
int size = userDefParamsList.size();
StringBuilder parameter = new StringBuilder();
parameter.append("(");
for (int i = 0 ;i < size - 1; i++){
parameter.append("?,");
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
exitStatusCode = -1;
logger.error(e.getMessage(), e);
parameter.append("?)");
method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
}
return method;
}
/**
* print outParameter
* @param stmt CallableStatement
* @param outParameterMap outParameterMap
* @throws SQLException
*/
private void printOutParameter(CallableStatement stmt,
Map<Integer, Property> outParameterMap) throws SQLException {
Iterator<Map.Entry<Integer, Property>> iter = outParameterMap.entrySet().iterator();
while (iter.hasNext()){
Map.Entry<Integer, Property> en = iter.next();
int index = en.getKey();
Property property = en.getValue();
String prop = property.getProp();
DataType dataType = property.getType();
// get output parameter
getOutputParameter(stmt, index, prop, dataType);
}
}
/**
* get output parameter
*
* @param stmt CallableStatement
* @param paramsMap paramsMap
* @param userDefParamsList userDefParamsList
* @return outParameterMap
* @throws Exception
*/
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt,
Map<String, Property> paramsMap,
Collection<Property> userDefParamsList) throws Exception {
Map<Integer,Property> outParameterMap = new HashMap<>();
if (userDefParamsList != null && userDefParamsList.size() > 0){
int index = 1;
for (Property property : userDefParamsList){
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
,property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)){
ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
}else if (property.getDirect().equals(Direct.OUT)){
setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index,property);
}
index++;
}
}
return outParameterMap;
}
/**
* set timtou
* @param stmt CallableStatement
* @throws SQLException
*/
private void setTimeout(CallableStatement stmt) throws SQLException {
Boolean failed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED;
Boolean warnfailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
if(failed || warnfailed){
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
}
}
/**
* close jdbc resource
*
* @param stmt
* @param connection
*/
private void close(PreparedStatement stmt,
Connection connection){
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
}
}
}

Loading…
Cancel
Save