Browse Source

[cherry-pick][api] rewrite code generate,fix bit shift (#6937)

* [Fix][Common] rewrite code generate,fix bit shift (#6914)

* rewrite code generate,fix bit shift

* fix ut

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* fix ut
# Conflicts:
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
#	dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
#	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
#	dolphinscheduler-dist/release-docs/LICENSE
#	dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
#	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

* [Fix][Common] rewrite code generate,fix bit shift (#6914)

* rewrite code generate,fix bit shift

* fix ut

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* fix ut
# Conflicts:
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
#	dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
#	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
#	dolphinscheduler-dist/release-docs/LICENSE
#	dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
#	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

* [Fix][Common] rewrite code generate,fix bit shift (#6914)

* rewrite code generate,fix bit shift

* fix ut

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* fix ut
# Conflicts:
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
#	dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
#	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
#	dolphinscheduler-dist/release-docs/LICENSE
#	dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
#	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

* [Fix][Common] rewrite code generate,fix bit shift (#6914)

* rewrite code generate,fix bit shift

* fix ut

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* add algorithm from licenses file

* fix ut
# Conflicts:
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
#	dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
#	dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
#	dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
#	dolphinscheduler-dist/release-docs/LICENSE
#	dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
#	dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

Co-authored-by: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com>
2.0.1-release
OS 3 years ago committed by GitHub
parent
commit
adaeb962ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .licenserc.yaml
  2. 2
      LICENSE
  3. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
  4. 20
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  5. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
  6. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  7. 74
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
  8. 94
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java
  9. 19
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java
  10. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java
  11. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java
  12. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
  13. 2
      dolphinscheduler-dist/release-docs/LICENSE
  14. 11
      dolphinscheduler-dist/release-docs/licenses/LICENSE-snowflake.txt
  15. 374
      dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
  16. 138
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  17. 11
      licenses/LICENSE-snowflake.txt

1
.licenserc.yaml

@ -26,6 +26,7 @@ header:
- LICENSE
- DISCLAIMER
- dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
- dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
- mvnw.cmd
- sql/soft_version
- .mvn

2
LICENSE

@ -219,4 +219,4 @@ The text of each license is the standard Apache 2.0 license.
DolphinPluginClassLoader from https://github.com/prestosql/presto Apache 2.0
DolphinPluginDiscovery from https://github.com/prestosql/presto Apache 2.0
DolphinPluginLoader from https://github.com/prestosql/presto Apache 2.0
CodeGenerateUtils from https://github.com/twitter-archive/snowflake/tree/snowflake-2010 Apache 2.0

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java

@ -23,10 +23,10 @@ import org.apache.dolphinscheduler.api.service.EnvironmentService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@ -115,9 +115,9 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
env.setUpdateTime(new Date());
long code = 0L;
try {
code = SnowFlakeUtils.getInstance().nextId();
code = CodeGenerateUtils.getInstance().genCode();
env.setCode(code);
} catch (SnowFlakeException e) {
} catch (CodeGenerateException e) {
logger.error("Environment code get error, ", e);
}
if (code == 0L) {

20
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -39,10 +39,10 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@ -220,8 +220,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
long processDefinitionCode;
try {
processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
} catch (SnowFlakeException e) {
processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
} catch (CodeGenerateException e) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
return result;
}
@ -868,8 +868,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setProjectCode(projectCode);
processDefinition.setUserId(loginUser.getId());
try {
processDefinition.setCode(SnowFlakeUtils.getInstance().nextId());
} catch (SnowFlakeException e) {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
return false;
}
@ -888,10 +888,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskDefinitionLog.setOperator(loginUser.getId());
taskDefinitionLog.setOperateTime(now);
try {
long code = SnowFlakeUtils.getInstance().nextId();
long code = CodeGenerateUtils.getInstance().genCode();
taskCodeMap.put(taskDefinitionLog.getCode(), code);
taskDefinitionLog.setCode(code);
} catch (SnowFlakeException e) {
} catch (CodeGenerateException e) {
logger.error("Task code get error, ", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return false;
@ -1357,8 +1357,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
try {
processDefinition.setCode(SnowFlakeUtils.getInstance().nextId());
} catch (SnowFlakeException e) {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java

@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
@ -97,14 +97,14 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
project = Project
.newBuilder()
.name(name)
.code(SnowFlakeUtils.getInstance().nextId())
.code(CodeGenerateUtils.getInstance().genCode())
.description(desc)
.userId(loginUser.getId())
.userName(loginUser.getUserName())
.createTime(now)
.updateTime(now)
.build();
} catch (SnowFlakeException e) {
} catch (CodeGenerateException e) {
putMsg(result, Status.CREATE_PROJECT_ERROR);
return result;
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -25,9 +25,9 @@ import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@ -416,9 +416,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
List<Long> taskCodes = new ArrayList<>();
try {
for (int i = 0; i < genNum; i++) {
taskCodes.add(SnowFlakeUtils.getInstance().nextId());
taskCodes.add(CodeGenerateUtils.getInstance().genCode());
}
} catch (SnowFlakeException e) {
} catch (CodeGenerateException e) {
logger.error("Task code get error, ", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
}

74
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java

@ -0,0 +1,74 @@
/** Copyright 2010-2012 Twitter, Inc.*/
package org.apache.dolphinscheduler.common.utils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Objects;
/**
* Rewriting based on Twitter snowflake algorithm
*/
public class CodeGenerateUtils {
// start timestamp
private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01 00:00:00
// Each machine generates 32 in the same millisecond
private static final long LOW_DIGIT_BIT = 5L;
private static final long MIDDLE_BIT = 2L;
private static final long MAX_LOW_DIGIT = ~(-1L << LOW_DIGIT_BIT);
// The displacement to the left
private static final long MIDDLE_LEFT = LOW_DIGIT_BIT;
private static final long HIGH_DIGIT_LEFT = LOW_DIGIT_BIT + MIDDLE_BIT;
private final long machineHash;
private long lowDigit = 0L;
private long recordMillisecond = -1L;
private static final long SYSTEM_TIMESTAMP = System.currentTimeMillis();
private static final long SYSTEM_NANOTIME = System.nanoTime();
private CodeGenerateUtils() throws CodeGenerateException {
try {
this.machineHash = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1));
} catch (UnknownHostException e) {
throw new CodeGenerateException(e.getMessage());
}
}
private static CodeGenerateUtils instance = null;
public static synchronized CodeGenerateUtils getInstance() throws CodeGenerateException {
if (instance == null) {
instance = new CodeGenerateUtils();
}
return instance;
}
public synchronized long genCode() throws CodeGenerateException {
long nowtMillisecond = systemMillisecond();
if (nowtMillisecond < recordMillisecond) {
throw new CodeGenerateException("New code exception because time is set back.");
}
if (nowtMillisecond == recordMillisecond) {
lowDigit = (lowDigit + 1) & MAX_LOW_DIGIT;
if (lowDigit == 0L) {
while (nowtMillisecond <= recordMillisecond) {
nowtMillisecond = systemMillisecond();
}
}
} else {
lowDigit = 0L;
}
recordMillisecond = nowtMillisecond;
return (nowtMillisecond - START_TIMESTAMP) << HIGH_DIGIT_LEFT | machineHash << MIDDLE_LEFT | lowDigit;
}
private long systemMillisecond() {
return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000;
}
public static class CodeGenerateException extends Exception {
public CodeGenerateException(String message) {
super(message);
}
}
}

94
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java

@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Objects;
public class SnowFlakeUtils {
// start timestamp
private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01 00:00:00
// Each machine generates 32 in the same millisecond
private static final long SEQUENCE_BIT = 5;
private static final long MACHINE_BIT = 2;
private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
// The displacement to the left
private static final long MACHINE_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private static final long TIMESTAMP_LEFT = SEQUENCE_BIT + MACHINE_BIT + MACHINE_LEFT;
private final int machineId;
private long sequence = 0L;
private long lastTimestamp = -1L;
private static final long SYSTEM_TIMESTAMP = System.currentTimeMillis();
private static final long SYSTEM_NANOTIME = System.nanoTime();
private SnowFlakeUtils() throws SnowFlakeException {
try {
this.machineId = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % 4;
} catch (UnknownHostException e) {
throw new SnowFlakeException(e.getMessage());
}
}
private static SnowFlakeUtils instance = null;
public static synchronized SnowFlakeUtils getInstance() throws SnowFlakeException {
if (instance == null) {
instance = new SnowFlakeUtils();
}
return instance;
}
public synchronized long nextId() throws SnowFlakeException {
long currStmp = nowTimestamp();
if (currStmp < lastTimestamp) {
throw new SnowFlakeException("Clock moved backwards. Refusing to generate id");
}
if (currStmp == lastTimestamp) {
sequence = (sequence + 1) & MAX_SEQUENCE;
if (sequence == 0L) {
currStmp = getNextMill();
}
} else {
sequence = 0L;
}
lastTimestamp = currStmp;
return (currStmp - START_TIMESTAMP) << TIMESTAMP_LEFT
| machineId << MACHINE_LEFT
| sequence;
}
private long getNextMill() {
long mill = nowTimestamp();
while (mill <= lastTimestamp) {
mill = nowTimestamp();
}
return mill;
}
private long nowTimestamp() {
return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000;
}
public static class SnowFlakeException extends Exception {
public SnowFlakeException(String message) {
super(message);
}
}
}

19
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtilsTest.java → dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtilsTest.java

@ -17,18 +17,19 @@
package org.apache.dolphinscheduler.common.utils;
import java.util.HashSet;
import org.junit.Assert;
import org.junit.Test;
public class SnowFlakeUtilsTest {
public class CodeGenerateUtilsTest {
@Test
public void testNextId() {
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(1);
System.out.println(SnowFlakeUtils.getInstance().nextId());
}
} catch (Exception e) {
e.printStackTrace();
public void testNoGenerateDuplicateCode() throws CodeGenerateUtils.CodeGenerateException {
HashSet<Long> existsCode = new HashSet<>();
for (int i = 0; i < 100; i++) {
Long currentCode = CodeGenerateUtils.getInstance().genCode();
Assert.assertFalse(existsCode.contains(currentCode));
existsCode.add(currentCode);
}
}
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java

@ -20,8 +20,8 @@ package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import java.sql.Connection;
@ -110,7 +110,7 @@ public class ProcessDefinitionDao {
processDefinition.setId(rs.getInt(1));
long code = rs.getLong(2);
if (code == 0L) {
code = SnowFlakeUtils.getInstance().nextId();
code = CodeGenerateUtils.getInstance().genCode();
}
processDefinition.setCode(code);
processDefinition.setVersion(Constants.VERSION_FIRST);

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java

@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
@ -51,7 +51,7 @@ public class ProjectDao {
Integer id = rs.getInt(1);
long code = rs.getLong(2);
if (code == 0L) {
code = SnowFlakeUtils.getInstance().nextId();
code = CodeGenerateUtils.getInstance().genCode();
}
projectMap.put(id, code);
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java

@ -25,12 +25,12 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -684,8 +684,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
String name = task.get("name").asText();
taskDefinitionLog.setName(name);
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText());
long taskCode = SnowFlakeUtils.getInstance().nextId();
// System.out.println(taskCode);
long taskCode = CodeGenerateUtils.getInstance().genCode();
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());

2
dolphinscheduler-dist/release-docs/LICENSE vendored

@ -417,6 +417,8 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
protostuff-runtime 1.7.2: https://github.com/protostuff/protostuff/protostuff-core Apache-2.0
protostuff-api 1.7.2: https://github.com/protostuff/protostuff/protostuff-api Apache-2.0
protostuff-collectionschema 1.7.2: https://github.com/protostuff/protostuff/protostuff-collectionschema Apache-2.0
snowflake snowflake-2010: https://github.com/twitter-archive/snowflake/tree/snowflake-2010, Apache 2.0
========================================================================
BSD licenses
========================================================================

11
dolphinscheduler-dist/release-docs/licenses/LICENSE-snowflake.txt vendored

@ -0,0 +1,11 @@
Copyright 2010-2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.

374
dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

@ -0,0 +1,374 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.QueueService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.service.TenantService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import py4j.GatewayServer;
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"org.apache.dolphinscheduler.server.master.*",
"org.apache.dolphinscheduler.server.worker.*",
"org.apache.dolphinscheduler.server.monitor.*",
"org.apache.dolphinscheduler.server.log.*",
"org.apache.dolphinscheduler.alert.*"
})
})
public class PythonGatewayServer extends SpringBootServletInitializer {
private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
private static final int DEFAULT_WARNING_GROUP_ID = 0;
private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE;
private static final Priority DEFAULT_PRIORITY = Priority.MEDIUM;
private static final Long DEFAULT_ENVIRONMENT_CODE = -1L;
private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST;
private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
private static final int DEFAULT_DRY_RUN = 0;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProjectService projectService;
@Autowired
private TenantService tenantService;
@Autowired
private ExecutorService executorService;
@Autowired
private ProcessDefinitionService processDefinitionService;
@Autowired
private TaskDefinitionService taskDefinitionService;
@Autowired
private UsersService usersService;
@Autowired
private QueueService queueService;
@Autowired
private ProjectMapper projectMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private SchedulerService schedulerService;
@Autowired
private ScheduleMapper scheduleMapper;
// TODO replace this user to build in admin user if we make sure build in one could not be change
private final User dummyAdminUser = new User() {
{
setId(Integer.MAX_VALUE);
setUserName("dummyUser");
setUserType(UserType.ADMIN_USER);
}
};
private final Queue queuePythonGateway = new Queue() {
{
setId(Integer.MAX_VALUE);
setQueueName("queuePythonGateway");
}
};
public String ping() {
return "PONG";
}
// TODO Should we import package in python client side? utils package can but service can not, why
// Core api
public Map<String, Object> genTaskCodeList(Integer genNum) {
return taskDefinitionService.genTaskCodeList(genNum);
}
public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
Project project = projectMapper.queryByName(projectName);
Map<String, Long> result = new HashMap<>();
// project do not exists, mean task not exists too, so we should directly return init value
if (project == null) {
result.put("code", CodeGenerateUtils.getInstance().genCode());
result.put("version", 0L);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
if (taskDefinition == null) {
result.put("code", CodeGenerateUtils.getInstance().genCode());
result.put("version", 0L);
} else {
result.put("code", taskDefinition.getCode());
result.put("version", (long) taskDefinition.getVersion());
}
return result;
}
/**
* create or update process definition.
* If process definition do not exists in Project=`projectCode` would create a new one
* If process definition already exists in Project=`projectCode` would update it
*
* @param userName user name who create or update process definition
* @param projectName project name which process definition belongs to
* @param name process definition name
* @param description description
* @param globalParams global params
* @param schedule schedule for process definition, will not set schedule if null,
* and if would always fresh exists schedule if not null
* @param locations locations json object about all tasks
* @param timeout timeout for process definition working, if running time longer than timeout,
* task will mark as fail
* @param workerGroup run task in which worker group
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
public Long createOrUpdateProcessDefinition(String userName,
String projectName,
String name,
String description,
String globalParams,
String schedule,
String locations,
int timeout,
String workerGroup,
String tenantCode,
String taskRelationJson,
String taskDefinitionJson,
ProcessExecutionTypeEnum executionType) {
User user = usersService.queryUser(userName);
Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
long projectCode = project.getCode();
Map<String, Object> verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, name);
Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS);
long processDefinitionCode;
// create or update process definition
if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name);
processDefinitionCode = processDefinition.getCode();
// make sure process definition offline which could edit
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
} else if (verifyStatus == Status.SUCCESS) {
Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
processDefinitionCode = processDefinition.getCode();
} else {
String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
LOGGER.error(msg);
throw new RuntimeException(msg);
}
// Fresh process definition schedule
if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup);
}
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
return processDefinitionCode;
}
/**
* create or update process definition schedule.
* It would always use latest schedule define in workflow-as-code, and set schedule online when
* it's not null
*
* @param user user who create or update schedule
* @param projectCode project which process definition belongs to
* @param processDefinitionCode process definition code
* @param schedule schedule expression
* @param workerGroup work group
*/
private void createOrUpdateSchedule(User user,
long projectCode,
long processDefinitionCode,
String schedule,
String workerGroup) {
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
// create or update schedule
int scheduleId;
if (schedules.isEmpty()) {
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
scheduleId = (int) result.get("scheduleId");
} else {
scheduleId = schedules.get(0).getId();
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
}
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}
public void execProcessInstance(String userName,
String projectName,
String processDefinitionName,
String cronTime,
String workerGroup,
Integer timeout
) {
User user = usersService.queryUser(userName);
Project project = projectMapper.queryByName(projectName);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
// make sure process definition online
processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
executorService.execProcessInstance(user,
project.getCode(),
processDefinition.getCode(),
cronTime,
null,
DEFAULT_FAILURE_STRATEGY,
null,
DEFAULT_TASK_DEPEND_TYPE,
DEFAULT_WARNING_TYPE,
DEFAULT_WARNING_GROUP_ID,
DEFAULT_RUN_MODE,
DEFAULT_PRIORITY,
workerGroup,
DEFAULT_ENVIRONMENT_CODE,
timeout,
null,
null,
DEFAULT_DRY_RUN
);
}
// side object
public Map<String, Object> createProject(String userName, String name, String desc) {
User user = usersService.queryUser(userName);
return projectService.createProject(user, name, desc);
}
public Map<String, Object> createQueue(String name, String queueName) {
Result<Object> verifyQueueExists = queueService.verifyQueue(name, queueName);
if (verifyQueueExists.getCode() == 0) {
return queueService.createQueue(dummyAdminUser, name, queueName);
} else {
Map<String, Object> result = new HashMap<>();
// TODO function putMsg do not work here
result.put(Constants.STATUS, Status.SUCCESS);
result.put(Constants.MSG, Status.SUCCESS.getMsg());
return result;
}
}
public Map<String, Object> createTenant(String tenantCode, String desc, String queueName) throws Exception {
if (tenantService.checkTenantExists(tenantCode)) {
Map<String, Object> result = new HashMap<>();
// TODO function putMsg do not work here
result.put(Constants.STATUS, Status.SUCCESS);
result.put(Constants.MSG, Status.SUCCESS.getMsg());
return result;
} else {
Result<Object> verifyQueueExists = queueService.verifyQueue(queueName, queueName);
if (verifyQueueExists.getCode() == 0) {
// TODO why create do not return id?
queueService.createQueue(dummyAdminUser, queueName, queueName);
}
Map<String, Object> result = queueService.queryQueueName(queueName);
List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
Queue queue = queueList.get(0);
return tenantService.createTenant(dummyAdminUser, tenantCode, queue.getId(), desc);
}
}
public void createUser(String userName,
String userPassword,
String email,
String phone,
String tenantCode,
String queue,
int state) {
User user = usersService.queryUser(userName);
if (Objects.isNull(user)) {
Map<String, Object> tenantResult = tenantService.queryByTenantCode(tenantCode);
Tenant tenant = (Tenant) tenantResult.get(Constants.DATA_LIST);
usersService.createUser(userName, userPassword, email, tenant.getId(), phone, queue, state);
}
}
@PostConstruct
public void run() {
GatewayServer server = new GatewayServer(this);
GatewayServer.turnLoggingOn();
// Start server to accept python client RPC
server.start();
}
public static void main(String[] args) {
SpringApplication.run(PythonGatewayServer.class, args);
}
}

138
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@ -27,8 +28,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -51,12 +50,12 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
@ -212,6 +211,7 @@ public class ProcessService {
* @param processDefinitionCacheMaps
* @return process instance
*/
@Transactional
public ProcessInstance handleCommand(Logger logger, String host, Command command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
ProcessInstance processInstance = constructProcessInstance(command, host, processDefinitionCacheMaps);
// cannot construct process instance, return null
@ -634,7 +634,7 @@ public class ProcessService {
startParamMap.putAll(fatherParamMap);
// set start param into global params
if (startParamMap.size() > 0
&& processDefinition.getGlobalParamMap() != null) {
&& processDefinition.getGlobalParamMap() != null) {
for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
@ -696,8 +696,8 @@ public class ProcessService {
private Boolean checkCmdParam(Command command, Map<String, String> cmdParam) {
if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) {
if (cmdParam == null
|| !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
|| cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
|| !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
|| cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType());
return false;
}
@ -708,9 +708,8 @@ public class ProcessService {
/**
* construct process instance according to one command.
*
* @param command command
* @param host host
* @param processDefinitionCacheMaps
* @param command command
* @param host host
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
@ -883,7 +882,7 @@ public class ProcessService {
}
return processDefineLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
}
}
@ -929,9 +928,9 @@ public class ProcessService {
processInstance.setScheduleTime(complementDate.get(0));
}
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
}
/**
@ -949,7 +948,7 @@ public class ProcessService {
Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
// write sub process id into cmd param.
if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
&& CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
&& CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
paramMap.remove(CMD_PARAM_SUB_PROCESS);
paramMap.put(CMD_PARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
@ -962,7 +961,7 @@ public class ProcessService {
ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
if (parentInstance != null) {
subProcessInstance.setGlobalParams(
joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
this.saveProcessInstance(subProcessInstance);
} else {
logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
@ -1009,7 +1008,7 @@ public class ProcessService {
private void initTaskInstance(TaskInstance taskInstance) {
if (!taskInstance.isSubProcess()
&& (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
&& (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
return;
@ -1064,12 +1063,12 @@ public class ProcessService {
public TaskInstance submitTask(TaskInstance taskInstance) {
ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
logger.info("start submit task : {}, instance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
if (task == null) {
logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
return task;
}
if (!task.getState().typeIsFinished()) {
@ -1135,7 +1134,7 @@ public class ProcessService {
}
}
logger.info("sub process instance is not found,parent task:{},parent instance:{}",
parentTask.getId(), parentProcessInstance.getId());
parentTask.getId(), parentProcessInstance.getId());
return null;
}
@ -1227,21 +1226,21 @@ public class ProcessService {
String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
int subProcessInstanceId = childInstance == null ? 0 : childInstance.getId();
return new Command(
commandType,
TaskDependType.TASK_POST,
parentProcessInstance.getFailureStrategy(),
parentProcessInstance.getExecutorId(),
subProcessDefinition.getCode(),
processParam,
parentProcessInstance.getWarningType(),
parentProcessInstance.getWarningGroupId(),
parentProcessInstance.getScheduleTime(),
task.getWorkerGroup(),
task.getEnvironmentCode(),
parentProcessInstance.getProcessInstancePriority(),
parentProcessInstance.getDryRun(),
subProcessInstanceId,
subProcessDefinition.getVersion()
commandType,
TaskDependType.TASK_POST,
parentProcessInstance.getFailureStrategy(),
parentProcessInstance.getExecutorId(),
subProcessDefinition.getCode(),
processParam,
parentProcessInstance.getWarningType(),
parentProcessInstance.getWarningGroupId(),
parentProcessInstance.getScheduleTime(),
task.getWorkerGroup(),
task.getEnvironmentCode(),
parentProcessInstance.getProcessInstancePriority(),
parentProcessInstance.getDryRun(),
subProcessInstanceId,
subProcessDefinition.getVersion()
);
}
@ -1278,7 +1277,7 @@ public class ProcessService {
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
parentProcessInstance.getProcessDefinitionVersion());
parentProcessInstance.getProcessDefinitionVersion());
ProcessDefinition childDefinition = this.findProcessDefinitionByCode(childDefinitionCode);
if (childDefinition != null && fatherDefinition != null) {
childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());
@ -1301,7 +1300,7 @@ public class ProcessService {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
} else {
if (processInstanceState != ExecutionStatus.READY_STOP
&& processInstanceState != ExecutionStatus.READY_PAUSE) {
&& processInstanceState != ExecutionStatus.READY_PAUSE) {
// failure task set invalid
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
@ -1354,9 +1353,9 @@ public class ProcessService {
// the task already exists in task queue
// return state
if (
state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
) {
return state;
}
@ -1365,7 +1364,7 @@ public class ProcessService {
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
state = ExecutionStatus.PAUSE;
} else if (processInstanceState == ExecutionStatus.READY_STOP
|| !checkProcessStrategy(taskInstance)) {
|| !checkProcessStrategy(taskInstance)) {
state = ExecutionStatus.KILL;
} else {
state = ExecutionStatus.SUBMITTED_SUCCESS;
@ -1389,7 +1388,7 @@ public class ProcessService {
for (TaskInstance task : taskInstances) {
if (task.getState() == ExecutionStatus.FAILURE
&& task.getRetryTimes() >= task.getMaxRetryTimes()) {
&& task.getRetryTimes() >= task.getMaxRetryTimes()) {
return false;
}
}
@ -1518,7 +1517,8 @@ public class ProcessService {
private void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
Map<String, Object> taskParameters = JSONUtils.parseObject(
taskDefinition.getTaskParams(),
new TypeReference<Map<String, Object>>() { });
new TypeReference<Map<String, Object>>() {
});
if (taskParameters != null) {
// if contains mainJar field, query resource from database
// Flink, Spark, MR
@ -1756,7 +1756,8 @@ public class ProcessService {
return;
}
//if the result more than one line,just get the first .
Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {});
Map<String, Object> taskParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
Object localParams = taskParams.get(LOCAL_PARAMS);
if (localParams == null) {
return;
@ -1953,8 +1954,8 @@ public class ProcessService {
*/
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime());
dateInterval.getStartTime(),
dateInterval.getEndTime());
}
/**
@ -1966,8 +1967,8 @@ public class ProcessService {
*/
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime());
dateInterval.getStartTime(),
dateInterval.getEndTime());
}
/**
@ -1980,9 +1981,9 @@ public class ProcessService {
*/
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
endTime,
stateArray);
startTime,
endTime,
stateArray);
}
/**
@ -2188,10 +2189,10 @@ public class ProcessService {
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList().
stream()
.filter(t -> t.getId() != 0)
.map(ResourceInfo::getId)
.collect(Collectors.toSet());
stream()
.filter(t -> t.getId() != 0)
.map(ResourceInfo::getId)
.collect(Collectors.toSet());
}
if (CollectionUtils.isEmpty(resourceIds)) {
return StringUtils.EMPTY;
@ -2211,7 +2212,7 @@ public class ProcessService {
taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) {
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion != null) {
if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
@ -2228,8 +2229,8 @@ public class ProcessService {
taskDefinitionLog.setCreateTime(now);
if (taskDefinitionLog.getCode() == 0) {
try {
taskDefinitionLog.setCode(SnowFlakeUtils.getInstance().nextId());
} catch (SnowFlakeException e) {
taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
logger.error("Task code get error, ", e);
return Constants.DEFINITION_FAILURE;
}
@ -2285,7 +2286,7 @@ public class ProcessService {
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
taskDefinitionLogMap = taskDefinitionLogs.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
.collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
}
Date now = new Date();
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
@ -2308,7 +2309,8 @@ public class ProcessService {
if (!processTaskRelationList.isEmpty()) {
Set<Integer> processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
Set<Integer> taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
if (CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet)) {
boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
if (result) {
return Constants.EXIT_CODE_SUCCESS;
}
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
@ -2322,9 +2324,9 @@ public class ProcessService {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
Set<Long> processDefinitionCodes = processTaskRelationList
.stream()
.map(ProcessTaskRelation::getProcessDefinitionCode)
.collect(Collectors.toSet());
.stream()
.map(ProcessTaskRelation::getProcessDefinitionCode)
.collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes);
// check process definition is already online
for (ProcessDefinition processDefinition : processDefinitionList) {
@ -2357,8 +2359,8 @@ public class ProcessService {
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(processTaskRelations);
List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream()
.map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class))
.collect(Collectors.toList());
.map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class))
.collect(Collectors.toList());
return new DagData(processDefinition, processTaskRelations, taskDefinitions);
}
@ -2421,7 +2423,7 @@ public class ProcessService {
taskDefinitionLogs = genTaskDefineList(taskRelationList);
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
List<TaskNode> taskNodeList = new ArrayList<>();
for (Entry<Long, List<Long>> code : taskCodeMap.entrySet()) {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(code.getKey());
@ -2446,8 +2448,8 @@ public class ProcessService {
taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
taskNode.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode());
taskNode.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout())));
taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout())));
taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
taskNodeList.add(taskNode);

11
licenses/LICENSE-snowflake.txt

@ -0,0 +1,11 @@
Copyright 2010-2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
Loading…
Cancel
Save