@ -56,6 +56,7 @@ public class DataQualityTaskTest {
inputParameterValue . put ( "src_datasource_id" , "2" ) ;
inputParameterValue . put ( "src_table" , "src_result" ) ;
inputParameterValue . put ( "check_type" , "0" ) ;
inputParameterValue . put ( "src_database" , "test" ) ;
inputParameterValue . put ( "operator" , "3" ) ;
inputParameterValue . put ( "threshold" , "1" ) ;
inputParameterValue . put ( "failure_strategy" , "0" ) ;
@ -92,7 +93,7 @@ public class DataQualityTaskTest {
+ "{\"type\":\"JDBC\",\"config\":{\"database\":\"test\",\"password\":\"test\",\"driver\":\"com.mysql.cj.jdbc.Driver\","
+ "\"user\":\"test\",\"table\":\"dqc_statistics_value\",\"url\":"
+ "\"jdbc:mysql://localhost:3306/test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\","
+ "\"sql\":\"select 21 as process_definition_id,287 as task_instance_id,10 as rule_id,'DN/MS5NLTSLVZ/++KEJ9BHPQSEN6/UY/EV5TWI1IRRY =' "
+ "\"sql\":\"select 21 as process_definition_id,287 as task_instance_id,10 as rule_id,'SA8QJTSZZNEXNIXHUL5LTGRTYPWKJ4XY85VPS/NCKES =' "
+ "as unique_code,'table_count.total'AS statistics_name,"
+ "table_count.total AS statistics_value,'2021-08-12 10:15:48' as data_time,'2021-08-12 10:15:48' as create_time,"
+ "'2021-08-12 10:15:48' as update_time from table_count\"}}]}" ;
@ -135,6 +136,20 @@ public class DataQualityTaskTest {
srcDatasourceId . setCreateTime ( new Date ( ) ) ;
srcDatasourceId . setUpdateTime ( new Date ( ) ) ;
DqRuleInputEntry srcDatabase = new DqRuleInputEntry ( ) ;
srcDatabase . setTitle ( "源数据库" ) ;
srcDatabase . setField ( "src_database" ) ;
srcDatabase . setType ( FormType . CASCADER . getFormType ( ) ) ;
srcDatabase . setCanEdit ( true ) ;
srcDatabase . setIsShow ( true ) ;
srcDatabase . setValue ( null ) ;
srcDatabase . setPlaceholder ( "$t(src_database)" ) ;
srcDatabase . setOptionSourceType ( OptionSourceType . DEFAULT . getCode ( ) ) ;
srcDatabase . setInputType ( InputType . DEFAULT . getCode ( ) ) ;
srcDatabase . setValueType ( ValueType . NUMBER . getCode ( ) ) ;
srcDatabase . setCreateTime ( new Date ( ) ) ;
srcDatabase . setUpdateTime ( new Date ( ) ) ;
DqRuleInputEntry srcTable = new DqRuleInputEntry ( ) ;
srcTable . setTitle ( "源数据表" ) ;
srcTable . setField ( "src_table" ) ;
@ -258,6 +273,7 @@ public class DataQualityTaskTest {
defaultInputEntryList . add ( srcConnectorType ) ;
defaultInputEntryList . add ( srcDatasourceId ) ;
defaultInputEntryList . add ( srcDatabase ) ;
defaultInputEntryList . add ( srcTable ) ;
defaultInputEntryList . add ( srcFilter ) ;
defaultInputEntryList . add ( srcField ) ;
@ -344,6 +360,20 @@ public class DataQualityTaskTest {
srcConnectorType . setCreateTime ( new Date ( ) ) ;
srcConnectorType . setUpdateTime ( new Date ( ) ) ;
DqRuleInputEntry srcDatabase = new DqRuleInputEntry ( ) ;
srcDatabase . setTitle ( "源数据库" ) ;
srcDatabase . setField ( "src_database" ) ;
srcDatabase . setType ( FormType . CASCADER . getFormType ( ) ) ;
srcDatabase . setCanEdit ( true ) ;
srcDatabase . setIsShow ( true ) ;
srcDatabase . setValue ( null ) ;
srcDatabase . setPlaceholder ( "$t(src_database)" ) ;
srcDatabase . setOptionSourceType ( OptionSourceType . DEFAULT . getCode ( ) ) ;
srcDatabase . setInputType ( InputType . DEFAULT . getCode ( ) ) ;
srcDatabase . setValueType ( ValueType . NUMBER . getCode ( ) ) ;
srcDatabase . setCreateTime ( new Date ( ) ) ;
srcDatabase . setUpdateTime ( new Date ( ) ) ;
DqRuleInputEntry srcTable = new DqRuleInputEntry ( ) ;
srcTable . setTitle ( "源数据表" ) ;
srcTable . setField ( "src_table" ) ;
@ -448,6 +478,7 @@ public class DataQualityTaskTest {
defaultInputEntryList . add ( afterFailure ) ;
defaultInputEntryList . add ( srcConnectorType ) ;
defaultInputEntryList . add ( srcDatasourceId ) ;
defaultInputEntryList . add ( srcDatabase ) ;
defaultInputEntryList . add ( srcTable ) ;
defaultInputEntryList . add ( statisticsName ) ;
defaultInputEntryList . add ( statisticsExecuteSql ) ;
@ -457,6 +488,7 @@ public class DataQualityTaskTest {
inputParameterValue . put ( "src_connector_type" , "0" ) ;
inputParameterValue . put ( "src_datasource_id" , "2" ) ;
inputParameterValue . put ( "src_table" , "person" ) ;
inputParameterValue . put ( "src_database" , "test" ) ;
inputParameterValue . put ( "statistics_name" , "miss" ) ;
inputParameterValue . put ( "statistics_execute_sql" ,
"select count(*) as miss from ${src_table} where (sex = null or sex='') and age=1" ) ;
@ -536,7 +568,7 @@ public class DataQualityTaskTest {
+ "\"org.postgresql.Driver\",\"user\":\"test\",\"table\":\"t_ds_dq_task_statistics_value\",\"url\":"
+ "\"jdbc:postgresql://localhost:5432/dolphinscheduler?stringtype=unspecified&characterEncoding="
+ "UTF-8&allowMultiQueries=true\",\"sql\":\"select 1 as process_definition_id,1 as "
+ "task_instance_id,1 as rule_id,'FNWZLNCPWWF4ZWKO/LYENOPL6JPV1SHPPWQ9YSYLOCU =' as unique_code,'miss'AS statistics_name,miss AS statistics_value,"
+ "task_instance_id,1 as rule_id,'IGTZ9I6KWVEPXFFJKDVMO6QB6URHHXK0NINS9GAOUEA =' as unique_code,'miss'AS statistics_name,miss AS statistics_value,"
+ "'2021-08-30 00:00:00' as data_time,'2021-08-30 00:00:00' as create_time,'2021-08-30 00:00:00' "
+ "as update_time from test_person\"}}]}" ;
@ -580,6 +612,20 @@ public class DataQualityTaskTest {
srcConnectorType . setCreateTime ( new Date ( ) ) ;
srcConnectorType . setUpdateTime ( new Date ( ) ) ;
DqRuleInputEntry srcDatabase = new DqRuleInputEntry ( ) ;
srcDatabase . setTitle ( "源数据库" ) ;
srcDatabase . setField ( "src_database" ) ;
srcDatabase . setType ( FormType . CASCADER . getFormType ( ) ) ;
srcDatabase . setCanEdit ( true ) ;
srcDatabase . setIsShow ( true ) ;
srcDatabase . setValue ( null ) ;
srcDatasourceId . setPlaceholder ( "$t(src_database)" ) ;
srcDatabase . setOptionSourceType ( OptionSourceType . DEFAULT . getCode ( ) ) ;
srcDatabase . setInputType ( InputType . DEFAULT . getCode ( ) ) ;
srcDatabase . setValueType ( ValueType . NUMBER . getCode ( ) ) ;
srcDatabase . setCreateTime ( new Date ( ) ) ;
srcDatabase . setUpdateTime ( new Date ( ) ) ;
DqRuleInputEntry srcTable = new DqRuleInputEntry ( ) ;
srcTable . setTitle ( "源数据表" ) ;
srcTable . setField ( "src_table" ) ;
@ -636,6 +682,20 @@ public class DataQualityTaskTest {
targetDatasourceId . setValue ( "1" ) ;
targetDatasourceId . setPlaceholder ( "Please select target datasource" ) ;
targetDatasourceId . setOptionSourceType ( OptionSourceType . DATASOURCE_ID . getCode ( ) ) ;
DqRuleInputEntry targetDatabase = new DqRuleInputEntry ( ) ;
targetDatabase . setTitle ( "目标数据库" ) ;
targetDatabase . setField ( "src_database" ) ;
targetDatabase . setType ( FormType . CASCADER . getFormType ( ) ) ;
targetDatabase . setCanEdit ( true ) ;
targetDatabase . setIsShow ( true ) ;
targetDatabase . setValue ( null ) ;
targetDatabase . setPlaceholder ( "$t(src_database)" ) ;
targetDatabase . setOptionSourceType ( OptionSourceType . DEFAULT . getCode ( ) ) ;
targetDatabase . setInputType ( InputType . DEFAULT . getCode ( ) ) ;
targetDatabase . setValueType ( ValueType . NUMBER . getCode ( ) ) ;
targetDatabase . setCreateTime ( new Date ( ) ) ;
targetDatabase . setUpdateTime ( new Date ( ) ) ;
targetDatasourceId . setInputType ( InputType . DEFAULT . getCode ( ) ) ;
DqRuleInputEntry targetTable = new DqRuleInputEntry ( ) ;
@ -731,12 +791,14 @@ public class DataQualityTaskTest {
defaultInputEntryList . add ( srcConnectorType ) ;
defaultInputEntryList . add ( srcDatasourceId ) ;
defaultInputEntryList . add ( srcDatabase ) ;
defaultInputEntryList . add ( srcTable ) ;
defaultInputEntryList . add ( statisticsName ) ;
defaultInputEntryList . add ( statisticsExecuteSql ) ;
defaultInputEntryList . add ( targetConnectorType ) ;
defaultInputEntryList . add ( targetDatasourceId ) ;
defaultInputEntryList . add ( targetDatabase ) ;
defaultInputEntryList . add ( targetTable ) ;
defaultInputEntryList . add ( comparisonName ) ;
defaultInputEntryList . add ( comparisonExecuteSql ) ;
@ -746,11 +808,13 @@ public class DataQualityTaskTest {
Map < String , String > inputParameterValue = new HashMap < > ( ) ;
inputParameterValue . put ( "src_connector_type" , "0" ) ;
inputParameterValue . put ( "src_datasource_id" , "2" ) ;
inputParameterValue . put ( "src_database" , "test" ) ;
inputParameterValue . put ( "src_table" , "test1" ) ;
inputParameterValue . put ( "statistics_name" , "src" ) ;
inputParameterValue . put ( "statistics_execute_sql" , "select count(*) as src from ${src_table} where c1>20" ) ;
inputParameterValue . put ( "target_connector_type" , "2" ) ;
inputParameterValue . put ( "target_datasource_id" , "3" ) ;
inputParameterValue . put ( "target_database" , "default" ) ;
inputParameterValue . put ( "target_table" , "test1_1" ) ;
inputParameterValue . put ( "comparison_name" , "target" ) ;
inputParameterValue . put ( "comparison_execute_sql" , "select count(*) as target from ${target_table} where c1>20" ) ;
@ -1071,10 +1135,12 @@ public class DataQualityTaskTest {
Map < String , String > inputParameterValue = new HashMap < > ( ) ;
inputParameterValue . put ( "src_connector_type" , "0" ) ;
inputParameterValue . put ( "src_datasource_id" , "2" ) ;
inputParameterValue . put ( "src_database" , "test" ) ;
inputParameterValue . put ( "src_table" , "demo_src" ) ;
inputParameterValue . put ( "src_filter" , "age<100" ) ;
inputParameterValue . put ( "target_connector_type" , "2" ) ;
inputParameterValue . put ( "target_datasource_id" , "3" ) ;
inputParameterValue . put ( "target_database" , "default" ) ;
inputParameterValue . put ( "target_table" , "demo_src" ) ;
inputParameterValue . put ( "target_filter" , "age<100" ) ;
inputParameterValue . put ( "mapping_columns" ,
@ -1165,7 +1231,7 @@ public class DataQualityTaskTest {
+ "\"password\":\"test\",\"driver\":\"org.postgresql.Driver\",\"user\":\"test\",\"table\":"
+ "\"t_ds_dq_task_statistics_value\",\"url\":\"jdbc:postgresql://localhost:5432/dolphinscheduler?stringtype=unspecified"
+ "&characterEncoding=UTF-8&allowMultiQueries=true\",\"sql\":\"select 1 as process_definition_id,1 as task_instance_id,"
+ "3 as rule_id,'T4MB2XTVSL+VA/L6XCU1M/ELHKYOMGVNBBE5KHBXHHI =' as unique_code,'miss_count.miss'AS statistics_name,miss_count.miss "
+ "3 as rule_id,'NGRU3S2KPG0GQ4BIHSW9C/LKX3NHN+CEUNU7AMNSPJK =' as unique_code,'miss_count.miss'AS statistics_name,miss_count.miss "
+ "AS statistics_value,'2021-08-30 00:00:00' as data_time,"
+ "'2021-08-30 00:00:00' as create_time,'2021-08-30 00:00:00' as update_time from miss_count\"}},{\"type\":\"hdfs_file\","
+ "\"config\":{\"path\":\"hdfs://localhost:8022/user/ods/data_quality_error_data/1_1_test\",\"input_table\":\"miss_items\"}}]}" ;