Implementation of Typical Operators
This section introduces the implementation details of three typical TiDB operators: Sort, HashAgg, and HashJoin.
First, every operator must implement the three basic methods of the Executor interface:
- Open - Initializes the operator, sets up the memory tracker/disk tracker, and other meta-info for the current operator.
- Next - Each call to Next returns a chunk of data. Returning an empty chunk indicates that the execution is complete for the current executor. Note that Next is not thread-safe. By design, Next is never called concurrently on any operator.
- Close - Responsible for releasing all resources held by the executor.
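The Open/Next/Close contract can be illustrated with a minimal sketch. The types below (Chunk, sliceExec, drain) are hypothetical stand-ins, and the method signatures are simplified, so this is not TiDB's actual Executor definition. It only shows the calling pattern: a single caller loops on Next until an empty chunk comes back.

```go
package main

import "fmt"

// Chunk is a simplified stand-in for a batch of rows (hypothetical type).
type Chunk struct {
	Rows []int
}

// Executor mirrors the three methods described above in simplified form.
type Executor interface {
	Open() error
	Next(out *Chunk) error // leaves out empty when no data remains
	Close() error
}

// sliceExec is a hypothetical leaf executor that serves rows from memory.
type sliceExec struct {
	data      []int
	pos       int
	chunkSize int
}

func (e *sliceExec) Open() error { e.pos = 0; return nil }

func (e *sliceExec) Next(out *Chunk) error {
	out.Rows = out.Rows[:0]
	for e.pos < len(e.data) && len(out.Rows) < e.chunkSize {
		out.Rows = append(out.Rows, e.data[e.pos])
		e.pos++
	}
	return nil
}

func (e *sliceExec) Close() error { e.data = nil; return nil }

// drain calls Next from a single goroutine until an empty chunk
// signals that the executor is exhausted.
func drain(e Executor) ([]int, error) {
	if err := e.Open(); err != nil {
		return nil, err
	}
	defer e.Close()
	var all []int
	chk := &Chunk{}
	for {
		if err := e.Next(chk); err != nil {
			return nil, err
		}
		if len(chk.Rows) == 0 {
			return all, nil
		}
		all = append(all, chk.Rows...)
	}
}

func main() {
	rows, err := drain(&sliceExec{data: []int{1, 2, 3, 4, 5}, chunkSize: 2})
	fmt.Println(rows, err)
}
```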
Sort
The Sort operator is used to arrange the result set of a query in a specific order. In TiDB, the operator implementing sort is SortExec. The fundamental concept behind SortExec is to read all the data from its child executor and then sort the entire data set.
Next invokes fetchRowChunks to read all the data from its child executor. fetchRowChunks aims to store all the data in one SortedRowContainer. The memory usage grows as the input data volume increases. To manage the memory usage, SortExec has spill-to-disk support. The details of this spilling are encapsulated within SortedRowContainer. Every time the insertion of a chunk into the current SortedRowContainer returns ErrCannotAddBecauseSorted, it indicates that the current SortedRowContainer has been spilled. SortExec will then create a new SortedRowContainer and insert the chunk into this new container. Once there's no data coming from its child executor, SortExec will sort the current SortedRowContainer.
After fetchRowChunks completes, Next starts producing sorted results. Depending on whether a spill to disk was initiated, there are two methods to produce the final sorted outcomes:
- Spill not initiated: In this straightforward scenario, the entire SortedRowContainer is already sorted at the end of fetchRowChunks, so Next simply invokes GetSortedRowAndAlwaysAppendToChunk to fetch the sorted data from the SortedRowContainer.
- Spill initiated: When a spill occurs, each spilling round produces an independent SortedRowContainer, stored in partitionList. In Next, an external multi-way merge sort merges all partially sorted data streams into one final sorted data stream.
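The two phases can be sketched as follows. This is an illustration under stated assumptions, not SortExec's code: a plain row-count limit stands in for the memory-triggered spill, in-memory slices stand in for on-disk containers, and the names partition, buildPartitions, and mergePartitions are hypothetical. The merge uses a min-heap of cursors, one per sorted partition.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// partition stands in for one sealed, sorted container (hypothetical type).
type partition []int

// buildPartitions mimics the fetch phase: rows go into the current container,
// and once it holds limit rows (a stand-in for the spill trigger) it is
// sorted, sealed, and a fresh container takes over.
func buildPartitions(input []int, limit int) []partition {
	var parts []partition
	var cur partition
	for _, v := range input {
		if len(cur) == limit {
			sort.Ints(cur)
			parts = append(parts, cur)
			cur = nil
		}
		cur = append(cur, v)
	}
	if len(cur) > 0 {
		sort.Ints(cur) // the last container is sorted when input ends
		parts = append(parts, cur)
	}
	return parts
}

// cursor points at the next unread row of one partition.
type cursor struct{ part, idx int }

// mergeHeap orders cursors by the row they currently point at.
type mergeHeap struct {
	parts []partition
	cur   []cursor
}

func (h *mergeHeap) Len() int { return len(h.cur) }
func (h *mergeHeap) Less(i, j int) bool {
	a, b := h.cur[i], h.cur[j]
	return h.parts[a.part][a.idx] < h.parts[b.part][b.idx]
}
func (h *mergeHeap) Swap(i, j int)      { h.cur[i], h.cur[j] = h.cur[j], h.cur[i] }
func (h *mergeHeap) Push(x interface{}) { h.cur = append(h.cur, x.(cursor)) }
func (h *mergeHeap) Pop() interface{} {
	last := h.cur[len(h.cur)-1]
	h.cur = h.cur[:len(h.cur)-1]
	return last
}

// mergePartitions performs a multi-way merge over the sorted partitions.
func mergePartitions(parts []partition) []int {
	h := &mergeHeap{parts: parts}
	for i, p := range parts {
		if len(p) > 0 {
			h.cur = append(h.cur, cursor{part: i})
		}
	}
	heap.Init(h)
	var out []int
	for h.Len() > 0 {
		c := h.cur[0] // smallest current row across all partitions
		out = append(out, parts[c.part][c.idx])
		if c.idx+1 < len(parts[c.part]) {
			h.cur[0].idx++
			heap.Fix(h, 0)
		} else {
			heap.Pop(h) // this partition is exhausted
		}
	}
	return out
}

func main() {
	parts := buildPartitions([]int{9, 3, 7, 1, 8, 2, 6}, 3)
	fmt.Println(len(parts), mergePartitions(parts))
}
```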
HashAgg
The HashAgg operator uses a hash table to perform grouping and aggregation. In TiDB, the operator implementing hash aggregation is HashAggExec.
HashAgg has two execution modes: parallel and non-parallel execution. During the build stage, the planner is responsible for deciding the execution mode for a HashAgg. A HashAgg will operate in non-parallel execution mode if one of the following conditions is true:
- The aggregation function contains distinct.
- The aggregation function (GROUP_CONCAT) contains order by.
- The user explicitly sets both hashAggPartialConcurrency and hashAggFinalConcurrency to 1.
Non-parallel Execution
Non-parallel execution mode performs aggregation in a single thread. unparallelExec is the core function for non-parallel execution. unparallelExec first reads all the data from its child executor, then aggregates the data using execute. After execute completes, unparallelExec starts to generate results by traversing all the group-by keys, generating one row for each key by calling AppendFinalResult2Chunk for each aggregation function.
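A minimal single-threaded sketch of this flow is shown below. The names (row, sumCount, aggregate, finalize) are hypothetical, and SUM and COUNT are chosen purely for illustration. The first function consumes all input and keeps one partial state per group-by key, and the second walks the keys to emit one result row each.

```go
package main

import (
	"fmt"
	"sort"
)

// row is a hypothetical input row: a group-by key and a value.
type row struct {
	key string
	val int
}

// sumCount is a hypothetical per-group state for SUM(val) and COUNT(*).
type sumCount struct {
	sum, count int
}

// result is one output row, produced once per group-by key.
type result struct {
	key        string
	sum, count int
}

// aggregate consumes every input chunk and updates one state per key.
func aggregate(chunks [][]row) map[string]*sumCount {
	groups := make(map[string]*sumCount)
	for _, chk := range chunks {
		for _, r := range chk {
			st, ok := groups[r.key]
			if !ok {
				st = &sumCount{}
				groups[r.key] = st
			}
			st.sum += r.val
			st.count++
		}
	}
	return groups
}

// finalize traverses all group-by keys and emits one row per key.
func finalize(groups map[string]*sumCount) []result {
	keys := make([]string, 0, len(groups))
	for k := range groups {
		keys = append(keys, k)
	}
	sort.Strings(keys) // sorted only to make the demo output deterministic
	out := make([]result, 0, len(keys))
	for _, k := range keys {
		out = append(out, result{key: k, sum: groups[k].sum, count: groups[k].count})
	}
	return out
}

func main() {
	chunks := [][]row{{{"a", 1}, {"b", 2}}, {{"a", 3}}}
	fmt.Println(finalize(aggregate(chunks)))
}
```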
Parallel Execution
Parallel execution mode performs aggregation using multiple threads, dividing the aggregation into two stages:
- Partial stage: each thread aggregates a portion of the input data into partial results.
- Final stage: each thread aggregates the partial results into final results.
The flow of parallel execution is illustrated in the following graph:
                         +-------------+
                         | Main Thread |
                         +------+------+
                                ^
                                |
                                +
                               +-+-            +-+
                               | |    ......   | |     finalOutputCh
                               +++-            +-+
                                ^
                                |
                                +---------------+
                                |               |
                  +--------------+             +--------------+
                  | final worker |   ......    | final worker |
                  +------------+-+             +-+------------+
                               ^                 ^
                               |                 |
                              +-+  +-+  ......  +-+
                              | |  | |          | |
                              ...  ...          ...    partialOutputChs
                              | |  | |          | |
                              +++  +++          +++
                               ^    ^            ^
          +-+                  |    |            |
          | |         +--------o----+            |
 inputCh  +-+         |        +-----------------+---+
          | |         |                              |
          ...     +---+------------+            +----+-----------+
          | |     | partial worker |   ......   | partial worker |
          +++     +--------------+-+            +-+--------------+
           |                     ^                ^
           |                     |                |
      +----v---------+          +++ +-+          +++
      | data fetcher | +------> | | | |  ......  | |   partialInputChs
      +--------------+          +-+ +-+          +-+
There are three types of threads that read data and execute the aggregation:
- fetchChildData: This thread's concurrency level is set to 1. It reads data from the child executor and places it into inputCh, serving as the input for each partial worker.
- HashAggPartialWorker: The concurrency of HashAggPartialWorker is determined by hashAggPartialConcurrency. This worker reads the input data, executes partial aggregation on the data, produces partial results, and sends them to the final worker.
- HashAggFinalWorker: The concurrency of HashAggFinalWorker is set by hashAggFinalConcurrency. This worker reads partial results, produces final results, and sends them to finalOutputCh.
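The three thread roles can be sketched with goroutines and channels. This is a simplified illustration, not HashAggExec's implementation: the function parallelSum and its parameters are hypothetical, only SUM is computed, input chunks are handed out round-robin, and partial results are routed to final workers by hashing the group-by key. That routing is an assumption of this sketch; its purpose here is that each key ends up in exactly one final worker.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

type row struct {
	key string
	val int
}

// bucket picks the final worker responsible for a group-by key.
func bucket(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// parallelSum computes SUM(val) GROUP BY key with partialN partial workers
// and finalN final workers. Both counts must be at least 1.
func parallelSum(chunks [][]row, partialN, finalN int) map[string]int {
	partialIn := make([]chan []row, partialN)
	for i := range partialIn {
		partialIn[i] = make(chan []row, 1)
	}
	partialOut := make([]chan map[string]int, finalN)
	for i := range partialOut {
		// Each partial worker sends at most one map per final worker.
		partialOut[i] = make(chan map[string]int, partialN)
	}
	finalOut := make(chan map[string]int, finalN)

	// Data fetcher: a single goroutine feeding the partial workers.
	go func() {
		for i, chk := range chunks {
			partialIn[i%partialN] <- chk
		}
		for _, ch := range partialIn {
			close(ch)
		}
	}()

	// Partial workers: aggregate a share of the input, then shuffle by key.
	var partialWG sync.WaitGroup
	for i := 0; i < partialN; i++ {
		partialWG.Add(1)
		go func(in <-chan []row) {
			defer partialWG.Done()
			local := make(map[string]int)
			for chk := range in {
				for _, r := range chk {
					local[r.key] += r.val
				}
			}
			shuffled := make([]map[string]int, finalN)
			for k, v := range local {
				b := bucket(k, finalN)
				if shuffled[b] == nil {
					shuffled[b] = make(map[string]int)
				}
				shuffled[b][k] = v
			}
			for b, m := range shuffled {
				if m != nil {
					partialOut[b] <- m
				}
			}
		}(partialIn[i])
	}
	go func() {
		partialWG.Wait()
		for _, ch := range partialOut {
			close(ch)
		}
	}()

	// Final workers: merge the partial results for their share of the keys.
	var finalWG sync.WaitGroup
	for i := 0; i < finalN; i++ {
		finalWG.Add(1)
		go func(in <-chan map[string]int) {
			defer finalWG.Done()
			merged := make(map[string]int)
			for m := range in {
				for k, v := range m {
					merged[k] += v
				}
			}
			finalOut <- merged
		}(partialOut[i])
	}
	go func() {
		finalWG.Wait()
		close(finalOut)
	}()

	// Main thread: collect the final results.
	result := make(map[string]int)
	for m := range finalOut {
		for k, v := range m {
			result[k] = v
		}
	}
	return result
}

func main() {
	chunks := [][]row{{{"a", 1}, {"b", 2}}, {{"a", 3}, {"c", 5}}}
	fmt.Println(parallelSum(chunks, 2, 2))
}
```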
Similar to Sort, HashAgg is also a memory-intensive operator. When HashAgg runs in non-parallel execution mode, it supports spill-to-disk functionality (spill-to-disk in parallel execution mode is currently under development). Unlike Sort, which spills all data to disk, HashAgg spills selectively. In the current implementation, once a HashAgg is flagged for spilling, every subsequent input row is handled according to its group-by key: if the key already exists in the current hash map, the row is processed in memory against that hash map; if not, the row is spilled to disk. Detailed workings of the HashAgg spill can be explored here.
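The key-based spill rule can be sketched as follows. Several details are assumptions of this illustration rather than facts from the text above: a cap on the number of groups (maxGroups) stands in for the spill flag, a slice stands in for the disk file, and the spilled rows are re-aggregated in later rounds once the in-memory groups have been emitted. The function name spillAgg is hypothetical.

```go
package main

import "fmt"

type row struct {
	key string
	val int
}

// spillAgg computes SUM(val) GROUP BY key while keeping at most maxGroups
// keys in memory per round.
func spillAgg(rows []row, maxGroups int) map[string]int {
	if maxGroups < 1 {
		maxGroups = 1 // guard: at least one group must fit in memory
	}
	result := make(map[string]int)
	pending := rows
	for len(pending) > 0 {
		groups := make(map[string]int)
		var spilled []row // stands in for rows written to disk
		for _, r := range pending {
			if _, ok := groups[r.key]; ok || len(groups) < maxGroups {
				// Key already in memory (or there is still room): aggregate.
				groups[r.key] += r.val
			} else {
				// Unseen key after the map is full: spill the raw row.
				spilled = append(spilled, r)
			}
		}
		// Keys never overlap between rounds, so plain assignment is enough.
		for k, v := range groups {
			result[k] = v
		}
		pending = spilled
	}
	return result
}

func main() {
	rows := []row{{"a", 1}, {"b", 2}, {"a", 3}, {"c", 4}, {"b", 5}}
	fmt.Println(spillAgg(rows, 1))
}
```

Because a key that is in memory absorbs all of its later rows, and a key that misses the map is spilled on every occurrence in that round, each key is finalized in exactly one round.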
HashJoin
The HashJoin operator uses a hash table to perform the join operation. In TiDB, the operator that implements hash join is HashJoinExec.
HashJoin constructs the results in two distinct stages:
- Fetch data from the build side child and build a hash table.
- Fetch data from the probe side child and probe the hash table using multiple join workers.
Build stage
The fetchAndBuildHashTable function orchestrates the build stage. Two threads are engaged in this work:
- fetchBuildSideRows reads data from the build side child and funnels it into the buildSideResultCh.
- buildHashTableForList retrieves input data from buildSideResultCh and subsequently constructs the hash table based on this input.
Detailed mechanics of building the hash table are encapsulated within the hashRowContainer. It's worth noting that, as of now, TiDB does not support the parallel building of hash tables.
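The producer/consumer shape of the build stage can be sketched as below. The names fetchRows, buildTable, and fetchAndBuild are hypothetical stand-ins for the functions named above, and a plain Go map replaces hashRowContainer. One goroutine feeds chunks into a channel while a single builder drains it, so the table is built without any parallel writers.

```go
package main

import "fmt"

// row is a hypothetical build-side row: a join key plus a payload.
type row struct {
	key     int
	payload string
}

// fetchRows plays the role of the build-side fetcher: it pushes chunks
// into the channel and closes it when the child is exhausted.
func fetchRows(src [][]row, out chan<- []row) {
	defer close(out)
	for _, chk := range src {
		out <- chk
	}
}

// buildTable plays the role of the single hash table builder.
func buildTable(in <-chan []row) map[int][]row {
	table := make(map[int][]row)
	for chk := range in {
		for _, r := range chk {
			table[r.key] = append(table[r.key], r)
		}
	}
	return table
}

// fetchAndBuild wires the two together through one channel.
func fetchAndBuild(src [][]row) map[int][]row {
	ch := make(chan []row, 1)
	go fetchRows(src, ch)
	return buildTable(ch)
}

func main() {
	src := [][]row{{{1, "x"}, {2, "y"}}, {{1, "z"}}}
	table := fetchAndBuild(src)
	fmt.Println(len(table), len(table[1]), len(table[2]))
}
```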
Probe stage
The fetchAndProbeHashTable function executes the probe stage. This stage engages two types of threads:
- fetchProbeSideChunks operates with a concurrency of 1. It reads data from the probe child and dispatches it to the probe workers.
- probeWorker instances read data from fetchProbeSideChunks and probe concurrently. The concurrency level is determined by ExecutorConcurrency.
Each probeWorker contains a joiner, a core data structure implementing various join semantics. Every type of join in TiDB has its specialized joiner. The currently supported joiners include:
- innerJoiner - For inner join
- leftOuterJoiner - For left outer join
- rightOuterJoiner - For right outer join
- semiJoiner - For semi join
- antiSemiJoiner - For anti semi join
- antiLeftOuterSemiJoiner - For anti left outer semi join
- leftOuterSemiJoiner - For left outer semi join
- nullAwareAntiSemiJoiner - For null aware anti semi join
- nullAwareAntiLeftOuterSemiJoiner - For null aware anti left outer semi join
The joiner offers three foundational interfaces:
- tryToMatchInners - For each row from the probe side, it attempts to match the rows from the build side. Returns true if a match occurs and sets isNull for the special join types: AntiSemiJoin, LeftOuterSemiJoin, and AntiLeftOuterSemiJoin.
- tryToMatchOuters - Exclusive to outer join scenarios where the outer side acts as the build hash table. For each row from the probe (inner) side, it attempts to match the rows from the build (outer) side.
- onMissMatch - Used in semi join scenarios to manage cases where no rows from the build side match the probe row.
While running, each probeWorker reads data from the probe side. For every probe row, it attempts to match against the hash table and saves the result into a result chunk. Most join types use the join2Chunk function for probing. However, outer joins that use the outer side as the build side call join2ChunkForOuterHashJoin instead.
Within join2Chunk/join2ChunkForOuterHashJoin, the probe work consists of three steps for each probe row:
- Quick tests are conducted before accessing the hash table to determine if a probe row won't find a match. For instance, in an inner join, if the join key contains null, the probe can bypass the hash table probing since null will never match any value. For rows that are non-matching, the onMissMatch function is invoked.
- The hash table is looked up to identify potential matching rows.
- In the absence of potential matching rows, the onMissMatch function is invoked. Otherwise, the tryToMatch function is executed.
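The three probe steps can be sketched with a small, hypothetical joiner interface. The method names echo the ones above, but the signatures, the probeRow type, and the string-based output are simplifications invented for this example, and only two join types are modeled. The left outer variant assumes the probe side is the outer side.

```go
package main

import "fmt"

// probeRow is a hypothetical probe-side row; a nil key models a NULL join key.
type probeRow struct {
	key  *int
	name string
}

// joiner is a simplified stand-in for the per-join-type logic.
type joiner interface {
	tryToMatch(p probeRow, matches []string) []string
	onMissMatch(p probeRow) []string
}

// innerJoiner emits one row per match and nothing on a miss.
type innerJoiner struct{}

func (innerJoiner) tryToMatch(p probeRow, matches []string) []string {
	out := make([]string, 0, len(matches))
	for _, b := range matches {
		out = append(out, p.name+"-"+b)
	}
	return out
}

func (innerJoiner) onMissMatch(probeRow) []string { return nil }

// leftOuterJoiner reuses the inner matching but pads unmatched probe rows.
type leftOuterJoiner struct{ innerJoiner }

func (leftOuterJoiner) onMissMatch(p probeRow) []string {
	return []string{p.name + "-NULL"}
}

// probe applies the three steps to every probe row.
func probe(rows []probeRow, table map[int][]string, j joiner) []string {
	var out []string
	for _, r := range rows {
		// Step 1: quick test. A NULL key can never match, so skip the lookup.
		if r.key == nil {
			out = append(out, j.onMissMatch(r)...)
			continue
		}
		// Step 2: look up candidate rows in the hash table.
		matches := table[*r.key]
		// Step 3: dispatch to the joiner depending on the lookup result.
		if len(matches) == 0 {
			out = append(out, j.onMissMatch(r)...)
		} else {
			out = append(out, j.tryToMatch(r, matches)...)
		}
	}
	return out
}

func main() {
	k1, k2 := 1, 2
	table := map[int][]string{1: {"x", "y"}}
	rows := []probeRow{{key: &k1, name: "a"}, {key: &k2, name: "b"}, {key: nil, name: "c"}}
	fmt.Println(probe(rows, table, innerJoiner{}))
	fmt.Println(probe(rows, table, leftOuterJoiner{}))
}
```

Keeping the join semantics behind the joiner lets the probe loop stay identical across join types; only the match and miss handlers change.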