Browse Source

[cherry-pick-2.0.3]Cherry pick to 2.0.3 (#8041)

* [cherry-pick-2.0.3]Correct typo in English README #7695

* Update NOTICE year (#7784)

* [BUG] Repair JDBC connection of Oracle (#7883)

* [cherry-pick-2.0.3][Fix-7538] [server] Fix when there is a forbidden node in dag, the execution flow is abnormal #7613

* [cherry-pick-2.0.3][Fix-7825] Remedy the value of create time and update time to be current time when importing a process json file. #7828

* [FIX-7732][fix] fix column 'is_directory' of table `t_ds_resources` type error in PG database (#7898)

Fix column 'is_directory' of table t_ds_resources type error in PG database
This closes #7732

Co-authored-by: Brennan Fox <brnnnfx@users.noreply.github.com>
Co-authored-by: Kirs <acm_master@163.com>
Co-authored-by: X&Z <980813351@qq.com>
Co-authored-by: caishunfeng <534328519@qq.com>
Co-authored-by: 天仇 <532066967@qq.com>
2.0.3-release
wind 2 years ago committed by GitHub
parent
commit
1f0aeabb92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      NOTICE
  2. 10
      README.md
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  4. 455
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java
  5. 2
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  6. 4
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDatasourceProcessor.java
  7. 10
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDatasourceProcessorTest.java
  8. 2
      dolphinscheduler-dist/release-docs/NOTICE
  9. 59
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

2
NOTICE

@ -1,5 +1,5 @@
Apache DolphinScheduler
Copyright 2019-2021 The Apache Software Foundation
Copyright 2019-2022 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

10
README.md

@ -41,7 +41,7 @@ Its main objectives are as follows:
## What's in DolphinScheduler
Stability | Accessibility | Features | Scalability |
-- | -- | -- | --
--------- | ------------- | -------- | ------------|
Decentralized multi-master and multi-worker | Visualization of workflow key information, such as task status, task type, retry times, task operation machine information, visual variables, and so on at a glance.  |  Support pause, recover operation | Support customized task types
support HA | Visualization of all workflow operations, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, provide API mode operations. | Users on DolphinScheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. | The scheduler supports distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic adjustment.
Overload processing: By using the task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured. Machine jam can be avoided with high tolerance to numbers of tasks cached in task queue. | One-click deployment | Support traditional shell tasks, and big data platform task scheduling: MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Procedure, Sub_Process | |
@ -59,11 +59,11 @@ Overload processing: By using the task queue mechanism, the number of schedulabl
## QuickStart in Docker
Please referer the official website document: [QuickStart in Docker](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/docker-deployment.html)
Please refer the official website document: [QuickStart in Docker](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/installation/docker.html)
## QuickStart in Kubernetes
Please referer the official website document: [QuickStart in Kubernetes](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/kubernetes-deployment.html)
Please refer to the official website document: [QuickStart in Kubernetes](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/installation/kubernetes.html)
## How to Build
@ -95,8 +95,8 @@ You are very welcome to communicate with the developers and users of Dolphin Sch
2. Follow the [Twitter account of DolphinScheduler](https://twitter.com/dolphinschedule) and get the latest news on time.
### Contributor over time
[![Contributor over time](https://contributor-graph-api.apiseven.com/contributors-svg?chart=contributorOverTime&repo=apache/dolphinscheduler)](https://www.apiseven.com/en/contributor-graph?chart=contributorOverTime&repo=apache/dolphinscheduler)
[![Contributor over time](https://contributor-graph-api.apiseven.com/contributors-svg?chart=contributorOverTime&repo=apache/dolphinscheduler)](https://www.apiseven.com/en/contributor-graph?chart=contributorOverTime&repo=apache/dolphinscheduler)
## How to Contribute

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -951,6 +951,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
processDefinition.setLocations(newArrayNode.toString());
}
processDefinition.setCreateTime(new Date());
processDefinition.setUpdateTime(new Date());
Map<String, Object> createDagResult = createDagDefine(loginUser, taskRelationLogList, processDefinition, Lists.newArrayList());
if (Status.SUCCESS.equals(createDagResult.get(Constants.STATUS))) {
putMsg(createDagResult, Status.SUCCESS);

455
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java

@ -28,219 +28,208 @@ import com.fasterxml.jackson.annotation.JsonFormat;
@TableName("t_ds_resources")
public class Resource {
/**
* id
*/
@TableId(value="id", type=IdType.AUTO)
private int id;
/**
* parent id
*/
private int pid;
/**
* resource alias
*/
private String alias;
/**
* full name
*/
private String fullName;
/**
* is directory
*/
private boolean isDirectory=false;
/**
* description
*/
private String description;
/**
* file alias
*/
private String fileName;
/**
* user id
*/
private int userId;
/**
* resource type
*/
private ResourceType type;
/**
* resource size
*/
private long size;
/**
* create time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
private Date createTime;
/**
* update time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
private Date updateTime;
public Resource() {
}
public Resource(int id, String alias, String fileName, String description, int userId,
ResourceType type, long size,
Date createTime, Date updateTime) {
this.id = id;
this.alias = alias;
this.fileName = fileName;
this.description = description;
this.userId = userId;
this.type = type;
this.size = size;
this.createTime = createTime;
this.updateTime = updateTime;
}
public Resource(int id, int pid, String alias, String fullName, boolean isDirectory) {
this.id = id;
this.pid = pid;
this.alias = alias;
this.fullName = fullName;
this.isDirectory = isDirectory;
}
/*public Resource(String alias, String fileName, String description, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
this.alias = alias;
this.fileName = fileName;
this.description = description;
this.userId = userId;
this.type = type;
this.size = size;
this.createTime = createTime;
this.updateTime = updateTime;
}*/
public Resource(int pid, String alias, String fullName, boolean isDirectory, String description, String fileName, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
this.pid = pid;
this.alias = alias;
this.fullName = fullName;
this.isDirectory = isDirectory;
this.description = description;
this.fileName = fileName;
this.userId = userId;
this.type = type;
this.size = size;
this.createTime = createTime;
this.updateTime = updateTime;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getAlias() {
return alias;
}
public void setAlias(String alias) {
this.alias = alias;
}
public int getPid() {
return pid;
}
public void setPid(int pid) {
this.pid = pid;
}
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public boolean isDirectory() {
return isDirectory;
}
public void setDirectory(boolean directory) {
isDirectory = directory;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public ResourceType getType() {
return type;
}
public void setType(ResourceType type) {
this.type = type;
}
public long getSize() {
return size;
}
public void setSize(long size) {
this.size = size;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
/**
* id
*/
@TableId(value = "id", type = IdType.AUTO)
private int id;
/**
* parent id
*/
private int pid;
/**
* resource alias
*/
private String alias;
/**
* full name
*/
private String fullName;
/**
* is directory
*/
private boolean isDirectory = false;
/**
* description
*/
private String description;
/**
* file alias
*/
private String fileName;
/**
* user id
*/
private int userId;
/**
* resource type
*/
private ResourceType type;
/**
* resource size
*/
private long size;
/**
* create time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date createTime;
/**
* update time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
public Resource() {
}
public Resource(int id, String alias, String fileName, String description, int userId,
ResourceType type, long size,
Date createTime, Date updateTime) {
this.id = id;
this.alias = alias;
this.fileName = fileName;
this.description = description;
this.userId = userId;
this.type = type;
this.size = size;
this.createTime = createTime;
this.updateTime = updateTime;
}
public Resource(int id, int pid, String alias, String fullName, boolean isDirectory) {
this.id = id;
this.pid = pid;
this.alias = alias;
this.fullName = fullName;
this.isDirectory = isDirectory;
}
public Resource(int pid, String alias, String fullName, boolean isDirectory, String description, String fileName, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
this.pid = pid;
this.alias = alias;
this.fullName = fullName;
this.isDirectory = isDirectory;
this.description = description;
this.fileName = fileName;
this.userId = userId;
this.type = type;
this.size = size;
this.createTime = createTime;
this.updateTime = updateTime;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getAlias() {
return alias;
}
public void setAlias(String alias) {
this.alias = alias;
}
public int getPid() {
return pid;
}
public void setPid(int pid) {
this.pid = pid;
}
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public boolean isDirectory() {
return isDirectory;
}
public void setDirectory(boolean directory) {
isDirectory = directory;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
@Override
public String toString() {
return "Resource{" +
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public ResourceType getType() {
return type;
}
public void setType(ResourceType type) {
this.type = type;
}
public long getSize() {
return size;
}
public void setSize(long size) {
this.size = size;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
@Override
public String toString() {
return "Resource{" +
"id=" + id +
", pid=" + pid +
", alias='" + alias + '\'' +
@ -254,30 +243,30 @@ public class Resource {
", createTime=" + createTime +
", updateTime=" + updateTime +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Resource resource = (Resource) o;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (id != resource.id) {
return false;
}
return alias.equals(resource.alias);
Resource resource = (Resource) o;
}
if (id != resource.id) {
return false;
}
return alias.equals(resource.alias);
@Override
public int hashCode() {
int result = id;
result = 31 * result + alias.hashCode();
return result;
}
}
@Override
public int hashCode() {
int result = id;
result = 31 * result + alias.hashCode();
return result;
}
}

2
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -640,7 +640,7 @@ CREATE TABLE t_ds_resources (
update_time timestamp DEFAULT NULL ,
pid int,
full_name varchar(64),
is_directory int,
is_directory boolean DEFAULT FALSE,
PRIMARY KEY (id),
CONSTRAINT t_ds_resources_un UNIQUE (full_name, type)
) ;

4
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDatasourceProcessor.java

@ -65,14 +65,16 @@ public class OracleDatasourceProcessor extends AbstractDatasourceProcessor {
public BaseConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
OracleDatasourceParamDTO oracleParam = (OracleDatasourceParamDTO) datasourceParam;
String address;
String jdbcUrl;
if (DbConnectType.ORACLE_SID.equals(oracleParam.getConnectType())) {
address = String.format("%s%s:%s",
Constants.JDBC_ORACLE_SID, oracleParam.getHost(), oracleParam.getPort());
jdbcUrl = address + ":" + oracleParam.getDatabase();
} else {
address = String.format("%s%s:%s",
Constants.JDBC_ORACLE_SERVICE_NAME, oracleParam.getHost(), oracleParam.getPort());
jdbcUrl = address + "/" + oracleParam.getDatabase();
}
String jdbcUrl = address + "/" + oracleParam.getDatabase();
OracleConnectionParam oracleConnectionParam = new OracleConnectionParam();
oracleConnectionParam.setUser(oracleParam.getUserName());

10
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDatasourceProcessorTest.java

@ -61,13 +61,13 @@ public class OracleDatasourceProcessorTest {
.createConnectionParams(oracleDatasourceParamDTO);
Assert.assertNotNull(connectionParams);
Assert.assertEquals("jdbc:oracle:thin:@localhost:3308", connectionParams.getAddress());
Assert.assertEquals("jdbc:oracle:thin:@localhost:3308/default", connectionParams.getJdbcUrl());
Assert.assertEquals("jdbc:oracle:thin:@localhost:3308:default", connectionParams.getJdbcUrl());
}
@Test
public void testCreateConnectionParams2() {
String connectionJson = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:oracle:thin:@localhost:3308\""
+ ",\"database\":\"default\",\"jdbcUrl\":\"jdbc:oracle:thin:@localhost:3308/default\",\"connectType\":\"ORACLE_SID\"}";
+ ",\"database\":\"default\",\"jdbcUrl\":\"jdbc:oracle:thin:@localhost:3308:default\",\"connectType\":\"ORACLE_SID\"}";
OracleConnectionParam connectionParams = (OracleConnectionParam) oracleDatasourceProcessor
.createConnectionParams(connectionJson);
Assert.assertNotNull(connectionParams);
@ -82,9 +82,9 @@ public class OracleDatasourceProcessorTest {
@Test
public void testGetJdbcUrl() {
OracleConnectionParam oracleConnectionParam = new OracleConnectionParam();
oracleConnectionParam.setJdbcUrl("jdbc:oracle:thin:@localhost:3308/default");
oracleConnectionParam.setJdbcUrl("jdbc:oracle:thin:@localhost:3308:default");
oracleConnectionParam.setOther("other=other");
Assert.assertEquals("jdbc:oracle:thin:@localhost:3308/default?other=other",
Assert.assertEquals("jdbc:oracle:thin:@localhost:3308:default?other=other",
oracleDatasourceProcessor.getJdbcUrl(oracleConnectionParam));
}
@ -97,4 +97,4 @@ public class OracleDatasourceProcessorTest {
public void testGetValidationQuery() {
Assert.assertEquals(Constants.ORACLE_VALIDATION_QUERY, oracleDatasourceProcessor.getValidationQuery());
}
}
}

2
dolphinscheduler-dist/release-docs/NOTICE vendored

@ -1,5 +1,5 @@
Apache DolphinScheduler
Copyright 2019-2021 The Apache Software Foundation
Copyright 2019-2022 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

59
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -920,33 +920,50 @@ public class WorkflowExecuteThread implements Runnable {
return DependResult.SUCCESS;
}
TaskNode taskNode = dag.getNode(taskCode);
List<String> depCodeList = taskNode.getDepList();
for (String depsNode : depCodeList) {
if (!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)) {
continue;
}
// dependencies must be fully completed
if (!completeTaskList.containsKey(depsNode)) {
return DependResult.WAITING;
}
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
return DependResult.NON_EXEC;
}
// ignore task state if current task is condition
if (taskNode.isConditionsTask()) {
continue;
}
if (!dependTaskSuccess(depsNode, taskCode)) {
return DependResult.FAILED;
List<String> indirectDepCodeList = new ArrayList<>();
setIndirectDepList(taskCode, indirectDepCodeList);
for (String depsNode : indirectDepCodeList) {
if (dag.containsNode(depsNode) && !skipTaskNodeList.containsKey(depsNode)) {
// dependencies must be fully completed
if (!completeTaskList.containsKey(depsNode)) {
return DependResult.WAITING;
}
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
return DependResult.NON_EXEC;
}
// ignore task state if current task is condition
if (taskNode.isConditionsTask()) {
continue;
}
if (!dependTaskSuccess(depsNode, taskCode)) {
return DependResult.FAILED;
}
}
}
logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskList.keySet().toArray()));
return DependResult.SUCCESS;
}
/**
* This function is specially used to handle the dependency situation where the parent node is a prohibited node.
* When the parent node is a forbidden node, the dependency relationship should continue to be traced
*
* @param taskCode taskCode
* @param indirectDepCodeList All indirectly dependent nodes
*/
private void setIndirectDepList(String taskCode, List<String> indirectDepCodeList) {
TaskNode taskNode = dag.getNode(taskCode);
List<String> depCodeList = taskNode.getDepList();
for (String depsNode : depCodeList) {
if (forbiddenTaskList.containsKey(depsNode)) {
setIndirectDepList(depsNode, indirectDepCodeList);
} else {
indirectDepCodeList.add(depsNode);
}
}
}
/**
* depend node is completed, but here need check the condition task branch is the next node
*/

Loading…
Cancel
Save