Browse Source

Merge branch 'json_split' of https://github.com/apache/incubator-dolphinscheduler into json-split

pull/3/MERGE
lenboo 4 years ago
parent
commit
95bbdfedfe
  1. 5
      dolphinscheduler-api/pom.xml
  2. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  3. 103
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java
  4. 2
      dolphinscheduler-common/src/main/resources/common.properties
  5. 33
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtilsTest.java
  6. 166
      sql/dolphinscheduler_mysql.sql

5
dolphinscheduler-api/pom.xml

@ -39,6 +39,11 @@
<artifactId>dolphinscheduler-dao</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
</dependency>
<!--springboot-->
<dependency>
<groupId>org.springframework.boot</groupId>

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -1029,4 +1029,9 @@ public final class Constants {
* pstree, get pud and sub pid
*/
public static final String PSTREE = "pstree";
/**
* snow flake, data center id, this id must be greater than 0 and less than 32
*/
public static final String SNOW_FLAKE_DATA_CENTER_ID = "data.center.id";
}

103
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtils.java

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Objects;
public class SnowFlakeUtils {
// start timestamp
private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01
// Number of digits
private static final long SEQUENCE_BIT = 12;
private static final long MACHINE_BIT = 5;
private static final long DATA_CENTER_BIT = 5;
// Maximum value
private static final long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);
private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
// The displacement to the left
private static final long MACHINE_LEFT = SEQUENCE_BIT;
private static final long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private static final long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
private final int dataCenterId;
private final int machineId;
private long sequence = 0L;
private long lastTimestamp = -1L;
private SnowFlakeUtils() throws SnowFlakeException {
this.dataCenterId = PropertyUtils.getInt(Constants.SNOW_FLAKE_DATA_CENTER_ID, 1);
if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", MAX_DATA_CENTER_NUM));
}
try {
this.machineId = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % 32;
} catch (UnknownHostException e) {
throw new SnowFlakeException(e.getMessage());
}
}
private static SnowFlakeUtils instance = null;
public static synchronized SnowFlakeUtils getInstance() throws SnowFlakeException {
if (instance == null) {
instance = new SnowFlakeUtils();
}
return instance;
}
public synchronized long nextId() throws SnowFlakeException {
long currStmp = nowTimestamp();
if (currStmp < lastTimestamp) {
throw new SnowFlakeException("Clock moved backwards. Refusing to generate id");
}
if (currStmp == lastTimestamp) {
sequence = (sequence + 1) & MAX_SEQUENCE;
if (sequence == 0L) {
currStmp = getNextMill();
}
} else {
sequence = 0L;
}
lastTimestamp = currStmp;
return (currStmp - START_TIMESTAMP) << TIMESTAMP_LEFT
| dataCenterId << DATA_CENTER_LEFT
| machineId << MACHINE_LEFT
| sequence;
}
private long getNextMill() {
long mill = nowTimestamp();
while (mill <= lastTimestamp) {
mill = nowTimestamp();
}
return mill;
}
private long nowTimestamp() {
return System.currentTimeMillis();
}
public static class SnowFlakeException extends Exception {
public SnowFlakeException(String message) {
super(message);
}
}
}

2
dolphinscheduler-common/src/main/resources/common.properties

@ -75,3 +75,5 @@ datasource.encryption.salt=!@#$%^&*
# Network IP gets priority, default inner outer
#dolphin.scheduler.network.priority.strategy=default
# 0<dataCenterId<32
#data.center.id=1

