|
|
|
@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.dq.rule.RuleManager; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.dq.rule.parameter.DataQualityConfiguration; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.dq.utils.spark.SparkArgsUtils; |
|
|
|
@ -52,6 +53,7 @@ import java.io.File;
|
|
|
|
|
import java.time.LocalDateTime; |
|
|
|
|
import java.time.format.DateTimeFormatter; |
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
|
|
|
|
@ -160,8 +162,6 @@ public class DataQualityTask extends AbstractYarnTask {
|
|
|
|
|
List<String> args = new ArrayList<>(); |
|
|
|
|
|
|
|
|
|
args.add(SPARK2_COMMAND); |
|
|
|
|
|
|
|
|
|
// other parameters
|
|
|
|
|
args.addAll(SparkArgsUtils.buildArgs(dataQualityParameters.getSparkParameters())); |
|
|
|
|
|
|
|
|
|
// replace placeholder
|
|
|
|
@ -169,10 +169,15 @@ public class DataQualityTask extends AbstractYarnTask {
|
|
|
|
|
|
|
|
|
|
String command = null; |
|
|
|
|
|
|
|
|
|
if (null != paramsMap) { |
|
|
|
|
command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap)); |
|
|
|
|
if (MapUtils.isEmpty(paramsMap)) { |
|
|
|
|
paramsMap = new HashMap<>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (MapUtils.isNotEmpty(dqTaskExecutionContext.getParamsMap())) { |
|
|
|
|
paramsMap.putAll(dqTaskExecutionContext.getParamsMap()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap)); |
|
|
|
|
logger.info("data quality task command: {}", command); |
|
|
|
|
|
|
|
|
|
return command; |
|
|
|
@ -181,8 +186,8 @@ public class DataQualityTask extends AbstractYarnTask {
|
|
|
|
|
@Override |
|
|
|
|
protected void setMainJarName() { |
|
|
|
|
ResourceInfo mainJar = new ResourceInfo(); |
|
|
|
|
String basePath = System.getProperty("user.dir").replace(File.separator + "bin", File.separator + "libs"); |
|
|
|
|
mainJar.setRes(basePath + File.separator + CommonUtils.getDataQualityJarName()); |
|
|
|
|
String basePath = System.getProperty("user.dir").replace(File.separator + "bin", ""); |
|
|
|
|
mainJar.setRes(basePath + File.separator + "libs" + File.separator + CommonUtils.getDataQualityJarName()); |
|
|
|
|
dataQualityParameters.getSparkParameters().setMainJar(mainJar); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|