MasterServer provides monitoring services based on netty.
#### The Service Mainly Includes:
- **DistributedQuartz** distributed scheduling component, which is mainly responsible for the start and stop operations of scheduled tasks. When quartz start the task, there will be a thread pool inside the Master responsible for the follow-up operation of the processing task;
- **MasterSchedulerService** is a scanning thread that regularly scans the `t_ds_command` table in the database, runs different business operations according to different **command types**;
- **WorkflowExecuteRunnable** is mainly responsible for DAG task segmentation, task submission monitoring, and logical processing of different event types;
- **Distributed Quartz** distributed scheduling component, which is mainly responsible for the start and stop operations of schedule tasks. When Quartz starts the task, there will be a thread pool inside the Master responsible for the follow-up operation of the processing task.
- **TaskExecuteRunnable** is mainly responsible for the processing and persistence of tasks, and generates task events and submits them to the event queue of the process instance;
- **MasterSchedulerThread** is a scanning thread that regularly scans the **command** table in the database and runs different business operations according to different **command types**.
- **EventExecuteService** is mainly responsible for the polling of the event queue of the process instances;
- **MasterExecThread** is mainly responsible for DAG task segmentation, task submission monitoring, and logical processing to different command types.
- **StateWheelExecuteThread** is mainly responsible for process instance and task timeout, task retry, task-dependent polling, and generates the corresponding process instance or task event and submits it to the event queue of the process instance;
- **MasterTaskExecThread** is mainly responsible for the persistence to tasks.
- **FailoverExecuteThread** is mainly responsible for the logic of Master fault tolerance and Worker fault tolerance;
* **WorkerServer**
@ -46,8 +52,12 @@
Server provides monitoring services based on netty.
#### The Service Mainly Includes:
- **Fetch TaskThread** is mainly responsible for continuously getting tasks from the **Task Queue**, and calling **TaskScheduleThread** corresponding executor according to different task types.
- **WorkerManagerThread** is mainly responsible for the submission of the task queue, continuously receives tasks from the task queue, and submits them to the thread pool for processing;
- **TaskExecuteThread** is mainly responsible for the process of task execution, and the actual processing of tasks according to different task types;
- **RetryReportTaskStatusThread** is mainly responsible for regularly polling to report the task status to the Master until the Master replies to the status ack to avoid the loss of the task status;
* **ZooKeeper**
@ -55,18 +65,13 @@
We have also implemented queues based on Redis, but we hope DolphinScheduler depends on as few components as possible, so we finally removed the Redis implementation.
* **Task Queue**
Provide task queue operation, the current queue is also implement base on ZooKeeper. Due to little information stored in the queue, there is no need to worry about excessive data in the queue. In fact, we have tested the millions of data storage in queues, which has no impact on system stability and performance.
* **Alert**
* **AlertServer**
Provide alarm related interface, the interface mainly includes **alarm** two types of alarm data storage, query and notification functions. Among them, there are **email notification** and **SNMP (not yet implemented)**.
Provides alarm services, and implements rich alarm methods through alarm plugins.
* **API**
The API interface layer is mainly responsible for processing requests from the front-end UI layer. The service uniformly provides RESTful APIs to provide request services to external.
Interfaces include workflow creation, definition, query, modification, release, logoff, manual start, stop, pause, resume, start execution from specific node, etc.
* **UI**
@ -102,41 +107,6 @@ Problems in centralized thought design:
- In fact, truly decentralized distributed systems are rare. Instead, dynamic centralized distributed systems are constantly pouring out. Under this architecture, the managers in the cluster are dynamically selected, rather than preset, and when the cluster fails, the nodes of the cluster will automatically hold "meetings" to elect new "managers" To preside over the work. The most typical case is Etcd implemented by ZooKeeper and Go language.
- The decentralization of DolphinScheduler is that the Master and Worker register in ZooKeeper, for implement the centerless feature to Master cluster and Worker cluster. Use the ZooKeeper distributed lock to elect one of the Master or Worker as the "manager" to perform the task.
#### Distributed Lock Practice
DolphinScheduler uses ZooKeeper distributed lock to implement only one Master executes Scheduler at the same time, or only one Worker executes the submission of tasks.
1. The following shows the core process algorithm for acquiring distributed locks:
- If there is no sub-process in a DAG, when the number of data in the Command is greater than the threshold set by the thread pool, the process directly waits or fails.
- If a large DAG nests many sub-processes, there will produce a "dead" state as the following figure:
In the above figure, MainFlowThread waits for the end of SubFlowThread1, SubFlowThread1 waits for the end of SubFlowThread2, SubFlowThread2 waits for the end of SubFlowThread3, and SubFlowThread3 waits for a new thread in the thread pool, then the entire DAG process cannot finish, and the threads cannot be released. In this situation, the state of the child-parent process loop waiting is formed. At this moment, unless a new Master is started and add threads to break such a "stalemate", the scheduling cluster will no longer use.
It seems a bit unsatisfactory to start a new Master to break the deadlock, so we proposed the following three solutions to reduce this risk:
1. Calculate the sum of all Master threads, and then calculate the number of threads required for each DAG, that is, pre-calculate before the DAG process executes. Because it is a multi-master thread pool, it is unlikely to obtain the total number of threads in real time.
2. Judge whether the single-master thread pool is full, let the thread fail directly when fulfilled.
3. Add a Command type with insufficient resources. If the thread pool is insufficient, suspend the main process. In this way, there are new threads in the thread pool, which can make the process suspended by insufficient resources wake up to execute again.
Note: The Master Scheduler thread executes by FIFO when acquiring the Command.
So we choose the third way to solve the problem of insufficient threads.
#### Fault-Tolerant Design
Fault tolerance divides into service downtime fault tolerance and task retry, and service downtime fault tolerance divides into master fault tolerance and worker fault tolerance.
@ -188,11 +158,11 @@ Here we must first distinguish the concepts of task failure retry, process failu
Next to the main point, we divide the task nodes in the workflow into two types.
- One is a business node, which corresponds to an actual script or process command, such as shell node, MR node, Spark node, and dependent node.
- One is a business task, which corresponds to an actual script or process command, such as Shell task, SQL task, and Spark task.
- Another is a logical node, which does not operate actual script or process command, but only logical processing to the entire process flow, such as sub-process sections.
- Another is a logical task, which does not operate actual script or process command, but only logical processing to the entire process flow, such as sub-process task, dependent task.
Each **business node** can configure the number of failed retries. When the task node fails, it will automatically retry until it succeeds or exceeds the retry times. **Logical node** failure retry is not supported, but the tasks in the logical node support.
**Business node** can configure the number of failed retries. When the task node fails, it will automatically retry until it succeeds or exceeds the retry times. **Logical node** failure retry is not supported.
If there is a task failure in the workflow that reaches the maximum retry times, the workflow will fail and stop, and the failed workflow can be manually re-run or process recovery operations.
@ -225,55 +195,29 @@ In the early schedule design, if there is no priority design and use the fair sc
- The queue field in the t_ds_user table stores the queue_name information in the t_ds_queue table, t_ds_tenant stores queue information using queue_id column. During the execution of the process definition, the user queue has the highest priority. If the user queue is null, use the tenant queue.
- The user_id field in the t_ds_datasource table shows the user who create the data source. The user_id in t_ds_relation_datasource_user shows the user who has permission to the data source.
- The queue field in the `t_ds_user` table stores the `queue_name` information in the `t_ds_queue` table, `t_ds_tenant` stores queue information using `queue_id` column. During the execution of the process definition, the user queue has the highest priority. If the user queue is null, use the tenant queue.
- The `user_id` field in the `t_ds_datasource` table shows the user who create the data source. The user_id in `t_ds_relation_datasource_user` shows the user who has permission to the data source.
- User can have multiple projects, user project authorization completes the relationship binding using project_id and user_id in t_ds_relation_project_user table.
- The user_id in the t_ds_projcet table represents the user who create the project, and the user_id in the t_ds_relation_project_user table represents users who have permission to the project.
- The user_id in the t_ds_resources table represents the user who create the resource, and the user_id in t_ds_relation_resources_user represents the user who has permissions to the resource.
- The user_id in the t_ds_udfs table represents the user who create the UDF, and the user_id in the t_ds_relation_udfs_user table represents a user who has permission to the UDF.
- A project has multiple process definitions, a process definition can generate multiple process instances, and a process instance can generate multiple task instances.
- The t_ds_schedulers table stores the specified time schedule information for process definition.
- The data stored in the t_ds_relation_process_instance table is used to deal with the sub-processes of a process definition, parent_process_instance_id field represents the id of the main process instance who contains child processes, process_instance_id field represents the id of the sub-process instance, parent_task_instance_id field represents the task instance id of the sub-process node.
- The process instance table and the task instance table correspond to the t_ds_process_instance table and the t_ds_task_instance table, respectively.
---
## Core Table Schema
### t_ds_process_definition
| Field | Type | Comment |
| --- | --- | --- |
| id | int | primary key |
| name | varchar | process definition name |
| version | int | process definition version |
| release_state | tinyint | process definition release state:0:offline,1:online |
| project_id | int | project id |
| user_id | int | process definition creator id |
| process_definition_json | longtext | process definition JSON content |
| description | text | process definition description |
| global_params | text | global parameters |
| flag | tinyint | whether process available: 0 not available, 1 available |
| locations | text | Node location information |
| connects | text | Node connection information |
| receivers | text | receivers |
| receivers_cc | text | carbon copy list |
| create_time | datetime | create time |
| timeout | int | timeout |
| tenant_id | int | tenant id |
| update_time | datetime | update time |
| modify_by | varchar | define user modify the process |
| resource_ids | varchar | resource id set |
### t_ds_process_instance
| Field | Type | Comment |
| --- | --- | --- |
| id | int | primary key |
| name | varchar | process instance name |
| process_definition_id | int | process definition id |
| state | tinyint | process instance Status: 0 successful commit, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete |
| worker_group | varchar | worker group who assign the task |
- User can have multiple projects, user project authorization completes the relationship binding using `project_id` and `user_id` in `t_ds_relation_project_user` table.
- The `user_id` in the `t_ds_projcet` table represents the user who create the project, and the `user_id` in the `t_ds_relation_project_user` table represents users who have permission to the project.
- The `user_id` in the `t_ds_resources` table represents the user who create the resource, and the `user_id` in `t_ds_relation_resources_user` represents the user who has permissions to the resource.
- The `user_id` in the `t_ds_udfs` table represents the user who create the UDF, and the `user_id` in the `t_ds_relation_udfs_user` table represents a user who has permission to the UDF.
| warning_type | tinyint | alarm type: 0 no alarm, 1 alarm if process success, 2: alarm if process failed, 3: warning whatever results |
| warning_group_id | int | warning group id |
| schedule_time | datetime | schedule time |
| start_time | datetime | start time |
| executor_id | int | executor id |
| dependence | varchar | dependence column |
| update_time | datetime | update time |
| process_instance_priority | int | process instance priority: 0 highest,1 high,2 medium,3 low,4 lowest |
| worker_group_id | int | worker group who assign the task |
- A process definition corresponds to multiple task definitions, which are associated through `t_ds_process_task_relation` and the associated key is `code + version`. When the pre-task of the task is empty, the corresponding `pre_task_node` and `pre_task_version` are 0.
- A process definition can have multiple process instances `t_ds_process_instance`, one process instance corresponds to one or more task instances `t_ds_task_instance`.
- The data stored in the `t_ds_relation_process_instance` table is used to handle the case that the process definition contains sub-processes. `parent_process_instance_id` represents the id of the main process instance containing the sub-process, `process_instance_id` represents the id of the sub-process instance, `parent_task_instance_id` represents the task instance id of the sub-process node. The process instance table and the task instance table correspond to the `t_ds_process_instance` table and the `t_ds_task_instance` table, respectively.
容错后处理:ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。