zixi0825
3 years ago
committed by
GitHub
116 changed files with 6408 additions and 2474 deletions
@ -0,0 +1,96 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.model; |
||||
|
||||
/**
 * Mutable holder for the pieces of JDBC connection information kept by this class:
 * host, port, driver name, database, params and address.
 *
 * <p>Values are stored and returned exactly as supplied; nothing is parsed or validated,
 * and every field is {@code null} until its setter has been called.
 */
public class JdbcInfo {

    private String host;
    private String port;
    private String driverName;
    private String database;
    private String params;
    private String address;

    /**
     * @return the host, or {@code null} when unset
     */
    public String getHost() {
        return this.host;
    }

    /**
     * @param host the host to store
     */
    public void setHost(String host) {
        this.host = host;
    }

    /**
     * @return the port (kept as text), or {@code null} when unset
     */
    public String getPort() {
        return this.port;
    }

    /**
     * @param port the port to store, as text
     */
    public void setPort(String port) {
        this.port = port;
    }

    /**
     * @return the driver name, or {@code null} when unset
     */
    public String getDriverName() {
        return this.driverName;
    }

    /**
     * @param driverName the driver name to store
     */
    public void setDriverName(String driverName) {
        this.driverName = driverName;
    }

    /**
     * @return the database, or {@code null} when unset
     */
    public String getDatabase() {
        return this.database;
    }

    /**
     * @param database the database to store
     */
    public void setDatabase(String database) {
        this.database = database;
    }

    /**
     * @return the params string, or {@code null} when unset
     */
    public String getParams() {
        return this.params;
    }

    /**
     * @param params the params string to store
     */
    public void setParams(String params) {
        this.params = params;
    }

    /**
     * @return the address, or {@code null} when unset
     */
    public String getAddress() {
        return this.address;
    }

    /**
     * @param address the address to store
     */
    public void setAddress(String address) {
        this.address = address;
    }

