Browse Source

update SnowFlake (#5321)

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
49b234609a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  3. 22
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java
  4. 2
      dolphinscheduler-common/src/main/resources/common.properties
  5. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  6. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  7. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -1414,7 +1414,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
((ObjectNode) locationsJN.get(id)).put("name", newName);
taskNode.setName(taskNode.getName() + suffix);
taskNode.setCode("0");
taskNode.setCode(0L);
});
processData.setTasks(taskNodeList);
String processDefinitionJson = JSONUtils.toJsonString(processData);

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -42,7 +42,7 @@ public class TaskNode {
/**
* task node code
*/
private String code;
private Long code;
/**
* task node version
@ -329,11 +329,11 @@ public class TaskNode {
this.delayTime = delayTime;
}
public String getCode() {
public Long getCode() {
return code;
}
public void setCode(String code) {
public void setCode(Long code) {
this.code = code;
}

22
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java

@ -17,36 +17,25 @@
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Objects;
public class SnowFlakeUtils {
// start timestamp
private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01
private static final long START_TIMESTAMP = 1609430400L; //2021-01-01
// Number of digits
private static final long SEQUENCE_BIT = 12;
private static final long MACHINE_BIT = 5;
private static final long DATA_CENTER_BIT = 5;
// Maximum value
private static final long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);
private static final long SEQUENCE_BIT = 13;
private static final long MACHINE_BIT = 2;
private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
// The displacement to the left
private static final long MACHINE_LEFT = SEQUENCE_BIT;
private static final long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private static final long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
private final int dataCenterId;
private static final long TIMESTAMP_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final int machineId;
private long sequence = 0L;
private long lastTimestamp = -1L;
private SnowFlakeUtils() throws SnowFlakeException {
this.dataCenterId = PropertyUtils.getInt(Constants.SNOW_FLAKE_DATA_CENTER_ID, 1);
if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", MAX_DATA_CENTER_NUM));
}
try {
this.machineId = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % 32;
} catch (UnknownHostException e) {
@ -78,7 +67,6 @@ public class SnowFlakeUtils {
}
lastTimestamp = currStmp;
return (currStmp - START_TIMESTAMP) << TIMESTAMP_LEFT
| dataCenterId << DATA_CENTER_LEFT
| machineId << MACHINE_LEFT
| sequence;
}
@ -92,7 +80,7 @@ public class SnowFlakeUtils {
}
private long nowTimestamp() {
return System.currentTimeMillis();
return System.currentTimeMillis() / 1000;
}
public static class SnowFlakeException extends Exception {

2
dolphinscheduler-common/src/main/resources/common.properties

@ -79,8 +79,6 @@ datasource.encryption.salt=!@#$%^&*
# Network IP gets priority, default inner outer
#dolphin.scheduler.network.priority.strategy=default
# 0<dataCenterId<32
#data.center.id=1
# use sudo or not, if set true ,executing user is tenant user and deploy user need sudo permissions ; if set false, executing user is the deploy user, don't need sudo permissions.
sudo.enable=true

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -440,7 +440,7 @@ public class DagHelper {
Map<Long, TaskNode> taskNodeMap = new HashMap<>();
taskNodeList.forEach(taskNode -> {
taskNodeMap.putIfAbsent(Long.parseLong(taskNode.getCode()), taskNode);
taskNodeMap.putIfAbsent(taskNode.getCode(), taskNode);
});
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -475,7 +475,7 @@ public class MasterExecThread implements Runnable {
TaskInstance taskInstance = findTaskIfExists(nodeName);
if (taskInstance == null) {
taskInstance = new TaskInstance();
taskInstance.setTaskCode(Long.parseLong(taskNode.getCode()));
taskInstance.setTaskCode(taskNode.getCode());
taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
// task name
taskInstance.setName(nodeName);

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2325,7 +2325,7 @@ public class ProcessService {
}
Map<String, TaskDefinition> taskDefinitionMap = new HashMap<>();
for (TaskNode taskNode : taskNodes) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(StringUtils.strDigitToLong(taskNode.getCode(), -1L));
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskNode.getCode());
if (taskDefinition == null) {
try {
long code = SnowFlakeUtils.getInstance().nextId();
@ -2484,7 +2484,7 @@ public class ProcessService {
taskNodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
if (v == null) {
v = new TaskNode();
v.setCode(processTaskRelation.getPostTaskCode() + "");
v.setCode(processTaskRelation.getPostTaskCode());
v.setVersion(processTaskRelation.getPostTaskVersion());
v.setConditionResult(processTaskRelation.getConditionParams());
List<PreviousTaskNode> preTaskNodeList = new ArrayList<>();
@ -2504,7 +2504,7 @@ public class ProcessService {
taskNodeMap.forEach((k, v) -> {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(k);
v.setId(locationMap.get(taskDefinitionLog.getName()));
v.setCode(taskDefinitionLog.getCode() + "");
v.setCode(taskDefinitionLog.getCode());
v.setName(taskDefinitionLog.getName());
v.setDesc(taskDefinitionLog.getDescription());
v.setType(taskDefinitionLog.getTaskType().toUpperCase());
@ -2547,7 +2547,7 @@ public class ProcessService {
taskRelationList.forEach(relation -> taskCodeMap.putIfAbsent(relation.getPostTaskCode(), relation.getPostTaskVersion()));
taskNode.setCode(String.valueOf(taskDefinition.getCode()));
taskNode.setCode(taskDefinition.getCode());
taskNode.setVersion(taskDefinition.getVersion());
taskNode.setName(taskDefinition.getName());
taskNode.setName(taskDefinition.getName());

Loading…
Cancel
Save