33
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/SnowFlakeUtilsTest.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.junit.Test;
public class SnowFlakeUtilsTest {
@Test
public void testNextId() {
try {
for (int i = 0; i < 5; i++) {
System.out.println(SnowFlakeUtils.getInstance().nextId());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

166
sql/dolphinscheduler_mysql.sql

@ -394,35 +394,154 @@ CREATE TABLE `t_ds_error_command` (
-- ----------------------------
DROP TABLE IF EXISTS `t_ds_process_definition`;
CREATE TABLE `t_ds_process_definition` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`name` varchar(255) DEFAULT NULL COMMENT 'process definition name',
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`code` bigint(20) NOT NULL COMMENT 'encoding',
`name` varchar(200) DEFAULT NULL COMMENT 'process definition name',
`version` int(11) DEFAULT NULL COMMENT 'process definition version',
`description` text COMMENT 'description',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`release_state` tinyint(4) DEFAULT NULL COMMENT 'process definition release state:0:offline,1:online',
`project_id` int(11) DEFAULT NULL COMMENT 'project id',
`user_id` int(11) DEFAULT NULL COMMENT 'process definition creator id',
`process_definition_json` longtext COMMENT 'process definition json content',
`description` text,
`global_params` text COMMENT 'global parameters',
`locations` text COMMENT 'node location information',
`flag` tinyint(4) DEFAULT NULL COMMENT '0 not available, 1 available',
`locations` text COMMENT 'Node location information',
`connects` text COMMENT 'Node connection information',
`receivers` text COMMENT 'receivers',
`receivers_cc` text COMMENT 'cc',
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
`modify_by` varchar(255) DEFAULT NULL,
`resource_ids` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `process_definition_unique` (`name`,`project_id`),
KEY `process_definition_index` (`project_id`,`id`) USING BTREE
PRIMARY KEY (`id`,`code`),
UNIQUE KEY `process_unique` (`name`,`project_code`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_ds_process_definition
-- ----------------------------
-- ----------------------------
-- Table structure for t_ds_process_definition_log
-- ----------------------------
CREATE TABLE `t_ds_process_definition_log` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`code` bigint(20) NOT NULL COMMENT 'encoding',
`name` varchar(200) DEFAULT NULL COMMENT 'process definition name',
`version` int(11) DEFAULT NULL COMMENT 'process definition version',
`description` text COMMENT 'description',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`release_state` tinyint(4) DEFAULT NULL COMMENT 'process definition release state:0:offline,1:online',
`user_id` int(11) DEFAULT NULL COMMENT 'process definition creator id',
`global_params` text COMMENT 'global parameters',
`locations` text COMMENT 'node location information',
`flag` tinyint(4) DEFAULT NULL COMMENT '0 not available, 1 available',
`receivers` text COMMENT 'receivers',
`receivers_cc` text COMMENT 'cc',
`timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for t_ds_task_definition
-- ----------------------------
CREATE TABLE `t_ds_task_definition` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`code` bigint(20) NOT NULL COMMENT 'encoding',
`name` varchar(200) DEFAULT NULL COMMENT 'task definition name',
`version` int(11) DEFAULT NULL COMMENT 'task definition version',
`description` text COMMENT 'description',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
`task_type` varchar(30) DEFAULT NULL COMMENT 'job type',
`task_params` text COMMENT 'job custom parameters',
`run_flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
`task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
`worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
`fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries',
`fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval',
`timeout_flag` tinyint(1) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
`timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail',
`timeout_duration` int(11) DEFAULT '0' COMMENT 'timeout length',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`),
UNIQUE KEY `task_unique` (`name`,`project_code`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for t_ds_task_definition_log
-- ----------------------------
CREATE TABLE `t_ds_task_definition_log` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`code` bigint(20) NOT NULL COMMENT 'encoding',
`name` varchar(200) DEFAULT NULL COMMENT 'task definition name',
`version` int(11) DEFAULT NULL COMMENT 'task definition version',
`description` text COMMENT 'description',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
`task_type` varchar(30) DEFAULT NULL COMMENT 'job type',
`task_params` text COMMENT 'job custom parameters',
`run_flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
`task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
`worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
`fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries',
`fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval',
`timeout_flag` tinyint(1) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
`timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail',
`timeout_duration` int(11) DEFAULT '0' COMMENT 'timeout length',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for t_ds_process_task_relation
-- ----------------------------
CREATE TABLE `t_ds_process_task_relation` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`name` varchar(200) DEFAULT NULL COMMENT 'relation name',
`version` int(11) DEFAULT NULL COMMENT 'relation version',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
`pre_project_code` bigint(20) NOT NULL COMMENT 'pre process code',
`pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',
`condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay',
`condition_params` text COMMENT 'condition params(json)',
`post_project_code` bigint(20) NOT NULL COMMENT 'post process code',
`post_task_code` bigint(20) NOT NULL COMMENT 'post task code',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for t_ds_process_task_relation_log
-- ----------------------------
CREATE TABLE `t_ds_process_task_relation_log` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`name` varchar(200) DEFAULT NULL COMMENT 'relation name',
`version` int(11) DEFAULT NULL COMMENT 'relation version',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
`pre_project_code` bigint(20) NOT NULL COMMENT 'pre process code',
`pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',
`condition_type` tinyint(2) DEFAULT NULL COMMENT 'condition type : 0 none, 1 judge 2 delay',
`condition_params` text COMMENT 'condition params(json)',
`post_project_code` bigint(20) NOT NULL COMMENT 'post process code',
`post_task_code` bigint(20) NOT NULL COMMENT 'post task code',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for t_ds_process_definition_version
-- ----------------------------
@ -444,7 +563,7 @@ CREATE TABLE `t_ds_process_definition_version` (
PRIMARY KEY (`id`),
UNIQUE KEY `process_definition_id_and_version` (`process_definition_id`,`version`) USING BTREE,
KEY `process_definition_index` (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=84 DEFAULT CHARSET=utf8;
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_ds_process_definition
@ -457,7 +576,8 @@ DROP TABLE IF EXISTS `t_ds_process_instance`;
CREATE TABLE `t_ds_process_instance` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`name` varchar(255) DEFAULT NULL COMMENT 'process instance name',
`process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id',
`process_definition_version` int(11) DEFAULT NULL COMMENT 'process definition version',
`process_definition_code` bigint(20) not NULL COMMENT 'process definition code',
`state` tinyint(4) DEFAULT NULL COMMENT 'process instance Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
`recovery` tinyint(4) DEFAULT NULL COMMENT 'process instance failover flag:0:normal,1:failover instance',
`start_time` datetime DEFAULT NULL COMMENT 'process instance start time',
@ -474,22 +594,17 @@ CREATE TABLE `t_ds_process_instance` (
`schedule_time` datetime DEFAULT NULL COMMENT 'schedule time',
`command_start_time` datetime DEFAULT NULL COMMENT 'command start time',
`global_params` text COMMENT 'global parameters',
`process_instance_json` longtext COMMENT 'process instance json(copy的process definition 的json)',
`flag` tinyint(4) DEFAULT '1' COMMENT 'flag',
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`is_sub_process` int(11) DEFAULT '0' COMMENT 'flag, whether the process is sub process',
`executor_id` int(11) NOT NULL COMMENT 'executor id',
`locations` text COMMENT 'Node location information',
`connects` text COMMENT 'Node connection information',
`history_cmd` text COMMENT 'history commands of process instance operation',
`dependence_schedule_times` text COMMENT 'depend schedule fire time',
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority. 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id',
`timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`var_pool` longtext COMMENT 'var_pool',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_id`,`id`) USING BTREE,
KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@ -504,10 +619,11 @@ DROP TABLE IF EXISTS `t_ds_project`;
CREATE TABLE `t_ds_project` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`name` varchar(100) DEFAULT NULL COMMENT 'project name',
`code` bigint(20) NOT NULL COMMENT 'encoding',
`description` varchar(200) DEFAULT NULL,
`user_id` int(11) DEFAULT NULL COMMENT 'creator id',
`flag` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`),
KEY `user_id_index` (`user_id`) USING BTREE
@ -715,8 +831,9 @@ DROP TABLE IF EXISTS `t_ds_task_instance`;
CREATE TABLE `t_ds_task_instance` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`name` varchar(255) DEFAULT NULL COMMENT 'task name',
`task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
`task_type` varchar(64) DEFAULT NULL COMMENT 'task type',
`process_definition_id` int(11) DEFAULT NULL COMMENT 'process definition id',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
`process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id',
`task_json` longtext COMMENT 'task content json',
`state` tinyint(4) DEFAULT NULL COMMENT 'Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
@ -736,9 +853,6 @@ CREATE TABLE `t_ds_task_instance` (
`task_instance_priority` int(11) DEFAULT NULL COMMENT 'task instance priority:0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id',
`executor_id` int(11) DEFAULT NULL,
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
`delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
`var_pool` longtext COMMENT 'var_pool',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE,

Loading…
Cancel
Save