    /**
     * Renders all six fields, each wrapped in single quotes; unset fields print as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("JdbcInfo{");
        text.append("host='").append(host).append('\'');
        text.append(", port='").append(port).append('\'');
        text.append(", driverName='").append(driverName).append('\'');
        text.append(", database='").append(database).append('\'');
        text.append(", params='").append(params).append('\'');
        text.append(", address='").append(address).append('\'');
        return text.append('}').toString();
    }
}
@ -0,0 +1,152 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.entity; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.Date; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType; |
||||
import com.baomidou.mybatisplus.annotation.TableField; |
||||
import com.baomidou.mybatisplus.annotation.TableId; |
||||
import com.baomidou.mybatisplus.annotation.TableName; |
||||
import com.fasterxml.jackson.annotation.JsonFormat; |
||||
|
||||
@TableName("t_ds_dq_comparison_type") |
||||
public class DqComparisonType implements Serializable { |
||||
/** |
||||
* primary key |
||||
*/ |
||||
@TableId(value = "id", type = IdType.AUTO) |
||||
private int id; |
||||
/** |
||||
* type |
||||
*/ |
||||
@TableField(value = "type") |
||||
private String type; |
||||
/** |
||||
* execute sql |
||||
*/ |
||||
@TableField(value = "execute_sql") |
||||
private String executeSql; |
||||
/** |
||||
* output table |
||||
*/ |
||||
@TableField(value = "output_table") |
||||
private String outputTable; |
||||
/** |
||||
* comparison name |
||||
*/ |
||||
@TableField(value = "name") |
||||
private String name; |
||||
/** |
||||
* is inner source |
||||
*/ |
||||
@TableField(value = "is_inner_source") |
||||
private Boolean isInnerSource; |
||||
/** |
||||
* create_time |
||||
*/ |
||||
@TableField(value = "create_time") |
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") |
||||
private Date createTime; |
||||
/** |
||||
* update_time |
||||
*/ |
||||
@TableField(value = "update_time") |
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") |
||||
private Date updateTime; |
||||
|
||||
public int getId() { |
||||
return id; |
||||
} |
||||
|
||||
public void setId(int id) { |
||||
this.id = id; |
||||
} |
||||
|
||||
public String getType() { |
||||
return type; |
||||
} |
||||
|
||||
public void setType(String type) { |
||||
this.type = type; |
||||
} |
||||
|
||||
public String getExecuteSql() { |
||||
return executeSql; |
||||
} |
||||
|
||||
public void setExecuteSql(String executeSql) { |
||||
this.executeSql = executeSql; |
||||
} |
||||
|
||||
public String getOutputTable() { |
||||
return outputTable; |
||||
} |
||||
|
||||
public void setOutputTable(String outputTable) { |
||||
this.outputTable = outputTable; |
||||
} |
||||
|
||||
public String getName() { |
||||
return name; |
||||
} |
||||
|
||||
public void setName(String name) { |
||||
this.name = name; |
||||
} |
||||
|
||||
public Boolean getInnerSource() { |
||||
return isInnerSource; |
||||
} |
||||
|
||||
public void setInnerSource(Boolean innerSource) { |
||||
isInnerSource = innerSource; |
||||
} |
||||
|
||||
public Date getCreateTime() { |
||||
return createTime; |
||||
} |
||||
|
||||
public void setCreateTime(Date createTime) { |
||||
this.createTime = createTime; |
||||
} |
||||
|
||||
public Date getUpdateTime() { |
||||
return updateTime; |
||||
} |
||||
|
||||
public void setUpdateTime(Date updateTime) { |
||||
this.updateTime = updateTime; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "DqComparisonType{" |
||||
+ "id=" + id |
||||
+ ", type='" |
||||
+ type + '\'' |
||||
+ ", executeSql='" + executeSql + '\'' |
||||
+ ", outputTable='" + outputTable + '\'' |
||||
+ ", name='" + name + '\'' |
||||
+ ", isInnerSource='" + isInnerSource + '\'' |
||||
+ ", createTime=" + createTime |
||||
+ ", updateTime=" + updateTime |
||||
+ '}'; |
||||
} |
||||
} |
@ -0,0 +1,224 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.entity; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.dq.RuleType; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.Date; |
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType; |
||||
import com.baomidou.mybatisplus.annotation.TableField; |
||||
import com.baomidou.mybatisplus.annotation.TableId; |
||||
import com.baomidou.mybatisplus.annotation.TableName; |
||||
import com.fasterxml.jackson.annotation.JsonFormat; |
||||
|
||||
@TableName("t_ds_dq_task_statistics_value") |
||||
public class DqTaskStatisticsValue implements Serializable { |
||||
/** |
||||
* primary key |
||||
*/ |
||||
@TableId(value = "id", type = IdType.AUTO) |
||||
private int id; |
||||
/** |
||||
* process defined id |
||||
*/ |
||||
@TableField(value = "process_definition_id") |
||||
private long processDefinitionId; |
||||
/** |
||||
* process definition name |
||||
*/ |
||||
@TableField(exist = false) |
||||
private String processDefinitionName; |
||||
/** |
||||
* task instance id |
||||
*/ |
||||
@TableField(value = "task_instance_id") |
||||
private long taskInstanceId; |
||||
/** |
||||
* task name |
||||
*/ |
||||
@TableField(exist = false) |
||||
private String taskName; |
||||
/** |
||||
* rule id |
||||
*/ |
||||
@TableField(value = "rule_id") |
||||
private long ruleId; |
||||
/** |
||||
* rule type |
||||
*/ |
||||
@TableField(exist = false) |
||||
private RuleType ruleType; |
||||
/** |
||||
* rule name |
||||
*/ |
||||
@TableField(exist = false) |
||||
private String ruleName; |
||||
/** |
||||
* statistics value |
||||
*/ |
||||
@TableField(value = "statistics_value") |
||||
private double statisticsValue; |
||||
/** |
||||
* comparison value |
||||
*/ |
||||
@TableField(value = "statistics_name") |
||||
private String statisticsName; |
||||
/** |
||||
* data time |
||||
*/ |
||||
@TableField(value = "data_time") |
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") |
||||
private Date dataTime; |
||||
/** |
||||
* create time |
||||
*/ |
||||
@TableField(value = "create_time") |
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") |
||||
private Date createTime; |
||||
/** |
||||
* update time |
||||
*/ |
||||
@TableField(value = "update_time") |
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") |
||||
private Date updateTime; |
||||
|
||||
public int getId() { |
||||
return id; |
||||
} |
||||
|
||||
public void setId(int id) { |
||||
this.id = id; |
||||
} |
||||
|
||||
public long getProcessDefinitionId() { |
||||
return processDefinitionId; |
||||
} |
||||
|
||||
public void setProcessDefinitionId(long processDefinitionId) { |
||||
this.processDefinitionId = processDefinitionId; |
||||
} |
||||
|
||||
public String getProcessDefinitionName() { |
||||
return processDefinitionName; |
||||
} |
||||
|
||||
public void setProcessDefinitionName(String processDefinitionName) { |
||||
this.processDefinitionName = processDefinitionName; |
||||
} |
||||
|
||||
public long getTaskInstanceId() { |
||||
return taskInstanceId; |
||||
} |
||||
|
||||
public void setTaskInstanceId(long taskInstanceId) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
} |
||||
|
||||
public String getTaskName() { |
||||
return taskName; |
||||
} |
||||
|
||||
public void setTaskName(String taskName) { |
||||
this.taskName = taskName; |
||||
} |
||||
|
||||
public long getRuleId() { |
||||
return ruleId; |
||||
} |
||||
|
||||
public void setRuleId(long ruleId) { |
||||
this.ruleId = ruleId; |
||||
} |
||||
|
||||
public RuleType getRuleType() { |
||||
return ruleType; |
||||
} |
||||
|
||||
public void setRuleType(RuleType ruleType) { |
||||
this.ruleType = ruleType; |
||||
} |
||||
|
||||
public String getRuleName() { |
||||
return ruleName; |
||||
} |
||||
|
||||
public void setRuleName(String ruleName) { |
||||
this.ruleName = ruleName; |
||||
} |
||||
|
||||
public double getStatisticsValue() { |
||||
return statisticsValue; |
||||
} |
||||
|
||||
public void setStatisticsValue(double statisticsValue) { |
||||
this.statisticsValue = statisticsValue; |
||||
} |
||||
|
||||
public String getStatisticsName() { |
||||
return statisticsName; |
||||
} |
||||
|
||||
public void setStatisticsName(String statisticsName) { |
||||
this.statisticsName = statisticsName; |
||||
} |
||||
|
||||
public Date getDataTime() { |
||||
return dataTime; |
||||
} |
||||
|
||||
public void setDataTime(Date dataTime) { |
||||
this.dataTime = dataTime; |
||||
} |
||||
|
||||
public Date getCreateTime() { |
||||
return createTime; |
||||
} |
||||
|
||||
public void setCreateTime(Date createTime) { |
||||
this.createTime = createTime; |
||||
} |
||||
|
||||
public Date getUpdateTime() { |
||||
return updateTime; |
||||
} |
||||
|
||||
public void setUpdateTime(Date updateTime) { |
||||
this.updateTime = updateTime; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "DqTaskStatisticsValue{" |
||||
+ "id=" + id |
||||
+ ", processDefinitionId=" + processDefinitionId |
||||
+ ", processDefinitionName='" + processDefinitionName + '\'' |
||||
+ ", taskInstanceId=" + taskInstanceId |
||||
+ ", taskName='" + taskName + '\'' |
||||
+ ", ruleId=" + ruleId |
||||
+ ", ruleType=" + ruleType |
||||
+ ", ruleName='" + ruleName + '\'' |
||||
+ ", statisticsValue=" + statisticsValue |
||||
+ ", statisticsName='" + statisticsName + '\'' |
||||
+ ", dataTime=" + dataTime |
||||
+ ", createTime=" + createTime |
||||
+ ", updateTime=" + updateTime |
||||
+ '}'; |
||||
} |
||||
} |
@ -0,0 +1,156 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.entity; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.Date; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonFormat; |
||||
import com.fasterxml.jackson.annotation.JsonInclude; |
||||
import com.fasterxml.jackson.annotation.JsonInclude.Include; |
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
||||
@JsonInclude(Include.NON_NULL) |
||||
public class TaskAlertContent implements Serializable { |
||||
@JsonProperty("taskInstanceId") |
||||
private int taskInstanceId; |
||||
@JsonProperty("taskName") |
||||
private String taskName; |
||||
@JsonProperty("taskType") |
||||
private String taskType; |
||||
@JsonProperty("processDefinitionId") |
||||
private int processDefinitionId; |
||||
@JsonProperty("processDefinitionName") |
||||
private String processDefinitionName; |
||||
@JsonProperty("processInstanceId") |
||||
private int processInstanceId; |
||||
@JsonProperty("processInstanceName") |
||||
private String processInstanceName; |
||||
@JsonProperty("state") |
||||
private ExecutionStatus state; |
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") |
||||
@JsonProperty("startTime") |
||||
private Date startTime; |
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") |
||||
@JsonProperty("endTime") |
||||
private Date endTime; |
||||
@JsonProperty("host") |
||||
private String host; |
||||
@JsonProperty("logPath") |
||||
private String logPath; |
||||
|
||||
private TaskAlertContent(Builder builder) { |
||||
this.taskInstanceId = builder.taskInstanceId; |
||||
this.taskName = builder.taskName; |
||||
this.taskType = builder.taskType; |
||||
this.processDefinitionId = builder.processDefinitionId; |
||||
this.processDefinitionName = builder.processDefinitionName; |
||||
this.processInstanceId = builder.processInstanceId; |
||||
this.processInstanceName = builder.processInstanceName; |
||||
this.state = builder.state; |
||||
this.startTime = builder.startTime; |
||||
this.endTime = builder.endTime; |
||||
this.host = builder.host; |
||||
this.logPath = builder.logPath; |
||||
} |
||||
|
||||
public static Builder newBuilder() { |
||||
return new Builder(); |
||||
} |
||||
|
||||
public static class Builder { |
||||
private int taskInstanceId; |
||||
private String taskName; |
||||
private String taskType; |
||||
private int processDefinitionId; |
||||
private String processDefinitionName; |
||||
private int processInstanceId; |
||||
private String processInstanceName; |
||||
private ExecutionStatus state; |
||||
private Date startTime; |
||||
private Date endTime; |
||||
private String host; |
||||
private String logPath; |
||||
|
||||
public Builder taskInstanceId(int taskInstanceId) { |
||||
this.taskInstanceId = taskInstanceId; |
||||
return this; |
||||
} |
||||
|
||||
public Builder taskName(String taskName) { |
||||
this.taskName = taskName; |
||||
return this; |
||||
} |
||||
|
||||
public Builder taskType(String taskType) { |
||||
this.taskType = taskType; |
||||
return this; |
||||
} |
||||
|
||||
public Builder processDefinitionId(int processDefinitionId) { |
||||
this.processDefinitionId = processDefinitionId; |
||||
return this; |
||||
} |
||||
|
||||
public Builder processDefinitionName(String processDefinitionName) { |
||||
this.processDefinitionName = processDefinitionName; |
||||
return this; |
||||
} |
||||
|
||||
public Builder processInstanceId(int processInstanceId) { |
||||
this.processInstanceId = processInstanceId; |
||||
return this; |
||||
} |
||||
|
||||
public Builder processInstanceName(String processInstanceName) { |
||||
this.processInstanceName = processInstanceName; |
||||
return this; |
||||
} |
||||
|
||||
public Builder state(ExecutionStatus state) { |
||||
this.state = state; |
||||
return this; |
||||
} |
||||
|
||||
public Builder startTime(Date startTime) { |
||||
this.startTime = startTime; |
||||
return this; |
||||
} |
||||
|
||||
public Builder endTime(Date endTime) { |
||||
this.endTime = endTime; |
||||
return this; |
||||
} |
||||
|
||||
public Builder host(String host) { |
||||
this.host = host; |
||||
return this; |
||||
} |
||||
|
||||
public Builder logPath(String logPath) { |
||||
this.logPath = logPath; |
||||
return this; |
||||
} |
||||
|
||||
public TaskAlertContent build() { |
||||
return new TaskAlertContent(this); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,29 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.DqComparisonType; |
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
||||
/** |
||||
* DqComparisonTypeMapper |
||||
*/ |
||||
public interface DqComparisonTypeMapper extends BaseMapper<DqComparisonType> { |
||||
|
||||
} |
@ -0,0 +1,29 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.mapper; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue; |
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
|
||||
/** |
||||
* DqTaskStatisticsValueMapper |
||||
*/ |
||||
public interface DqTaskStatisticsValueMapper extends BaseMapper<DqTaskStatisticsValue> { |
||||
|
||||
} |
@ -0,0 +1,22 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > |
||||
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.DqComparisonTypeMapper">
    <!-- Intentionally empty: no custom SQL statements are declared for this namespace. -->
</mapper>
@ -0,0 +1,22 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
|
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > |
||||
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.DqTaskStatisticsValueMapper">
    <!-- Intentionally empty: no custom SQL statements are declared for this namespace. -->
</mapper>
@ -0,0 +1,94 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.config; |
||||
|
||||
import java.util.HashMap; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Map.Entry; |
||||
import java.util.Set; |
||||
|
||||
/**
 * Loosely-typed view over a {@code Map<String, Object>} of configuration values.
 *
 * <p>Typed getters convert a value through its {@code String.valueOf} text. A key that is
 * absent, or mapped to {@code null}, yields {@code null} from {@link #getString},
 * {@link #getStringList}, {@link #getInt}, {@link #getDouble} and {@link #getLong}, and
 * {@code false} from {@link #getBoolean} and {@link #has}.
 *
 * <p>The instance does not copy the map passed to the constructor and
 * {@link #configurationMap()} returns the backing map itself, so changes made through
 * either side are visible to the other.
 */
public class Config {

    private Map<String,Object> configuration = new HashMap<>();

    /**
     * Creates an empty configuration.
     */
    public Config() {
    }

    /**
     * Wraps the given map without copying it.
     *
     * @param configuration the backing map; when {@code null} an empty map is used instead
     */
    public Config(Map<String,Object> configuration) {
        if (configuration != null) {
            this.configuration = configuration;
        }
    }

    /**
     * @param key the key to look up
     * @return the value converted with {@code String.valueOf}, or {@code null} when absent
     */
    public String getString(String key) {
        return textOf(key);
    }

    /**
     * Returns the value cast to a list of strings.
     *
     * @param key the key to look up
     * @return the stored value, or {@code null} when absent
     * @throws ClassCastException if the stored value is not a {@code List}
     */
    // Unchecked by nature: values are stored as Object, so element types cannot be verified here.
    @SuppressWarnings("unchecked")
    public List<String> getStringList(String key) {
        return (List<String>) configuration.get(key);
    }

    /**
     * @param key the key to look up
     * @return the value parsed as an integer, or {@code null} when absent
     * @throws NumberFormatException if the value is present but not a valid integer
     */
    public Integer getInt(String key) {
        String text = textOf(key);
        return text == null ? null : Integer.valueOf(text);
    }

    /**
     * @param key the key to look up
     * @return {@code true} only when the value's text equals "true" ignoring case;
     *         {@code false} otherwise, including when the key is absent
     */
    public Boolean getBoolean(String key) {
        return Boolean.valueOf(String.valueOf(configuration.get(key)));
    }

    /**
     * @param key the key to look up
     * @return the value parsed as a double, or {@code null} when absent
     * @throws NumberFormatException if the value is present but not a valid double
     */
    public Double getDouble(String key) {
        String text = textOf(key);
        return text == null ? null : Double.valueOf(text);
    }

    /**
     * @param key the key to look up
     * @return the value parsed as a long, or {@code null} when absent
     * @throws NumberFormatException if the value is present but not a valid long
     */
    public Long getLong(String key) {
        String text = textOf(key);
        return text == null ? null : Long.valueOf(text);
    }

    /**
     * @param key the key to look up
     * @return {@code true} when the key maps to a non-null value
     */
    public Boolean has(String key) {
        return configuration.get(key) != null;
    }

    /**
     * @return the entry set of the backing map (a live view)
     */
    public Set<Entry<String, Object>> entrySet() {
        return configuration.entrySet();
    }

    /**
     * @return {@code true} when there are no entries
     */
    public boolean isEmpty() {
        return configuration.isEmpty();
    }

    /**
     * @return {@code true} when there is at least one entry
     */
    public boolean isNotEmpty() {
        return !configuration.isEmpty();
    }

    /**
     * Stores a value, replacing any previous value for the key.
     *
     * @param key   the key
     * @param value the value
     */
    public void put(String key, Object value) {
        this.configuration.put(key, value);
    }

    /**
     * Adds the entries of another map without overriding existing ones: a key is only
     * written when it is currently absent or mapped to {@code null}.
     *
     * @param configuration the entries to add; {@code null} is treated as nothing to merge
     */
    public void merge(Map<String, Object> configuration) {
        if (configuration == null) {
            return;
        }
        configuration.forEach(this.configuration::putIfAbsent);
    }

    /**
     * @return the backing map itself, not a copy
     */
    public Map<String, Object> configurationMap() {
        return this.configuration;
    }

    /**
     * Single-lookup text conversion shared by the typed getters.
     */
    private String textOf(String key) {
        Object value = configuration.get(key);
        return value == null ? null : String.valueOf(value);
    }
}
@ -0,0 +1,133 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.config; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.data.quality.utils.StringUtils; |
||||
|
||||
import java.util.List; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
||||
/** |
||||
* DataQualityConfiguration |
||||
*/ |
||||
public class DataQualityConfiguration implements IConfig { |
||||
|
||||
@JsonProperty("name") |
||||
private String name; |
||||
|
||||
@JsonProperty("env") |
||||
private EnvConfig envConfig; |
||||
|
||||
@JsonProperty("readers") |
||||
private List<ReaderConfig> readerConfigs; |
||||
|
||||
@JsonProperty("transformers") |
||||
private List<TransformerConfig> transformerConfigs; |
||||
|
||||
@JsonProperty("writers") |
||||
private List<WriterConfig> writerConfigs; |
||||
|
||||
public DataQualityConfiguration(){} |
||||
|
||||
public DataQualityConfiguration(String name, |
||||
EnvConfig envConfig, |
||||
List<ReaderConfig> readerConfigs, |
||||
List<WriterConfig> writerConfigs, |
||||
List<TransformerConfig> transformerConfigs) { |
||||
this.name = name; |
||||
this.envConfig = envConfig; |
||||
this.readerConfigs = readerConfigs; |
||||
this.writerConfigs = writerConfigs; |
||||
this.transformerConfigs = transformerConfigs; |
||||
} |
||||
|
||||
public String getName() { |
||||
return name; |
||||
} |
||||
|
||||
public void setName(String name) { |
||||
this.name = name; |
||||
} |
||||
|
||||
public EnvConfig getEnvConfig() { |
||||
return envConfig; |
||||
} |
||||
|
||||
public void setEnvConfig(EnvConfig envConfig) { |
||||
this.envConfig = envConfig; |
||||
} |
||||
|
||||
public List<ReaderConfig> getReaderConfigs() { |
||||
return readerConfigs; |
||||
} |
||||
|
||||
public void setReaderConfigs(List<ReaderConfig> readerConfigs) { |
||||
this.readerConfigs = readerConfigs; |
||||
} |
||||
|
||||
public List<TransformerConfig> getTransformerConfigs() { |
||||
return transformerConfigs; |
||||
} |
||||
|
||||
public void setTransformerConfigs(List<TransformerConfig> transformerConfigs) { |
||||
this.transformerConfigs = transformerConfigs; |
||||
} |
||||
|
||||
public List<WriterConfig> getWriterConfigs() { |
||||
return writerConfigs; |
||||
} |
||||
|
||||
public void setWriterConfigs(List<WriterConfig> writerConfigs) { |
||||
this.writerConfigs = writerConfigs; |
||||
} |
||||
|
||||
@Override |
||||
public void validate() { |
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(name), "name should not be empty"); |
||||
|
||||
Preconditions.checkArgument(envConfig != null, "env config should not be empty"); |
||||
|
||||
Preconditions.checkArgument(readerConfigs != null, "reader config should not be empty"); |
||||
for (ReaderConfig readerConfig : readerConfigs) { |
||||
readerConfig.validate(); |
||||
} |
||||
|
||||
Preconditions.checkArgument(transformerConfigs != null, "transform config should not be empty"); |
||||
for (TransformerConfig transformParameter : transformerConfigs) { |
||||
transformParameter.validate(); |
||||
} |
||||
|
||||
Preconditions.checkArgument(writerConfigs != null, "writer config should not be empty"); |
||||
for (WriterConfig writerConfig :writerConfigs) { |
||||
writerConfig.validate(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "DataQualityConfiguration{" |
||||
+ "name='" + name + '\'' |
||||
+ ", envConfig=" + envConfig |
||||
+ ", readerConfigs=" + readerConfigs |
||||
+ ", transformerConfigs=" + transformerConfigs |
||||
+ ", writerConfigs=" + writerConfigs |
||||
+ '}'; |
||||
} |
||||
} |
@ -0,0 +1,32 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.config; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* TransformerConfig |
||||
*/ |
||||
public class TransformerConfig extends BaseConfig { |
||||
|
||||
public TransformerConfig() {} |
||||
|
||||
public TransformerConfig(String type, Map<String,Object> config) { |
||||
super(type, config); |
||||
} |
||||
} |
@ -0,0 +1,46 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.config; |
||||
|
||||
/**
 * ValidateResult: mutable holder for the outcome of a validation, made of a success flag and a message.
 */
public class ValidateResult {

    /** whether the validation passed */
    private boolean success;

    /** message accompanying the outcome */
    private String msg;

    /**
     * Creates a result with the given outcome and message.
     *
     * @param success whether the validation passed
     * @param msg     message describing the outcome
     */
    public ValidateResult(final boolean success, final String msg) {
        setSuccess(success);
        setMsg(msg);
    }

    /**
     * @return {@code true} when the validation passed
     */
    public final boolean isSuccess() {
        return this.success;
    }

    /**
     * @param success the new outcome flag
     */
    public final void setSuccess(final boolean success) {
        this.success = success;
    }

    /**
     * @return the message stored with this result
     */
    public final String getMsg() {
        return this.msg;
    }

    /**
     * @param msg the new message
     */
    public final void setMsg(final String msg) {
        this.msg = msg;
    }
}
@ -1,67 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.configuration; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.data.quality.utils.StringUtils; |
||||
|
||||
import java.util.Map; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
||||
/** |
||||
* ConnectorParameter |
||||
*/ |
||||
public class ConnectorParameter implements IParameter { |
||||
|
||||
@JsonProperty("type") |
||||
private String type; |
||||
|
||||
@JsonProperty("config") |
||||
private Map<String,Object> config; |
||||
|
||||
public ConnectorParameter(){ |
||||
} |
||||
|
||||
public ConnectorParameter(String type, Map<String,Object> config) { |
||||
this.type = type; |
||||
this.config = config; |
||||
} |
||||
|
||||
public String getType() { |
||||
return type; |
||||
} |
||||
|
||||
public void setType(String type) { |
||||
this.type = type; |
||||
} |
||||
|
||||
public Map<String, Object> getConfig() { |
||||
return config; |
||||
} |
||||
|
||||
public void setConfig(Map<String, Object> config) { |
||||
this.config = config; |
||||
} |
||||
|
||||
@Override |
||||
public void validate() { |
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(type), "type should not be empty"); |
||||
Preconditions.checkArgument(config != null, "config should not be empty"); |
||||
} |
||||
} |
@ -1,104 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.configuration; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.utils.Preconditions; |
||||
import org.apache.dolphinscheduler.data.quality.utils.StringUtils; |
||||
|
||||
import java.util.List; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
||||
/** |
||||
* DataQualityConfiguration |
||||
*/ |
||||
public class DataQualityConfiguration implements IParameter { |
||||
|
||||
@JsonProperty("name") |
||||
private String name; |
||||
|
||||
@JsonProperty("connectors") |
||||
private List<ConnectorParameter> connectorParameters; |
||||
|
||||
@JsonProperty("writers") |
||||
private List<WriterParameter> writerParams; |
||||
|
||||
@JsonProperty("executors") |
||||
private List<ExecutorParameter> executorParameters; |
||||
|
||||
public DataQualityConfiguration(){} |
||||
|
||||
public DataQualityConfiguration(String name, |
||||
List<ConnectorParameter> connectorParameters, |
||||
List<WriterParameter> writerParams, |
||||
List<ExecutorParameter> executorParameters) { |
||||
this.name = name; |
||||
this.connectorParameters = connectorParameters; |
||||
this.writerParams = writerParams; |
||||
this.executorParameters = executorParameters; |
||||
} |
||||
|
||||
public String getName() { |
||||
return name; |
||||
} |
||||
|
||||
public void setName(String name) { |
||||
this.name = name; |
||||
} |
||||
|
||||
public List<ConnectorParameter> getConnectorParameters() { |
||||
return connectorParameters; |
||||
} |
||||
|
||||
public void setConnectorParameters(List<ConnectorParameter> connectorParameters) { |
||||
this.connectorParameters = connectorParameters; |
||||
} |
||||
|
||||
public List<WriterParameter> getWriterParams() { |
||||
return writerParams; |
||||
} |
||||
|
||||
public void setWriterParams(List<WriterParameter> writerParams) { |
||||
this.writerParams = writerParams; |
||||
} |
||||
|
||||
public List<ExecutorParameter> getExecutorParameters() { |
||||
return executorParameters; |
||||
} |
||||
|
||||
public void setExecutorParameters(List<ExecutorParameter> executorParameters) { |
||||
this.executorParameters = executorParameters; |
||||
} |
||||
|
||||
@Override |
||||
public void validate() { |
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(name), "name should not be empty"); |
||||
Preconditions.checkArgument(connectorParameters != null, "connector param should not be empty"); |
||||
for (ConnectorParameter connectorParameter:connectorParameters) { |
||||
connectorParameter.validate(); |
||||
} |
||||
Preconditions.checkArgument(writerParams != null, "writer param should not be empty"); |
||||
for (WriterParameter writerParameter:writerParams) { |
||||
writerParameter.validate(); |
||||
} |
||||
Preconditions.checkArgument(executorParameters != null, "executor param should not be empty"); |
||||
for (ExecutorParameter executorParameter:executorParameters) { |
||||
executorParameter.validate(); |
||||
} |
||||
} |
||||
} |
@ -1,77 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.configuration; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.utils.Preconditions; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
||||
/** |
||||
* ExecutorParameter |
||||
*/ |
||||
public class ExecutorParameter implements IParameter { |
||||
|
||||
@JsonProperty("index") |
||||
private String index; |
||||
|
||||
@JsonProperty("execute.sql") |
||||
private String executeSql; |
||||
|
||||
@JsonProperty("table.alias") |
||||
private String tableAlias; |
||||
|
||||
public ExecutorParameter() { |
||||
} |
||||
|
||||
public ExecutorParameter(String index, String executeSql, String tableAlias) { |
||||
this.index = index; |
||||
this.executeSql = executeSql; |
||||
this.tableAlias = tableAlias; |
||||
} |
||||
|
||||
public String getIndex() { |
||||
return index; |
||||
} |
||||
|
||||
public void setIndex(String index) { |
||||
this.index = index; |
||||
} |
||||
|
||||
public String getExecuteSql() { |
||||
return executeSql; |
||||
} |
||||
|
||||
public void setExecuteSql(String executeSql) { |
||||
this.executeSql = executeSql; |
||||
} |
||||
|
||||
public String getTableAlias() { |
||||
return tableAlias; |
||||
} |
||||
|
||||
public void setTableAlias(String tableAlias) { |
||||
this.tableAlias = tableAlias; |
||||
} |
||||
|
||||
@Override |
||||
public void validate() { |
||||
Preconditions.checkArgument(index != null, "index should not be empty"); |
||||
Preconditions.checkArgument(executeSql != null, "executeSql should not be empty"); |
||||
Preconditions.checkArgument(tableAlias != null, "tableAlias should not be empty"); |
||||
} |
||||
} |
@ -0,0 +1,38 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.enums; |
||||
|
||||
/**
 * TransformerType: the kinds of transformer that can be named in a configuration.
 */
public enum TransformerType {
    /**
     * SQL
     */
    SQL;

