Browse Source

[SPI][Task]fix Task parse Err (#6244)

* [SPI][Task]fix Task parse Err
2.0.7-release
Kirs 3 years ago committed by GitHub
parent
commit
fab4eae923
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  2. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  3. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  4. 115
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskRequest.java
  5. 2
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskExecutionContext.java
  6. 45
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskRequest.java
  7. 80
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskRequest.java
  8. 115
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskRequest.java
  9. 13
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
  10. 25
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  11. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskChannel.java
  12. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
  13. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
  14. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
  15. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannel.java
  16. 20
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
  17. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTaskChannel.java
  18. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ISourceGenerator.java
  19. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ITargetGenerator.java
  20. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/SqoopJobGenerator.java
  21. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HdfsSourceGenerator.java
  22. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HiveSourceGenerator.java
  23. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/MysqlSourceGenerator.java
  24. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HdfsTargetGenerator.java
  25. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HiveTargetGenerator.java
  26. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/MysqlTargetGenerator.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -42,7 +42,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;

115
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskRequest.java

@ -1,115 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task.request;
/**
* DataX Task ExecutionContext
* to master/worker task transport
*/
public class DataxTaskRequest extends TaskRequest {
/**
* dataSourceId
*/
private int dataSourceId;
/**
* sourcetype
*/
private int sourcetype;
/**
* sourceConnectionParams
*/
private String sourceConnectionParams;
/**
* dataTargetId
*/
private int dataTargetId;
/**
* targetType
*/
private int targetType;
/**
* targetConnectionParams
*/
private String targetConnectionParams;
public int getDataSourceId() {
return dataSourceId;
}
public void setDataSourceId(int dataSourceId) {
this.dataSourceId = dataSourceId;
}
public int getSourcetype() {
return sourcetype;
}
public void setSourcetype(int sourcetype) {
this.sourcetype = sourcetype;
}
public String getSourceConnectionParams() {
return sourceConnectionParams;
}
public void setSourceConnectionParams(String sourceConnectionParams) {
this.sourceConnectionParams = sourceConnectionParams;
}
public int getDataTargetId() {
return dataTargetId;
}
public void setDataTargetId(int dataTargetId) {
this.dataTargetId = dataTargetId;
}
public int getTargetType() {
return targetType;
}
public void setTargetType(int targetType) {
this.targetType = targetType;
}
public String getTargetConnectionParams() {
return targetConnectionParams;
}
public void setTargetConnectionParams(String targetConnectionParams) {
this.targetConnectionParams = targetConnectionParams;
}
@Override
public String toString() {
return "DataxTaskExecutionContext{"
+ "dataSourceId=" + dataSourceId
+ ", sourcetype=" + sourcetype
+ ", sourceConnectionParams='" + sourceConnectionParams + '\''
+ ", dataTargetId=" + dataTargetId
+ ", targetType=" + targetType
+ ", targetConnectionParams='" + targetConnectionParams + '\''
+ '}';
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/ProcedureTaskExecutionContext.java → dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskExecutionContext.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
package org.apache.dolphinscheduler.spi.task.request;
import java.io.Serializable;

45
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/ProcedureTaskRequest.java

@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task.request;
/**
* Procedure Task ExecutionContext
* to master/worker task transport
*/
public class ProcedureTaskRequest extends TaskRequest {
/**
* connectionParams
*/
private String connectionParams;
public String getConnectionParams() {
return connectionParams;
}
public void setConnectionParams(String connectionParams) {
this.connectionParams = connectionParams;
}
@Override
public String toString() {
return "ProcedureTaskExecutionContext{"
+ "connectionParams='" + connectionParams + '\''
+ '}';
}
}

80
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskRequest.java

@ -1,80 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task.request;
import org.apache.dolphinscheduler.spi.task.UdfFuncBean;
import org.apache.dolphinscheduler.spi.task.UdfFuncBean.UdfFuncDeserializer;
import java.util.Map;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
/**
* SQL Task ExecutionContext
* to master/worker task transport
*/
public class SQLTaskRequest extends TaskRequest {
/**
* warningGroupId
*/
private int warningGroupId;
/**
* connectionParams
*/
private String connectionParams;
/**
* udf function tenant code map
*/
@JsonDeserialize(keyUsing = UdfFuncDeserializer.class)
private Map<UdfFuncBean,String> udfFuncTenantCodeMap;
public int getWarningGroupId() {
return warningGroupId;
}
public void setWarningGroupId(int warningGroupId) {
this.warningGroupId = warningGroupId;
}
public Map<UdfFuncBean, String> getUdfFuncTenantCodeMap() {
return udfFuncTenantCodeMap;
}
public void setUdfFuncTenantCodeMap(Map<UdfFuncBean, String> udfFuncTenantCodeMap) {
this.udfFuncTenantCodeMap = udfFuncTenantCodeMap;
}
public String getConnectionParams() {
return connectionParams;
}
public void setConnectionParams(String connectionParams) {
this.connectionParams = connectionParams;
}
@Override
public String toString() {
return "SQLTaskExecutionContext{"
+ "warningGroupId=" + warningGroupId
+ ", connectionParams='" + connectionParams + '\''
+ ", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap
+ '}';
}
}

115
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskRequest.java

@ -1,115 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task.request;
/**
* Sqoop Task ExecutionContext
* to master/worker task transport
*/
public class SqoopTaskRequest extends TaskRequest {
/**
* dataSourceId
*/
private int dataSourceId;
/**
* sourcetype
*/
private int sourcetype;
/**
* sourceConnectionParams
*/
private String sourceConnectionParams;
/**
* dataTargetId
*/
private int dataTargetId;
/**
* targetType
*/
private int targetType;
/**
* targetConnectionParams
*/
private String targetConnectionParams;
public int getDataSourceId() {
return dataSourceId;
}
public void setDataSourceId(int dataSourceId) {
this.dataSourceId = dataSourceId;
}
public int getSourcetype() {
return sourcetype;
}
public void setSourcetype(int sourcetype) {
this.sourcetype = sourcetype;
}
public String getSourceConnectionParams() {
return sourceConnectionParams;
}
public void setSourceConnectionParams(String sourceConnectionParams) {
this.sourceConnectionParams = sourceConnectionParams;
}
public int getDataTargetId() {
return dataTargetId;
}
public void setDataTargetId(int dataTargetId) {
this.dataTargetId = dataTargetId;
}
public int getTargetType() {
return targetType;
}
public void setTargetType(int targetType) {
this.targetType = targetType;
}
public String getTargetConnectionParams() {
return targetConnectionParams;
}
public void setTargetConnectionParams(String targetConnectionParams) {
this.targetConnectionParams = targetConnectionParams;
}
@Override
public String toString() {
return "SqoopTaskExecutionContext{"
+ "dataSourceId=" + dataSourceId
+ ", sourcetype=" + sourcetype
+ ", sourceConnectionParams='" + sourceConnectionParams + '\''
+ ", dataTargetId=" + dataTargetId
+ ", targetType=" + targetType
+ ", targetConnectionParams='" + targetConnectionParams + '\''
+ '}';
}
}

13
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java

@ -199,6 +199,11 @@ public class TaskRequest {
*/
private DataxTaskExecutionContext dataxTaskExecutionContext;
/**
* procedure TaskExecutionContext
*/
private ProcedureTaskExecutionContext procedureTaskExecutionContext;
/**
* sqoop TaskExecutionContext
*/
@ -467,4 +472,12 @@ public class TaskRequest {
public void setSqoopTaskExecutionContext(SqoopTaskExecutionContext sqoopTaskExecutionContext) {
this.sqoopTaskExecutionContext = sqoopTaskExecutionContext;
}
public ProcedureTaskExecutionContext getProcedureTaskExecutionContext() {
return procedureTaskExecutionContext;
}
public void setProcedureTaskExecutionContext(ProcedureTaskExecutionContext procedureTaskExecutionContext) {
this.procedureTaskExecutionContext = procedureTaskExecutionContext;
}
}

25
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -34,7 +34,8 @@ import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -107,14 +108,14 @@ public class DataxTask extends AbstractTaskExecutor {
/**
* taskExecutionContext
*/
private DataxTaskRequest taskExecutionContext;
private TaskRequest taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
public DataxTask(DataxTaskRequest taskExecutionContext) {
public DataxTask(TaskRequest taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
@ -232,14 +233,14 @@ public class DataxTask extends AbstractTaskExecutor {
* @throws SQLException if error throws SQLException
*/
private List<ObjectNode> buildDataxJobContentJson() {
DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext();
BaseConnectionParam dataSourceCfg = (BaseConnectionParam) DatasourceUtil.buildConnectionParams(
DbType.of(taskExecutionContext.getSourcetype()),
taskExecutionContext.getSourceConnectionParams());
DbType.of(dataxTaskExecutionContext.getSourcetype()),
dataxTaskExecutionContext.getSourceConnectionParams());
BaseConnectionParam dataTargetCfg = (BaseConnectionParam) DatasourceUtil.buildConnectionParams(
DbType.of(taskExecutionContext.getTargetType()),
taskExecutionContext.getTargetConnectionParams());
DbType.of(dataxTaskExecutionContext.getTargetType()),
dataxTaskExecutionContext.getTargetConnectionParams());
List<ObjectNode> readerConnArr = new ArrayList<>();
ObjectNode readerConn = JSONUtils.createObjectNode();
@ -260,7 +261,7 @@ public class DataxTask extends AbstractTaskExecutor {
readerParam.putArray("connection").addAll(readerConnArr);
ObjectNode reader = JSONUtils.createObjectNode();
reader.put("name", DataxUtils.getReaderPluginName(DbType.of(taskExecutionContext.getSourcetype())));
reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype())));
reader.set("parameter", readerParam);
List<ObjectNode> writerConnArr = new ArrayList<>();
@ -275,8 +276,8 @@ public class DataxTask extends AbstractTaskExecutor {
writerParam.put("username", dataTargetCfg.getUser());
writerParam.put("password", decodePassword(dataTargetCfg.getPassword()));
String[] columns = parsingSqlColumnNames(DbType.of(taskExecutionContext.getSourcetype()),
DbType.of(taskExecutionContext.getTargetType()),
String[] columns = parsingSqlColumnNames(DbType.of(dataxTaskExecutionContext.getSourcetype()),
DbType.of(dataxTaskExecutionContext.getTargetType()),
dataSourceCfg, dataXParameters.getSql());
ArrayNode columnArr = writerParam.putArray("column");
@ -301,7 +302,7 @@ public class DataxTask extends AbstractTaskExecutor {
}
ObjectNode writer = JSONUtils.createObjectNode();
writer.put("name", DataxUtils.getWriterPluginName(DbType.of(taskExecutionContext.getTargetType())));
writer.put("name", DataxUtils.getWriterPluginName(DbType.of(dataxTaskExecutionContext.getTargetType())));
writer.set("parameter", writerParam);
List<ObjectNode> contentList = new ArrayList<>();

3
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskChannel.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.datax;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
public class DataxTaskChannel implements TaskChannel {
@ -31,6 +30,6 @@ public class DataxTaskChannel implements TaskChannel {
@Override
public AbstractTask createTask(TaskRequest taskRequest) {
return new DataxTask((DataxTaskRequest) taskRequest);
return new DataxTask(taskRequest);
}
}

1
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

@ -122,7 +122,6 @@ public class FlinkTask extends AbstractYarnTask {
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
//when update resource maybe has error ,也许也可以交给上层去做控制 需要看资源是否可以抽象为共性 目前来讲我认为是可以的
resourceName = mainJar.getResourceName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);

1
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java

@ -125,7 +125,6 @@ public class MapReduceTask extends AbstractYarnTask {
if (resourceId == 0) {
resourceName = mainJar.getRes();
} else {
//when update resource maybe has error ,也许也可以交给上层去做控制 需要看资源是否可以抽象为共性 目前来讲我认为是可以的
resourceName = mainJar.getResourceName().replaceFirst("/", "");
}
mainJar.setRes(resourceName);

14
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.spi.task.Direct;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -44,7 +44,6 @@ import java.sql.SQLException;
import java.sql.Types;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
@ -60,14 +59,14 @@ public class ProcedureTask extends AbstractTaskExecutor {
/**
* taskExecutionContext
*/
private ProcedureTaskRequest taskExecutionContext;
private TaskRequest taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
public ProcedureTask(ProcedureTaskRequest taskExecutionContext) {
public ProcedureTask(TaskRequest taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
@ -101,7 +100,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
DbType dbType = DbType.valueOf(procedureParameters.getType());
// get datasource
ConnectionParam connectionParam = DatasourceUtil.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
taskExecutionContext.getConnectionParams());
taskExecutionContext.getProcedureTaskExecutionContext().getConnectionParams());
// get jdbc connection
connection = DatasourceUtil.getConnection(dbType, connectionParam);
@ -142,10 +141,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
*/
private void printOutParameter(CallableStatement stmt,
Map<Integer, Property> outParameterMap) throws SQLException {
Iterator<Map.Entry<Integer, Property>> iter = outParameterMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Integer, Property> en = iter.next();
for (Map.Entry<Integer, Property> en : outParameterMap.entrySet()) {
int index = en.getKey();
Property property = en.getValue();
String prop = property.getProp();

3
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTaskChannel.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.procedure;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
public class ProcedureTaskChannel implements TaskChannel {
@ -31,6 +30,6 @@ public class ProcedureTaskChannel implements TaskChannel {
@Override
public AbstractTask createTask(TaskRequest taskRequest) {
return new ProcedureTask((ProcedureTaskRequest) taskRequest);
return new ProcedureTask(taskRequest);
}
}

20
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java

@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.HashMap;
@ -44,18 +44,18 @@ public class SqoopTask extends AbstractYarnTask {
/**
* taskExecutionContext
*/
private final SqoopTaskRequest sqoopTaskExecutionContext;
private final TaskRequest taskExecutionContext;
public SqoopTask(SqoopTaskRequest taskExecutionContext) {
public SqoopTask(TaskRequest taskExecutionContext) {
super(taskExecutionContext);
this.sqoopTaskExecutionContext = taskExecutionContext;
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() {
logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams());
logger.info("sqoop task params {}", taskExecutionContext.getTaskParams());
sqoopParameters =
JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class);
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqoopParameters.class);
//check sqoop task params
if (null == sqoopParameters) {
throw new IllegalArgumentException("Sqoop Task params is null");
@ -70,16 +70,16 @@ public class SqoopTask extends AbstractYarnTask {
protected String buildCommand() {
//get sqoop scripts
SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
String script = generator.generateSqoopJob(sqoopParameters, taskExecutionContext);
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(sqoopTaskExecutionContext, getParameters());
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())) {
paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap());
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));

3
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTaskChannel.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.sqoop;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
public class SqoopTaskChannel implements TaskChannel {
@ -31,6 +30,6 @@ public class SqoopTaskChannel implements TaskChannel {
@Override
public AbstractTask createTask(TaskRequest taskRequest) {
return new SqoopTask((SqoopTaskRequest) taskRequest);
return new SqoopTask(taskRequest);
}
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ISourceGenerator.java

@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.sqoop.generator;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
/**
* Source Generator Interface
@ -32,5 +32,5 @@ public interface ISourceGenerator {
* @param taskExecutionContext taskExecutionContext
* @return source script
*/
String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext);
String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext);
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/ITargetGenerator.java

@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.sqoop.generator;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
/**
* Target Generator Interface
@ -32,5 +32,5 @@ public interface ITargetGenerator {
* @param taskExecutionContext taskExecutionContext
* @return target script
*/
String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext);
String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext);
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/SqoopJobGenerator.java

@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.sqoop.generator.targets.HdfsTarge
import org.apache.dolphinscheduler.plugin.task.sqoop.generator.targets.HiveTargetGenerator;
import org.apache.dolphinscheduler.plugin.task.sqoop.generator.targets.MysqlTargetGenerator;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
/**
* Sqoop Job Scripts Generator
@ -64,7 +64,7 @@ public class SqoopJobGenerator {
* @param sqoopParameters sqoop params
* @return sqoop scripts
*/
public String generateSqoopJob(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) {
public String generateSqoopJob(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) {
String sqoopScripts = "";

4
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HdfsSourceGenerator.java

@ -23,7 +23,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.SPACE;
import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ISourceGenerator;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.sources.SourceHdfsParameter;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -38,7 +38,7 @@ public class HdfsSourceGenerator implements ISourceGenerator {
private static final Logger logger = LoggerFactory.getLogger(HdfsSourceGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) {
public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) {
StringBuilder hdfsSourceSb = new StringBuilder();

4
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/HiveSourceGenerator.java

@ -26,7 +26,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.SPACE;
import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ISourceGenerator;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.sources.SourceHiveParameter;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -41,7 +41,7 @@ public class HiveSourceGenerator implements ISourceGenerator {
private static final Logger logger = LoggerFactory.getLogger(HiveSourceGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) {
public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) {
StringBuilder hiveSourceSb = new StringBuilder();

8
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/sources/MysqlSourceGenerator.java

@ -42,7 +42,7 @@ import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -59,7 +59,7 @@ public class MysqlSourceGenerator implements ISourceGenerator {
private static final Logger logger = LoggerFactory.getLogger(MysqlSourceGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) {
public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) {
StringBuilder mysqlSourceSb = new StringBuilder();
@ -68,8 +68,8 @@ public class MysqlSourceGenerator implements ISourceGenerator {
if (null != sourceMysqlParameter) {
BaseConnectionParam baseDataSource = (BaseConnectionParam) DatasourceUtil.buildConnectionParams(
DbType.of(taskExecutionContext.getSourcetype()),
taskExecutionContext.getSourceConnectionParams());
DbType.of(taskExecutionContext.getSqoopTaskExecutionContext().getSourcetype()),
taskExecutionContext.getSqoopTaskExecutionContext().getSourceConnectionParams());
if (null != baseDataSource) {

4
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HdfsTargetGenerator.java

@ -29,7 +29,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.SPACE;
import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ITargetGenerator;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.targets.TargetHdfsParameter;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -44,7 +44,7 @@ public class HdfsTargetGenerator implements ITargetGenerator {
private static final Logger logger = LoggerFactory.getLogger(HdfsTargetGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) {
public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) {
StringBuilder hdfsTargetSb = new StringBuilder();

4
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/HiveTargetGenerator.java

@ -32,7 +32,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.SPACE;
import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ITargetGenerator;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.targets.TargetHiveParameter;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -47,7 +47,7 @@ public class HiveTargetGenerator implements ITargetGenerator {
private static final Logger logger = LoggerFactory.getLogger(HiveTargetGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) {
public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) {
StringBuilder hiveTargetSb = new StringBuilder();

8
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/generator/targets/MysqlTargetGenerator.java

@ -37,7 +37,7 @@ import org.apache.dolphinscheduler.plugin.task.sqoop.generator.ITargetGenerator;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -52,7 +52,7 @@ public class MysqlTargetGenerator implements ITargetGenerator {
private static final Logger logger = LoggerFactory.getLogger(MysqlTargetGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters, SqoopTaskRequest taskExecutionContext) {
public String generate(SqoopParameters sqoopParameters, TaskRequest taskExecutionContext) {
StringBuilder mysqlTargetSb = new StringBuilder();
@ -64,8 +64,8 @@ public class MysqlTargetGenerator implements ITargetGenerator {
// get datasource
BaseConnectionParam baseDataSource = (BaseConnectionParam) DatasourceUtil.buildConnectionParams(
DbType.of(taskExecutionContext.getTargetType()),
taskExecutionContext.getTargetConnectionParams());
DbType.of(taskExecutionContext.getSqoopTaskExecutionContext().getTargetType()),
taskExecutionContext.getSqoopTaskExecutionContext().getTargetConnectionParams());
if (null != baseDataSource) {

Loading…
Cancel
Save