    /**
     * Looks up a transformer type by name, ignoring case.
     *
     * @param name the type name to look up; may be {@code null}
     * @return the matching type, or {@code null} when no constant has that name
     */
    public static TransformerType getType(String name) {
        TransformerType match = null;
        for (TransformerType candidate : values()) {
            // equalsIgnoreCase(null) is false, so a null name simply never matches
            if (candidate.name().equalsIgnoreCase(name)) {
                match = candidate;
                break;
            }
        }
        return match;
    }
}
@ -0,0 +1,40 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.exception; |
||||
|
||||
/**
 * ConfigRuntimeException: unchecked exception used by the data quality module for configuration
 * and execution failures. It mirrors the four standard {@link RuntimeException} constructors.
 */
public class ConfigRuntimeException extends RuntimeException {

    /**
     * Creates an exception without message or cause.
     */
    public ConfigRuntimeException() {
        super();
    }

    /**
     * Creates an exception with a message.
     *
     * @param message the detail message
     */
    public ConfigRuntimeException(final String message) {
        super(message);
    }

    /**
     * Creates an exception with a message and the underlying cause.
     *
     * @param message the detail message
     * @param cause   the failure that triggered this exception
     */
    public ConfigRuntimeException(final String message, final Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception that wraps the underlying cause.
     *
     * @param cause the failure that triggered this exception
     */
    public ConfigRuntimeException(final Throwable cause) {
        super(cause);
    }
}
@ -0,0 +1,35 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.execution; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.flow.Component; |
||||
|
||||
import java.util.List; |
||||
|
||||
/** |
||||
* Execution |
||||
*/ |
||||
public interface Execution<R extends Component, T extends Component, W extends Component> { |
||||
/** |
||||
* execute |
||||
* @param readers readers |
||||
* @param transformers transformers |
||||
* @param writers writers |
||||
*/ |
||||
void execute(List<R> readers, List<T> transformers, List<W> writers); |
||||
} |
@ -0,0 +1,132 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.execution; |
||||
|
||||
import static org.apache.dolphinscheduler.data.quality.Constants.INPUT_TABLE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.OUTPUT_TABLE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.TMP_TABLE; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.exception.ConfigRuntimeException; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchReader; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchTransformer; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchWriter; |
||||
|
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
|
||||
import java.util.List; |
||||
|
||||
/** |
||||
* SparkBatchExecution |
||||
*/ |
||||
public class SparkBatchExecution implements Execution<BatchReader, BatchTransformer, BatchWriter> { |
||||
|
||||
private final SparkRuntimeEnvironment environment; |
||||
|
||||
public SparkBatchExecution(SparkRuntimeEnvironment environment) throws ConfigRuntimeException { |
||||
this.environment = environment; |
||||
} |
||||
|
||||
@Override |
||||
public void execute(List<BatchReader> readers, List<BatchTransformer> transformers, List<BatchWriter> writers) { |
||||
readers.forEach(reader -> registerInputTempView(reader, environment)); |
||||
|
||||
if (!readers.isEmpty()) { |
||||
Dataset<Row> ds = readers.get(0).read(environment); |
||||
for (BatchTransformer tf:transformers) { |
||||
ds = executeTransformer(environment, tf, ds); |
||||
registerTransformTempView(tf, ds); |
||||
} |
||||
|
||||
for (BatchWriter sink: writers) { |
||||
executeWriter(environment, sink, ds); |
||||
} |
||||
} |
||||
|
||||
environment.sparkSession().stop(); |
||||
} |
||||
|
||||
private void registerTempView(String tableName, Dataset<Row> ds) { |
||||
if (ds != null) { |
||||
ds.createOrReplaceTempView(tableName); |
||||
} else { |
||||
throw new ConfigRuntimeException("dataset is null, can not createOrReplaceTempView"); |
||||
} |
||||
} |
||||
|
||||
private void registerInputTempView(BatchReader reader, SparkRuntimeEnvironment environment) { |
||||
Config conf = reader.getConfig(); |
||||
if (Boolean.TRUE.equals(conf.has(OUTPUT_TABLE))) { |
||||
String tableName = conf.getString(OUTPUT_TABLE); |
||||
registerTempView(tableName, reader.read(environment)); |
||||
} else { |
||||
throw new ConfigRuntimeException( |
||||
"[" + reader.getClass().getName() + "] must be registered as dataset, please set \"output_table\" config"); |
||||
} |
||||
} |
||||
|
||||
private Dataset<Row> executeTransformer(SparkRuntimeEnvironment environment, BatchTransformer transformer, Dataset<Row> dataset) { |
||||
Config config = transformer.getConfig(); |
||||
Dataset<Row> inputDataset; |
||||
Dataset<Row> outputDataset = null; |
||||
if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) { |
||||
String[] tableNames = config.getString(INPUT_TABLE).split(","); |
||||
|
||||
for (String sourceTableName: tableNames) { |
||||
inputDataset = environment.sparkSession().read().table(sourceTableName); |
||||
|
||||
if (outputDataset == null) { |
||||
outputDataset = inputDataset; |
||||
} else { |
||||
outputDataset = outputDataset.union(inputDataset); |
||||
} |
||||
} |
||||
} else { |
||||
outputDataset = dataset; |
||||
} |
||||
|
||||
if (Boolean.TRUE.equals(config.has(TMP_TABLE))) { |
||||
if (outputDataset == null) { |
||||
outputDataset = dataset; |
||||
} |
||||
String tableName = config.getString(TMP_TABLE); |
||||
registerTempView(tableName, outputDataset); |
||||
} |
||||
|
||||
return transformer.transform(outputDataset, environment); |
||||
} |
||||
|
||||
private void registerTransformTempView(BatchTransformer transformer, Dataset<Row> ds) { |
||||
Config config = transformer.getConfig(); |
||||
if (Boolean.TRUE.equals(config.has(OUTPUT_TABLE))) { |
||||
String tableName = config.getString(OUTPUT_TABLE); |
||||
registerTempView(tableName, ds); |
||||
} |
||||
} |
||||
|
||||
private void executeWriter(SparkRuntimeEnvironment environment, BatchWriter writer, Dataset<Row> ds) { |
||||
Config config = writer.getConfig(); |
||||
Dataset<Row> inputDataSet = ds; |
||||
if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) { |
||||
String sourceTableName = config.getString(INPUT_TABLE); |
||||
inputDataSet = environment.sparkSession().read().table(sourceTableName); |
||||
} |
||||
writer.write(inputDataSet, environment); |
||||
} |
||||
} |
@ -0,0 +1,72 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.execution; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
|
||||
import org.apache.spark.SparkConf; |
||||
import org.apache.spark.sql.SparkSession; |
||||
|
||||
/** |
||||
* SparkRuntimeEnvironment |
||||
*/ |
||||
public class SparkRuntimeEnvironment { |
||||
|
||||
private static final String TYPE = "type"; |
||||
private static final String BATCH = "batch"; |
||||
|
||||
private SparkSession sparkSession; |
||||
|
||||
private Config config = new Config(); |
||||
|
||||
public SparkRuntimeEnvironment(Config config) { |
||||
if (config != null) { |
||||
this.config = config; |
||||
} |
||||
|
||||
this.prepare(); |
||||
} |
||||
|
||||
public Config getConfig() { |
||||
return this.config; |
||||
} |
||||
|
||||
public void prepare() { |
||||
sparkSession = SparkSession.builder().config(createSparkConf()).getOrCreate(); |
||||
} |
||||
|
||||
private SparkConf createSparkConf() { |
||||
SparkConf conf = new SparkConf(); |
||||
this.config.entrySet() |
||||
.forEach(entry -> conf.set(entry.getKey(), String.valueOf(entry.getValue()))); |
||||
conf.set("spark.sql.crossJoin.enabled","true"); |
||||
return conf; |
||||
} |
||||
|
||||
public SparkSession sparkSession() { |
||||
return sparkSession; |
||||
} |
||||
|
||||
public boolean isBatch() { |
||||
return BATCH.equalsIgnoreCase(config.getString(TYPE)); |
||||
} |
||||
|
||||
public SparkBatchExecution getBatchExecution() { |
||||
return new SparkBatchExecution(this); |
||||
} |
||||
} |
@ -0,0 +1,56 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.ValidateResult; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.stream.Collectors; |
||||
|
||||
/** |
||||
* Component |
||||
*/ |
||||
public interface Component { |
||||
|
||||
Config getConfig(); |
||||
|
||||
ValidateResult validateConfig(); |
||||
|
||||
default ValidateResult validate(List<String> requiredOptions) { |
||||
List<String> nonExistsOptions = new ArrayList<>(); |
||||
requiredOptions.forEach(x -> { |
||||
if (Boolean.FALSE.equals(getConfig().has(x))) { |
||||
nonExistsOptions.add(x); |
||||
} |
||||
}); |
||||
|
||||
if (!nonExistsOptions.isEmpty()) { |
||||
return new ValidateResult( |
||||
false, |
||||
nonExistsOptions.stream().map(option -> |
||||
"[" + option + "]").collect(Collectors.joining(",")) + " is not exist"); |
||||
} else { |
||||
return new ValidateResult(true, ""); |
||||
} |
||||
} |
||||
|
||||
void prepare(SparkRuntimeEnvironment prepareEnv); |
||||
} |
@ -1,90 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow; |
||||
|
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DEFAULT_DATABASE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DEFAULT_DRIVER; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.EMPTY; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.URL; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.USER; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.Constants; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* JdbcBaseConfig |
||||
*/ |
||||
public class JdbcBaseConfig { |
||||
|
||||
private String database; |
||||
|
||||
private String table; |
||||
|
||||
private String dbTable; |
||||
|
||||
private String url; |
||||
|
||||
private String user; |
||||
|
||||
private String password; |
||||
|
||||
private String driver; |
||||
|
||||
public JdbcBaseConfig(Map<String,Object> config) { |
||||
database = String.valueOf(config.getOrDefault(DATABASE,DEFAULT_DATABASE)); |
||||
table = String.valueOf(config.getOrDefault(TABLE,EMPTY)); |
||||
dbTable = database + Constants.DOTS + table; |
||||
url = String.valueOf(config.getOrDefault(URL,EMPTY)); |
||||
user = String.valueOf(config.getOrDefault(USER,EMPTY)); |
||||
password = String.valueOf(config.getOrDefault(PASSWORD,EMPTY)); |
||||
driver = String.valueOf(config.getOrDefault(DRIVER,DEFAULT_DRIVER)); |
||||
} |
||||
|
||||
public String getDatabase() { |
||||
return database; |
||||
} |
||||
|
||||
public String getTable() { |
||||
return table; |
||||
} |
||||
|
||||
public String getDbTable() { |
||||
return dbTable; |
||||
} |
||||
|
||||
public String getUrl() { |
||||
return url; |
||||
} |
||||
|
||||
public String getUser() { |
||||
return user; |
||||
} |
||||
|
||||
public String getPassword() { |
||||
return password; |
||||
} |
||||
|
||||
public String getDriver() { |
||||
return driver; |
||||
} |
||||
} |
@ -0,0 +1,37 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.Component; |
||||
|
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
|
||||
/** |
||||
* BatchReader |
||||
*/ |
||||
public interface BatchReader extends Component { |
||||
|
||||
/** |
||||
* read data from source return dataset |
||||
* @param env env |
||||
* @return Dataset<Row> |
||||
*/ |
||||
Dataset<Row> read(SparkRuntimeEnvironment env); |
||||
} |
@ -0,0 +1,38 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.Component; |
||||
|
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
|
||||
/** |
||||
* BatchTransformer |
||||
*/ |
||||
public interface BatchTransformer extends Component { |
||||
|
||||
/** |
||||
* transform the dataset |
||||
* @param data data |
||||
* @param env env |
||||
* @return Dataset<Row> |
||||
*/ |
||||
Dataset<Row> transform(Dataset<Row> data, SparkRuntimeEnvironment env); |
||||
} |
@ -0,0 +1,37 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.Component; |
||||
|
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
|
||||
/** |
||||
* BatchWriter |
||||
*/ |
||||
public interface BatchWriter extends Component { |
||||
|
||||
/** |
||||
* write data to target storage |
||||
* @param data data |
||||
* @param environment environment |
||||
*/ |
||||
void write(Dataset<Row> data, SparkRuntimeEnvironment environment); |
||||
} |
@ -0,0 +1,68 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch.reader; |
||||
|
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.SQL; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.ValidateResult; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchReader; |
||||
import org.apache.dolphinscheduler.data.quality.utils.StringUtils; |
||||
|
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
|
||||
import java.util.Arrays; |
||||
|
||||
/** |
||||
* HiveReader |
||||
*/ |
||||
public class HiveReader implements BatchReader { |
||||
|
||||
private final Config config; |
||||
|
||||
public HiveReader(Config config) { |
||||
this.config = config; |
||||
} |
||||
|
||||
@Override |
||||
public Config getConfig() { |
||||
return config; |
||||
} |
||||
|
||||
@Override |
||||
public ValidateResult validateConfig() { |
||||
return validate(Arrays.asList(DATABASE, TABLE)); |
||||
} |
||||
|
||||
@Override |
||||
public void prepare(SparkRuntimeEnvironment prepareEnv) { |
||||
if (StringUtils.isEmpty(config.getString(SQL))) { |
||||
config.put(SQL,"select * from " + config.getString(DATABASE) + "." + config.getString(TABLE)); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public Dataset<Row> read(SparkRuntimeEnvironment env) { |
||||
return env.sparkSession().sql(config.getString(SQL)); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,95 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch.reader; |
||||
|
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DB_TABLE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DOTS; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.JDBC; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.URL; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.USER; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.ValidateResult; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchReader; |
||||
import org.apache.dolphinscheduler.data.quality.utils.ConfigUtils; |
||||
|
||||
import org.apache.spark.sql.DataFrameReader; |
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
import org.apache.spark.sql.SparkSession; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* AbstractJdbcSource |
||||
*/ |
||||
public class JdbcReader implements BatchReader { |
||||
|
||||
private final Config config; |
||||
|
||||
public JdbcReader(Config config) { |
||||
this.config = config; |
||||
} |
||||
|
||||
@Override |
||||
public Config getConfig() { |
||||
return config; |
||||
} |
||||
|
||||
@Override |
||||
public ValidateResult validateConfig() { |
||||
return validate(Arrays.asList(URL, TABLE, USER, PASSWORD)); |
||||
} |
||||
|
||||
@Override |
||||
public void prepare(SparkRuntimeEnvironment prepareEnv) { |
||||
// Do nothing
|
||||
} |
||||
|
||||
@Override |
||||
public Dataset<Row> read(SparkRuntimeEnvironment env) { |
||||
return jdbcReader(env.sparkSession()).load(); |
||||
} |
||||
|
||||
private DataFrameReader jdbcReader(SparkSession sparkSession) { |
||||
|
||||
DataFrameReader reader = sparkSession.read() |
||||
.format(JDBC) |
||||
.option(URL, config.getString(URL)) |
||||
.option(DB_TABLE, config.getString(TABLE)) |
||||
.option(USER, config.getString(USER)) |
||||
.option(PASSWORD, config.getString(PASSWORD)) |
||||
.option(DRIVER, config.getString(DRIVER)); |
||||
|
||||
Config jdbcConfig = ConfigUtils.extractSubConfig(config, JDBC + DOTS, false); |
||||
|
||||
if (!config.isEmpty()) { |
||||
Map<String,String> optionMap = new HashMap<>(16); |
||||
jdbcConfig.entrySet().forEach(x -> optionMap.put(x.getKey(),String.valueOf(x.getValue()))); |
||||
reader.options(optionMap); |
||||
} |
||||
|
||||
return reader; |
||||
} |
||||
} |
@ -0,0 +1,76 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch.reader; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.ReaderConfig; |
||||
import org.apache.dolphinscheduler.data.quality.enums.ReaderType; |
||||
import org.apache.dolphinscheduler.data.quality.exception.DataQualityException; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchReader; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
/** |
||||
* ReaderFactory |
||||
*/ |
||||
public class ReaderFactory { |
||||
|
||||
private static class Singleton { |
||||
static ReaderFactory instance = new ReaderFactory(); |
||||
} |
||||
|
||||
public static ReaderFactory getInstance() { |
||||
return Singleton.instance; |
||||
} |
||||
|
||||
public List<BatchReader> getReaders(SparkRuntimeEnvironment sparkRuntimeEnvironment, List<ReaderConfig> readerConfigs) throws DataQualityException { |
||||
|
||||
List<BatchReader> readerList = new ArrayList<>(); |
||||
|
||||
for (ReaderConfig readerConfig : readerConfigs) { |
||||
BatchReader reader = getReader(readerConfig); |
||||
if (reader != null) { |
||||
reader.validateConfig(); |
||||
reader.prepare(sparkRuntimeEnvironment); |
||||
readerList.add(reader); |
||||
} |
||||
} |
||||
|
||||
return readerList; |
||||
} |
||||
|
||||
private BatchReader getReader(ReaderConfig readerConfig) throws DataQualityException { |
||||
ReaderType readerType = ReaderType.getType(readerConfig.getType()); |
||||
Config config = new Config(readerConfig.getConfig()); |
||||
if (readerType != null) { |
||||
switch (readerType) { |
||||
case JDBC: |
||||
return new JdbcReader(config); |
||||
case HIVE: |
||||
return new HiveReader(config); |
||||
default: |
||||
throw new DataQualityException("reader type " + readerType + " is not supported!"); |
||||
} |
||||
} |
||||
|
||||
return null; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,62 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch.transformer; |
||||
|
||||
import static org.apache.dolphinscheduler.data.quality.Constants.SQL; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.ValidateResult; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchTransformer; |
||||
|
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
|
||||
import java.util.Collections; |
||||
|
||||
/** |
||||
* SqlTransformer |
||||
*/ |
||||
public class SqlTransformer implements BatchTransformer { |
||||
|
||||
private final Config config; |
||||
|
||||
public SqlTransformer(Config config) { |
||||
this.config = config; |
||||
} |
||||
|
||||
@Override |
||||
public Config getConfig() { |
||||
return config; |
||||
} |
||||
|
||||
@Override |
||||
public ValidateResult validateConfig() { |
||||
return validate(Collections.singletonList(SQL)); |
||||
} |
||||
|
||||
@Override |
||||
public void prepare(SparkRuntimeEnvironment prepareEnv) { |
||||
// Do nothing
|
||||
} |
||||
|
||||
@Override |
||||
public Dataset<Row> transform(Dataset<Row> data, SparkRuntimeEnvironment env) { |
||||
return env.sparkSession().sql(config.getString(SQL)); |
||||
} |
||||
} |
@ -0,0 +1,72 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch.transformer; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.TransformerConfig; |
||||
import org.apache.dolphinscheduler.data.quality.enums.TransformerType; |
||||
import org.apache.dolphinscheduler.data.quality.exception.DataQualityException; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchTransformer; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
/** |
||||
* WriterFactory |
||||
*/ |
||||
public class TransformerFactory { |
||||
|
||||
private static class Singleton { |
||||
static TransformerFactory instance = new TransformerFactory(); |
||||
} |
||||
|
||||
public static TransformerFactory getInstance() { |
||||
return Singleton.instance; |
||||
} |
||||
|
||||
public List<BatchTransformer> getTransformer(SparkRuntimeEnvironment sparkRuntimeEnvironment, List<TransformerConfig> transformerConfigs) throws DataQualityException { |
||||
|
||||
List<BatchTransformer> transformers = new ArrayList<>(); |
||||
|
||||
for (TransformerConfig transformerConfig:transformerConfigs) { |
||||
BatchTransformer transformer = getTransformer(transformerConfig); |
||||
if (transformer != null) { |
||||
transformer.validateConfig(); |
||||
transformer.prepare(sparkRuntimeEnvironment); |
||||
transformers.add(transformer); |
||||
} |
||||
} |
||||
|
||||
return transformers; |
||||
} |
||||
|
||||
private BatchTransformer getTransformer(TransformerConfig transformerConfig) throws DataQualityException { |
||||
TransformerType transformerType = TransformerType.getType(transformerConfig.getType()); |
||||
Config config = new Config(transformerConfig.getConfig()); |
||||
if (transformerType != null) { |
||||
if (transformerType == TransformerType.SQL) { |
||||
return new SqlTransformer(config); |
||||
} |
||||
throw new DataQualityException("transformer type " + transformerType + " is not supported!"); |
||||
} |
||||
|
||||
return null; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,86 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch.writer; |
||||
|
||||
import static org.apache.dolphinscheduler.data.quality.Constants.APPEND; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DB_TABLE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DRIVER; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.JDBC; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.PASSWORD; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.SAVE_MODE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.SQL; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.URL; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.USER; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.ValidateResult; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchWriter; |
||||
import org.apache.dolphinscheduler.data.quality.utils.StringUtils; |
||||
|
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
|
||||
import java.util.Arrays; |
||||
|
||||
/** |
||||
* JdbcWriter |
||||
*/ |
||||
public class JdbcWriter implements BatchWriter { |
||||
|
||||
private final Config config; |
||||
|
||||
public JdbcWriter(Config config) { |
||||
this.config = config; |
||||
} |
||||
|
||||
@Override |
||||
public Config getConfig() { |
||||
return config; |
||||
} |
||||
|
||||
@Override |
||||
public ValidateResult validateConfig() { |
||||
return validate(Arrays.asList(URL, TABLE, USER, PASSWORD)); |
||||
} |
||||
|
||||
@Override |
||||
public void prepare(SparkRuntimeEnvironment prepareEnv) { |
||||
if (StringUtils.isEmpty(config.getString(SAVE_MODE))) { |
||||
config.put(SAVE_MODE,APPEND); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void write(Dataset<Row> data, SparkRuntimeEnvironment env) { |
||||
if (!StringUtils.isBlank(config.getString(SQL))) { |
||||
data = env.sparkSession().sql(config.getString(SQL)); |
||||
} |
||||
|
||||
data.write() |
||||
.format(JDBC) |
||||
.option(DRIVER,config.getString(DRIVER)) |
||||
.option(URL,config.getString(URL)) |
||||
.option(DB_TABLE, config.getString(TABLE)) |
||||
.option(USER, config.getString(USER)) |
||||
.option(PASSWORD, config.getString(PASSWORD)) |
||||
.mode(config.getString(SAVE_MODE)) |
||||
.save(); |
||||
} |
||||
} |
@ -0,0 +1,81 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch.writer; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.WriterConfig; |
||||
import org.apache.dolphinscheduler.data.quality.enums.WriterType; |
||||
import org.apache.dolphinscheduler.data.quality.exception.DataQualityException; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchWriter; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.writer.file.HdfsFileWriter; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.writer.file.LocalFileWriter; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
/** |
||||
* WriterFactory |
||||
*/ |
||||
public class WriterFactory { |
||||
|
||||
private static class Singleton { |
||||
static WriterFactory instance = new WriterFactory(); |
||||
} |
||||
|
||||
public static WriterFactory getInstance() { |
||||
return Singleton.instance; |
||||
} |
||||
|
||||
public List<BatchWriter> getWriters(SparkRuntimeEnvironment sparkRuntimeEnvironment, List<WriterConfig> writerConfigs) throws DataQualityException { |
||||
|
||||
List<BatchWriter> writerList = new ArrayList<>(); |
||||
|
||||
for (WriterConfig writerConfig:writerConfigs) { |
||||
BatchWriter writer = getWriter(writerConfig); |
||||
if (writer != null) { |
||||
writer.validateConfig(); |
||||
writer.prepare(sparkRuntimeEnvironment); |
||||
writerList.add(writer); |
||||
} |
||||
} |
||||
|
||||
return writerList; |
||||
} |
||||
|
||||
private BatchWriter getWriter(WriterConfig writerConfig) throws DataQualityException { |
||||
|
||||
WriterType writerType = WriterType.getType(writerConfig.getType()); |
||||
Config config = new Config(writerConfig.getConfig()); |
||||
if (writerType != null) { |
||||
switch (writerType) { |
||||
case JDBC: |
||||
return new JdbcWriter(config); |
||||
case LOCAL_FILE: |
||||
return new LocalFileWriter(config); |
||||
case HDFS_FILE: |
||||
return new HdfsFileWriter(config); |
||||
default: |
||||
throw new DataQualityException("writer type " + writerType + " is not supported!"); |
||||
} |
||||
} |
||||
|
||||
return null; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,130 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch.writer.file; |
||||
|
||||
import static org.apache.dolphinscheduler.data.quality.Constants.SAVE_MODE; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.ValidateResult; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchWriter; |
||||
import org.apache.dolphinscheduler.data.quality.utils.ConfigUtils; |
||||
import org.apache.dolphinscheduler.data.quality.utils.StringUtils; |
||||
|
||||
import org.apache.commons.collections.CollectionUtils; |
||||
import org.apache.spark.sql.DataFrameWriter; |
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
|
||||
import java.util.Collections; |
||||
import java.util.HashMap; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* BaseFileWriter |
||||
*/ |
||||
public abstract class BaseFileWriter implements BatchWriter { |
||||
|
||||
public static final String PARTITION_BY = "partition_by"; |
||||
public static final String SERIALIZER = "serializer"; |
||||
public static final String PATH = "path"; |
||||
|
||||
private final Config config; |
||||
|
||||
protected BaseFileWriter(Config config) { |
||||
this.config = config; |
||||
} |
||||
|
||||
@Override |
||||
public Config getConfig() { |
||||
return config; |
||||
} |
||||
|
||||
@Override |
||||
public void prepare(SparkRuntimeEnvironment prepareEnv) { |
||||
Map<String,Object> defaultConfig = new HashMap<>(); |
||||
|
||||
defaultConfig.put(PARTITION_BY, Collections.emptyList()); |
||||
defaultConfig.put(SAVE_MODE,"error"); |
||||
defaultConfig.put(SERIALIZER,"json"); |
||||
|
||||
config.merge(defaultConfig); |
||||
} |
||||
|
||||
protected ValidateResult checkConfigImpl(List<String> allowedUri) { |
||||
|
||||
if (Boolean.TRUE.equals(config.has(PATH)) && StringUtils.isNotEmpty(config.getString(PATH))) { |
||||
String dir = config.getString(PATH); |
||||
if (dir.startsWith("/") || uriInAllowedSchema(dir, allowedUri)) { |
||||
return new ValidateResult(true, ""); |
||||
} else { |
||||
return new ValidateResult(false, "invalid path URI, please set the following allowed schemas: " + String.join(",", allowedUri)); |
||||
} |
||||
} else { |
||||
return new ValidateResult(false, "please specify [path] as non-empty string"); |
||||
} |
||||
} |
||||
|
||||
protected boolean uriInAllowedSchema(String uri, List<String> allowedUri) { |
||||
return allowedUri.stream().map(uri::startsWith).reduce(true, (a, b) -> a && b); |
||||
} |
||||
|
||||
protected String buildPathWithDefaultSchema(String uri, String defaultUriSchema) { |
||||
return uri.startsWith("/") ? defaultUriSchema + uri : uri; |
||||
} |
||||
|
||||
protected void outputImpl(Dataset<Row> df, String defaultUriSchema) { |
||||
|
||||
DataFrameWriter<Row> writer = df.write().mode(config.getString(SAVE_MODE)); |
||||
|
||||
if (CollectionUtils.isNotEmpty(config.getStringList(PARTITION_BY))) { |
||||
List<String> partitionKeys = config.getStringList(PARTITION_BY); |
||||
writer.partitionBy(partitionKeys.toArray(new String[]{})); |
||||
} |
||||
|
||||
Config fileConfig = ConfigUtils.extractSubConfig(config, "options.", false); |
||||
if (fileConfig.isNotEmpty()) { |
||||
Map<String,String> optionMap = new HashMap<>(16); |
||||
fileConfig.entrySet().forEach(x -> optionMap.put(x.getKey(),String.valueOf(x.getValue()))); |
||||
writer.options(optionMap); |
||||
} |
||||
|
||||
String path = buildPathWithDefaultSchema(config.getString(PATH), defaultUriSchema); |
||||
|
||||
switch (config.getString(SERIALIZER)) { |
||||
case "csv": |
||||
writer.csv(path); |
||||
break; |
||||
case "json": |
||||
writer.json(path); |
||||
break; |
||||
case "parquet": |
||||
writer.parquet(path); |
||||
break; |
||||
case "text": |
||||
writer.text(path); |
||||
break; |
||||
case "orc": |
||||
writer.orc(path); |
||||
break; |
||||
default: |
||||
break; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,47 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.batch.writer.file; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
import org.apache.dolphinscheduler.data.quality.config.ValidateResult; |
||||
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment; |
||||
|
||||
import org.apache.spark.sql.Dataset; |
||||
import org.apache.spark.sql.Row; |
||||
|
||||
import java.util.Collections; |
||||
|
||||
/** |
||||
* LocalFileWriter |
||||
*/ |
||||
public class LocalFileWriter extends BaseFileWriter { |
||||
|
||||
public LocalFileWriter(Config config) { |
||||
super(config); |
||||
} |
||||
|
||||
@Override |
||||
public void write(Dataset<Row> data, SparkRuntimeEnvironment environment) { |
||||
outputImpl(data,"file://"); |
||||
} |
||||
|
||||
@Override |
||||
public ValidateResult validateConfig() { |
||||
return checkConfigImpl(Collections.singletonList("file://")); |
||||
} |
||||
} |
@ -1,73 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.connector; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter; |
||||
import org.apache.dolphinscheduler.data.quality.context.DataQualityContext; |
||||
import org.apache.dolphinscheduler.data.quality.enums.ConnectorType; |
||||
import org.apache.dolphinscheduler.data.quality.exception.DataQualityException; |
||||
|
||||
import org.apache.spark.sql.SparkSession; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
/** |
||||
* ConnectorFactory |
||||
*/ |
||||
public class ConnectorFactory { |
||||
|
||||
private static class Singleton { |
||||
static ConnectorFactory instance = new ConnectorFactory(); |
||||
} |
||||
|
||||
public static ConnectorFactory getInstance() { |
||||
return Singleton.instance; |
||||
} |
||||
|
||||
public List<IConnector> getConnectors(DataQualityContext context) throws DataQualityException { |
||||
|
||||
List<IConnector> connectorList = new ArrayList<>(); |
||||
|
||||
for (ConnectorParameter connectorParameter :context.getConnectorParameterList()) { |
||||
IConnector connector = getConnector(context.getSparkSession(), connectorParameter); |
||||
if (connector != null) { |
||||
connectorList.add(connector); |
||||
} |
||||
} |
||||
|
||||
return connectorList; |
||||
} |
||||
|
||||
private IConnector getConnector(SparkSession sparkSession,ConnectorParameter connectorParameter) throws DataQualityException { |
||||
ConnectorType connectorType = ConnectorType.getType(connectorParameter.getType()); |
||||
if (connectorType != null) { |
||||
switch (connectorType) { |
||||
case HIVE: |
||||
return new HiveConnector(sparkSession, connectorParameter); |
||||
case JDBC: |
||||
return new JdbcConnector(sparkSession, connectorParameter); |
||||
default: |
||||
throw new DataQualityException("connector type ${connectorType} is not supported!"); |
||||
} |
||||
} |
||||
|
||||
return null; |
||||
} |
||||
|
||||
} |
@ -1,55 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.connector; |
||||
|
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DATABASE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.DEFAULT_DATABASE; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.EMPTY; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.TABLE; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.Constants; |
||||
import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter; |
||||
|
||||
import org.apache.spark.sql.SparkSession; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* HiveConnector |
||||
*/ |
||||
public class HiveConnector implements IConnector { |
||||
|
||||
private final SparkSession sparkSession; |
||||
|
||||
private final ConnectorParameter connectorParameter; |
||||
|
||||
public HiveConnector(SparkSession sparkSession, ConnectorParameter connectorParameter) { |
||||
this.sparkSession = sparkSession; |
||||
this.connectorParameter = connectorParameter; |
||||
} |
||||
|
||||
@Override |
||||
public void execute() { |
||||
Map<String,Object> config = connectorParameter.getConfig(); |
||||
String database = String.valueOf(config.getOrDefault(DATABASE,DEFAULT_DATABASE)); |
||||
String table = String.valueOf(config.getOrDefault(TABLE,EMPTY)); |
||||
String dbTable = database + Constants.DOTS + table; |
||||
|
||||
sparkSession.table(dbTable).createOrReplaceTempView(table); |
||||
} |
||||
} |
@ -1,61 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.connector; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.configuration.ConnectorParameter; |
||||
import org.apache.dolphinscheduler.data.quality.flow.JdbcBaseConfig; |
||||
import org.apache.dolphinscheduler.data.quality.utils.JdbcUtils; |
||||
import org.apache.dolphinscheduler.data.quality.utils.Preconditions; |
||||
|
||||
import org.apache.spark.sql.SparkSession; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* JdbcConnector |
||||
*/ |
||||
public class JdbcConnector implements IConnector { |
||||
|
||||
private final SparkSession sparkSession; |
||||
|
||||
private final ConnectorParameter connectorParameter; |
||||
|
||||
public JdbcConnector(SparkSession sparkSession, ConnectorParameter connectorParameter) { |
||||
this.sparkSession = sparkSession; |
||||
this.connectorParameter = connectorParameter; |
||||
} |
||||
|
||||
@Override |
||||
public void execute() { |
||||
|
||||
Map<String,Object> config = connectorParameter.getConfig(); |
||||
JdbcBaseConfig jdbcBaseConfig = new JdbcBaseConfig(config); |
||||
|
||||
Preconditions.checkArgument(JdbcUtils.isJdbcDriverLoaded(jdbcBaseConfig.getDriver()), "JDBC driver $driver not present in classpath"); |
||||
|
||||
sparkSession |
||||
.read() |
||||
.format("jdbc") |
||||
.option("driver",jdbcBaseConfig.getDriver()) |
||||
.option("url",jdbcBaseConfig.getUrl()) |
||||
.option("dbtable", jdbcBaseConfig.getDbTable()) |
||||
.option("user", jdbcBaseConfig.getUser()) |
||||
.option("password", jdbcBaseConfig.getPassword()) |
||||
.load().createOrReplaceTempView(jdbcBaseConfig.getTable()); |
||||
} |
||||
} |
@ -1,59 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.executor; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.configuration.ExecutorParameter; |
||||
import org.apache.dolphinscheduler.data.quality.flow.DataQualityTask; |
||||
import org.apache.dolphinscheduler.data.quality.utils.StringUtils; |
||||
|
||||
import org.apache.spark.sql.SparkSession; |
||||
|
||||
import java.util.List; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* SparkSqlExecuteTask |
||||
*/ |
||||
public class SparkSqlExecuteTask implements DataQualityTask { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SparkSqlExecuteTask.class); |
||||
|
||||
private final SparkSession sparkSession; |
||||
|
||||
private final List<ExecutorParameter> executorParameterList; |
||||
|
||||
public SparkSqlExecuteTask(SparkSession sparkSession,List<ExecutorParameter> executorParameterList) { |
||||
this.sparkSession = sparkSession; |
||||
this.executorParameterList = executorParameterList; |
||||
} |
||||
|
||||
@Override |
||||
public void execute() { |
||||
for (ExecutorParameter executorParameter : executorParameterList) { |
||||
if (StringUtils.isNotEmpty(executorParameter.getTableAlias())) { |
||||
sparkSession |
||||
.sql(executorParameter.getExecuteSql()) |
||||
.createOrReplaceTempView(executorParameter.getTableAlias()); |
||||
} else { |
||||
logger.error("lost table alias"); |
||||
} |
||||
} |
||||
} |
||||
} |
@ -1,68 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.writer; |
||||
|
||||
import static org.apache.dolphinscheduler.data.quality.Constants.EMPTY; |
||||
import static org.apache.dolphinscheduler.data.quality.Constants.SQL; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter; |
||||
import org.apache.dolphinscheduler.data.quality.flow.JdbcBaseConfig; |
||||
import org.apache.dolphinscheduler.data.quality.utils.JdbcUtils; |
||||
import org.apache.dolphinscheduler.data.quality.utils.Preconditions; |
||||
|
||||
import org.apache.spark.sql.SaveMode; |
||||
import org.apache.spark.sql.SparkSession; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* JdbcWriter |
||||
*/ |
||||
public class JdbcWriter implements IWriter { |
||||
|
||||
private final SparkSession sparkSession; |
||||
|
||||
private final WriterParameter writerParam; |
||||
|
||||
public JdbcWriter(SparkSession sparkSession, WriterParameter writerParam) { |
||||
this.sparkSession = sparkSession; |
||||
this.writerParam = writerParam; |
||||
} |
||||
|
||||
@Override |
||||
public void execute() { |
||||
|
||||
Map<String,Object> config = writerParam.getConfig(); |
||||
JdbcBaseConfig jdbcBaseConfig = new JdbcBaseConfig(config); |
||||
String sql = String.valueOf(config.getOrDefault(SQL,EMPTY)); |
||||
|
||||
Preconditions.checkArgument(JdbcUtils.isJdbcDriverLoaded(jdbcBaseConfig.getDriver()), "JDBC driver $driver not present in classpath"); |
||||
|
||||
sparkSession.sql(sql) |
||||
.write() |
||||
.format("jdbc") |
||||
.option("driver",jdbcBaseConfig.getDriver()) |
||||
.option("url",jdbcBaseConfig.getUrl()) |
||||
.option("dbtable", jdbcBaseConfig.getTable()) |
||||
.option("user", jdbcBaseConfig.getUser()) |
||||
.option("password", jdbcBaseConfig.getPassword()) |
||||
.mode(SaveMode.Append) |
||||
.save(); |
||||
} |
||||
|
||||
} |
@ -1,69 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.flow.writer; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.configuration.WriterParameter; |
||||
import org.apache.dolphinscheduler.data.quality.context.DataQualityContext; |
||||
import org.apache.dolphinscheduler.data.quality.enums.WriterType; |
||||
import org.apache.dolphinscheduler.data.quality.exception.DataQualityException; |
||||
|
||||
import org.apache.spark.sql.SparkSession; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
/** |
||||
* WriterFactory |
||||
*/ |
||||
public class WriterFactory { |
||||
|
||||
private static class Singleton { |
||||
static WriterFactory instance = new WriterFactory(); |
||||
} |
||||
|
||||
public static WriterFactory getInstance() { |
||||
return Singleton.instance; |
||||
} |
||||
|
||||
public List<IWriter> getWriters(DataQualityContext context) throws DataQualityException { |
||||
|
||||
List<IWriter> writerList = new ArrayList<>(); |
||||
|
||||
for (WriterParameter writerParam:context.getWriterParamList()) { |
||||
IWriter writer = getWriter(context.getSparkSession(),writerParam); |
||||
if (writer != null) { |
||||
writerList.add(writer); |
||||
} |
||||
} |
||||
|
||||
return writerList; |
||||
} |
||||
|
||||
private IWriter getWriter(SparkSession sparkSession,WriterParameter writerParam) throws DataQualityException { |
||||
WriterType writerType = WriterType.getType(writerParam.getType()); |
||||
if (writerType != null) { |
||||
if (writerType == WriterType.JDBC) { |
||||
return new JdbcWriter(sparkSession, writerParam); |
||||
} |
||||
throw new DataQualityException("writer type $readerType is not supported!"); |
||||
} |
||||
|
||||
return null; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,56 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.data.quality.utils; |
||||
|
||||
import org.apache.dolphinscheduler.data.quality.config.Config; |
||||
|
||||
import java.util.LinkedHashMap; |
||||
import java.util.Map; |
||||
|
||||
public class ConfigUtils { |
||||
|
||||
private ConfigUtils() { |
||||
throw new IllegalStateException("Construct ConfigUtils"); |
||||
} |
||||
|
||||
/** |
||||
* Extract sub config with fixed prefix |
||||
* |
||||
* @param source config source |
||||
* @param prefix config prefix |
||||
* @param keepPrefix true if keep prefix |
||||
*/ |
||||
public static Config extractSubConfig(Config source, String prefix, boolean keepPrefix) { |
||||
Map<String, Object> values = new LinkedHashMap<>(); |
||||
|
||||
for (Map.Entry<String, Object> entry : source.entrySet()) { |
||||
final String key = entry.getKey(); |
||||
final String value = String.valueOf(entry.getValue()); |
||||
|
||||
if (key.startsWith(prefix)) { |
||||
if (keepPrefix) { |
||||
values.put(key, value); |
||||
} else { |
||||
values.put(key.substring(prefix.length()), value); |
||||
} |
||||
} |
||||
} |
||||
|
||||
return new Config(values); |
||||
} |
||||
} |
@ -0,0 +1,182 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.utils; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.common.enums.TaskType; |
||||
import org.apache.dolphinscheduler.common.enums.dq.CheckType; |
||||
import org.apache.dolphinscheduler.common.enums.dq.DqFailureStrategy; |
||||
import org.apache.dolphinscheduler.common.enums.dq.DqTaskState; |
||||
import org.apache.dolphinscheduler.common.enums.dq.OperatorType; |
||||
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.math.BigDecimal; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
/** |
||||
* DataQualityResultOperator |
||||
*/ |
||||
@Component |
||||
public class DataQualityResultOperator { |
||||
|
||||
/** |
||||
* logger |
||||
*/ |
||||
private final Logger logger = LoggerFactory.getLogger(DataQualityResultOperator.class); |
||||
|
||||
/** |
||||
* process service |
||||
*/ |
||||
@Autowired |
||||
private ProcessService processService; |
||||
|
||||
/** |
||||
* alert manager |
||||
*/ |
||||
private final AlertManager alertManager = new AlertManager(); |
||||
|
||||
public void operateDqExecuteResult(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) { |
||||
if (TaskType.DATA_QUALITY == TaskType.valueOf(taskInstance.getTaskType())) { |
||||
|
||||
ProcessInstance processInstance = |
||||
processService.findProcessInstanceDetailById( |
||||
Integer.parseInt(String.valueOf(taskInstance.getProcessInstanceId()))); |
||||
|
||||
if (taskResponseEvent.getState().typeIsFailure() |
||||
|| taskResponseEvent.getState().typeIsCancel()) { |
||||
processService.deleteDqExecuteResultByTaskInstanceId(taskInstance.getId()); |
||||
processService.deleteTaskStatisticsValueByTaskInstanceId(taskInstance.getId()); |
||||
sendDqTaskErrorAlert(taskInstance,processInstance); |
||||
return; |
||||
} |
||||
|
||||
processService.updateDqExecuteResultUserId(taskInstance.getId()); |
||||
DqExecuteResult dqExecuteResult = |
||||
processService.getDqExecuteResultByTaskInstanceId(taskInstance.getId()); |
||||
if (dqExecuteResult != null) { |
||||
//check the result ,if result is failure do some operator by failure strategy
|
||||
checkDqExecuteResult(taskResponseEvent, dqExecuteResult, processInstance); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void checkDqExecuteResult(TaskResponseEvent taskResponseEvent, |
||||
DqExecuteResult dqExecuteResult, |
||||
ProcessInstance processInstance) { |
||||
if (isFailure(dqExecuteResult)) { |
||||
DqFailureStrategy dqFailureStrategy = DqFailureStrategy.of(dqExecuteResult.getFailureStrategy()); |
||||
if (dqFailureStrategy != null) { |
||||
dqExecuteResult.setState(DqTaskState.FAILURE); |
||||
sendDqTaskResultAlert(dqExecuteResult,processInstance); |
||||
switch (dqFailureStrategy) { |
||||
case ALERT: |
||||
logger.info("task is failure, continue and alert"); |
||||
break; |
||||
case BLOCK: |
||||
taskResponseEvent.setState(ExecutionStatus.FAILURE); |
||||
logger.info("task is failure, end and alert"); |
||||
break; |
||||
default: |
||||
break; |
||||
} |
||||
} |
||||
} else { |
||||
dqExecuteResult.setState(DqTaskState.SUCCESS); |
||||
} |
||||
|
||||
processService.updateDqExecuteResultState(dqExecuteResult); |
||||
} |
||||
|
||||
private boolean isFailure(DqExecuteResult dqExecuteResult) { |
||||
CheckType checkType = dqExecuteResult.getCheckType(); |
||||
|
||||
double statisticsValue = dqExecuteResult.getStatisticsValue(); |
||||
double comparisonValue = dqExecuteResult.getComparisonValue(); |
||||
double threshold = dqExecuteResult.getThreshold(); |
||||
|
||||
OperatorType operatorType = OperatorType.of(dqExecuteResult.getOperator()); |
||||
|
||||
boolean isFailure = false; |
||||
if (operatorType != null) { |
||||
double srcValue = 0; |
||||
switch (checkType) { |
||||
case COMPARISON_MINUS_STATISTICS: |
||||
srcValue = comparisonValue - statisticsValue; |
||||
isFailure = getCompareResult(operatorType,srcValue,threshold); |
||||
break; |
||||
case STATISTICS_MINUS_COMPARISON: |
||||
srcValue = statisticsValue - comparisonValue; |
||||
isFailure = getCompareResult(operatorType,srcValue,threshold); |
||||
break; |
||||
case STATISTICS_COMPARISON_PERCENTAGE: |
||||
if (comparisonValue > 0) { |
||||
srcValue = statisticsValue / comparisonValue * 100; |
||||
} |
||||
isFailure = getCompareResult(operatorType,srcValue,threshold); |
||||
break; |
||||
case STATISTICS_COMPARISON_DIFFERENCE_COMPARISON_PERCENTAGE: |
||||
if (comparisonValue > 0) { |
||||
srcValue = Math.abs(comparisonValue - statisticsValue) / comparisonValue * 100; |
||||
} |
||||
isFailure = getCompareResult(operatorType,srcValue,threshold); |
||||
break; |
||||
default: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
return isFailure; |
||||
} |
||||
|
||||
private void sendDqTaskResultAlert(DqExecuteResult dqExecuteResult, ProcessInstance processInstance) { |
||||
alertManager.sendDataQualityTaskExecuteResultAlert(dqExecuteResult,processInstance); |
||||
} |
||||
|
||||
private void sendDqTaskErrorAlert(TaskInstance taskInstance, ProcessInstance processInstance) { |
||||
alertManager.sendTaskErrorAlert(taskInstance,processInstance); |
||||
} |
||||
|
||||
private boolean getCompareResult(OperatorType operatorType, double srcValue, double targetValue) { |
||||
BigDecimal src = BigDecimal.valueOf(srcValue); |
||||
BigDecimal target = BigDecimal.valueOf(targetValue); |
||||
switch (operatorType) { |
||||
case EQ: |
||||
return src.compareTo(target) == 0; |
||||
case LT: |
||||
return src.compareTo(target) <= -1; |
||||
case LE: |
||||
return src.compareTo(target) == 0 || src.compareTo(target) <= -1; |
||||
case GT: |
||||
return src.compareTo(target) >= 1; |
||||
case GE: |
||||
return src.compareTo(target) == 0 || src.compareTo(target) >= 1; |
||||
case NE: |
||||
return src.compareTo(target) != 0; |
||||
default: |
||||
return true; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,112 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.utils; |
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.COLON; |
||||
import static org.apache.dolphinscheduler.common.Constants.DOUBLE_SLASH; |
||||
import static org.apache.dolphinscheduler.common.Constants.MYSQL; |
||||
import static org.apache.dolphinscheduler.common.Constants.POSTGRESQL; |
||||
import static org.apache.dolphinscheduler.common.Constants.QUESTION; |
||||
import static org.apache.dolphinscheduler.common.Constants.SEMICOLON; |
||||
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.DbType; |
||||
import org.apache.dolphinscheduler.common.model.JdbcInfo; |
||||
import org.apache.dolphinscheduler.common.utils.StringUtils; |
||||
|
||||
/** |
||||
* JdbcUrlParser |
||||
*/ |
||||
public class JdbcUrlParser { |
||||
|
||||
private JdbcUrlParser() { |
||||
throw new IllegalStateException("Utility class"); |
||||
} |
||||
|
||||
public static DbType getDbType(String datasourceType) { |
||||
switch (datasourceType.toUpperCase()) { |
||||
case MYSQL: |
||||
return DbType.MYSQL; |
||||
case POSTGRESQL: |
||||
return DbType.POSTGRESQL; |
||||
default: |
||||
return null; |
||||
} |
||||
} |
||||
|
||||
public static JdbcInfo getJdbcInfo(String jdbcUrl) { |
||||
|
||||
JdbcInfo jdbcInfo = new JdbcInfo(); |
||||
|
||||
int pos; |
||||
int pos1; |
||||
int pos2; |
||||
String tempUri; |
||||
|
||||
if (jdbcUrl == null || !jdbcUrl.startsWith("jdbc:") || (pos1 = jdbcUrl.indexOf(COLON, 5)) == -1) { |
||||
return null; |
||||
} |
||||
|
||||
String driverName = jdbcUrl.substring(5, pos1); |
||||
String params = ""; |
||||
String host = ""; |
||||
String database = ""; |
||||
String port = ""; |
||||
if (((pos2 = jdbcUrl.indexOf(SEMICOLON, pos1)) == -1) && ((pos2 = jdbcUrl.indexOf(QUESTION, pos1)) == -1)) { |
||||
tempUri = jdbcUrl.substring(pos1 + 1); |
||||
} else { |
||||
tempUri = jdbcUrl.substring(pos1 + 1, pos2); |
||||
params = jdbcUrl.substring(pos2 + 1); |
||||
} |
||||
|
||||
if (tempUri.startsWith(DOUBLE_SLASH)) { |
||||
if ((pos = tempUri.indexOf(SINGLE_SLASH, 2)) != -1) { |
||||
host = tempUri.substring(2, pos); |
||||
database = tempUri.substring(pos + 1); |
||||
|
||||
if ((pos = host.indexOf(COLON)) != -1) { |
||||
port = host.substring(pos + 1); |
||||
host = host.substring(0, pos); |
||||
} |
||||
} |
||||
} else { |
||||
database = tempUri; |
||||
} |
||||
|
||||
if (StringUtils.isEmpty(database)) { |
||||
return null; |
||||
} |
||||
|
||||
if (database.contains(QUESTION)) { |
||||
database = database.substring(0, database.indexOf(QUESTION)); |
||||
} |
||||
|
||||
if (database.contains(SEMICOLON)) { |
||||
database = database.substring(0, database.indexOf(SEMICOLON)); |
||||
} |
||||
|
||||
jdbcInfo.setDriverName(driverName); |
||||
jdbcInfo.setHost(host); |
||||
jdbcInfo.setPort(port); |
||||
jdbcInfo.setDatabase(database); |
||||
jdbcInfo.setParams(params); |
||||
jdbcInfo.setAddress("jdbc:" + driverName + "://" + host + COLON + port); |
||||
|
||||
return jdbcInfo; |
||||
} |
||||
} |
@ -0,0 +1,34 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.worker.task.dq.rule.parameter; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* EnvConfig |
||||
*/ |
||||
public class EnvConfig extends BaseConfig { |
||||
|
||||
public EnvConfig() { |
||||
} |
||||
|
||||
public EnvConfig(String type, Map<String,Object> config) { |
||||
super(type,config); |
||||
} |
||||
|
||||
} |
@ -1,68 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.worker.task.dq.rule.parameter; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
||||
/** |
||||
* ExecutorParameter |
||||
*/ |
||||
public class ExecutorParameter { |
||||
|
||||
@JsonProperty("index") |
||||
private String index; |
||||
|
||||
@JsonProperty("execute.sql") |
||||
private String executeSql; |
||||
|
||||
@JsonProperty("table.alias") |
||||
private String tableAlias; |
||||
|
||||
public ExecutorParameter() { |
||||
} |
||||
|
||||
public ExecutorParameter(String index, String executeSql, String tableAlias) { |
||||
this.index = index; |
||||
this.executeSql = executeSql; |
||||
this.tableAlias = tableAlias; |
||||
} |
||||
|
||||
public String getIndex() { |
||||
return index; |
||||
} |
||||
|
||||
public void setIndex(String index) { |
||||
this.index = index; |
||||
} |
||||
|
||||
public String getExecuteSql() { |
||||
return executeSql; |
||||
} |
||||
|
||||
public void setExecuteSql(String executeSql) { |
||||
this.executeSql = executeSql; |
||||
} |
||||
|
||||
public String getTableAlias() { |
||||
return tableAlias; |
||||
} |
||||
|
||||
public void setTableAlias(String tableAlias) { |
||||
this.tableAlias = tableAlias; |
||||
} |
||||
} |
@ -0,0 +1,44 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.utils; |
||||
|
||||
import org.apache.dolphinscheduler.common.model.JdbcInfo; |
||||
|
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
|
||||
/** |
||||
* JdbcUrlParserTest |
||||
*/ |
||||
public class JdbcUrlParserTest { |
||||
|
||||
@Test |
||||
public void testGetJdbcInfo() { |
||||
JdbcInfo jdbcInfo = |
||||
JdbcUrlParser.getJdbcInfo("jdbc:mysql://localhost:3306/dolphinscheduler?" |
||||
+ "useUnicode=true&characterEncoding=UTF-8"); |
||||
if (jdbcInfo != null) { |
||||
String jdbcInfoStr = jdbcInfo.toString(); |
||||
String expected = "JdbcInfo{host='localhost', port='3306', " |
||||
+ "driverName='mysql', database='dolphinscheduler', " |
||||
+ "params='useUnicode=true&characterEncoding=UTF-8', " |
||||
+ "address='jdbc:mysql://localhost:3306'}"; |
||||
Assert.assertEquals(expected,jdbcInfoStr); |
||||
} |
||||
} |
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue