1.4 Segments
1.4.0 Overview
Recall that a query against a partitioned table executes sequentially across the required partitions. I/O bandwidth limits performance on large partitioned tables because retrieving data from physical storage is much slower than processing that data once it is in memory. Simply put, the q process spends time waiting for data, does a flurry of calculations, and then waits on the next chunk of data. The way to address this is to take advantage of parallel I/O and concurrent processing. To this end, we aim to spread a table across multiple I/O channels to facilitate parallel retrieval.
1.4.1 Segmented Tables
Segmentation is an additional level of structure on top of partitioning. Segmentation spreads a partitioned table’s records across multiple storage locations. Each subset of records, called a segment, is a directory that contains a collection of partition directories. The segment directories are presumably on different I/O channels so that data retrieval can occur without contention.
You can use any criteria to decompose partition slices, as long as you end up with conforming record subsets that are disjoint and complete—i.e., they reconstitute the original table with no omissions or duplication. The decomposition can be along rows, along partitions or by some combination thereof, but it cannot occur along columns.
Note: You must ensure that the segments conform and are complete and disjoint, since kdb+ will not check this when you write the data files. In particular, overlapping segments will result in duplicate records in query results and an incomplete decomposition will result in dropped records.
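For instance, before writing anything to disk you can sanity-check a proposed decomposition in memory. The following minimal sketch uses hypothetical names of our own (src for the original table, pieces for the candidate segment subsets) and simply confirms that the pieces reassemble the original row for row:
src:([] sym:`ibm`t`aapl`msft; p:101 17 143 25f)                       / stand-in for the original table
pieces:(select from src where sym<`m; select from src where sym>=`m)  / candidate segment subsets
count[src]=sum count each pieces                  / complete: no rows dropped
(`sym xasc src)~`sym xasc raze pieces             / pieces are exactly a reordering of src rows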
Big Picture (3): We think of a segmented table as a three-dimensional persisted form: the table is cut vertically by splaying, sliced horizontally by partitions and is additionally spread across physical locations in segments. The primary purpose of the third dimension is to allow operations against the tables to take advantage of parallel I/O and concurrent processing. Following is an abstract representation of segmentation:
segment1
           | column* | column* | …
partition* | ▪       | ▪       | …
partition* | ▪       | ▪       | …
…          | …       | …       | …

segment2
           | column* | column* | …
partition* | ▪       | ▪       | …
partition* | ▪       | ▪       | …
…          | …       | …       | …

segment3
           | column* | column* | …
partition* | ▪       | ▪       | …
partition* | ▪       | ▪       | …
…          | …       | …       | …
Here each ▪ represents a (partial) column of a partition slice as an opaque entity.
1.4.1.1 Contrasting Partitions and Segments
Although partitioning and segmentation are similar in that they both group a table’s records into directories, there are fundamental differences. Remember, segmentation sits above partitioning, so all segmented tables are partitioned but the converse is not true.
                | Partitioned Table | Segmented Table
Record location | All partitions (and hence all records) reside under the root directory. | None of the segments (and hence no records) reside under the root.
I/O channels    | All partitions (and hence all records) reside on a single I/O channel. | The segments (and hence the records) should reside on multiple I/O channels.
Processing      | Partitions are loaded and processed sequentially in aggregation queries. | Given appropriate slaves and cores, aggregate queries load segments in parallel and process them concurrently.
Decomposition   | Partition by grouping rows on the values of a virtual column of underlying integral type. | Segment by any criteria yielding a disjoint and complete decomposition.
Symbols         | Cannot partition on a symbol column. | Can segment along a symbol column.
Virtual column  | Partition column not stored; virtual column values are inferred from directory names. | No special column associated with segmentation (the virtual column from the underlying partitioning is still present).
1.4.1.2 Segment Layout Examples
In contrast to the partitioned table layout in which partitions reside under the root, the segment directories must not reside under the root. The only portion of a segmented table (other than the sym file for enumerated symbol columns) that lives in the root is a file called par.txt, which contains the paths of the physical locations of the segments, one segment path per line.
Here is how the abstract segmentation discussed above would be laid out on the file system:
/db
    [sym]
    par.txt
===============    <- channel 1
/segment1
    /partition*
        /table*
        /table*
        …
    /partition*
        /table*
        /table*
        …
===============    <- channel 2
/segment2
    /partition*
        /table*
        /table*
        …
    /partition*
        /table*
        /table*
        …
===============
…
To make this more concrete, we demonstrate how to segment daily trades (and eventually quotes) in several useful ways. To begin, we create segments by bucketing trades into alternating days of the week:
/1                 <- drive 1
    /2009.01.01
        /t         <- day’s trades
    /2009.01.03
        /t         <- day’s trades
    …
=============
/2                 <- drive 2
    /2009.01.02
        /t         <- day’s trades
    /2009.01.04
        /t         <- day’s trades
    …
This segmentation represents grouping of partitions, so it is orthogonal to the partitioning. It is clearly complete and disjoint and is easily generalizable to round-robining every n business days.
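For instance, a minimal sketch of the round-robin assignment (roots and days are names of our own, not kdb+ conventions):
roots:`:/1`:/2                                    / segment directories
days:2009.01.01 2009.01.02 2009.01.03 2009.01.04  / partition values, in order
roots (til count days) mod count roots            / segment root for each successive day
To round-robin every n business days instead, divide the day index by n before taking the modulus, e.g. roots ((til count days) div n) mod count roots.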
We could alternatively create segments by splitting the daily slices into records with symbols starting with a-m and those starting with n-z. Here we are decomposing based on the values in a symbol column, which we could not do with simple partitioning.
/am                <- drive 1
    /2009.01.01
        /t         <- day’s trades for syms a-m
    /2009.01.02
        /t         <- day’s trades for syms a-m
    …
=============
/nz                <- drive 2
    /2009.01.01
        /t         <- day’s trades for syms n-z
    /2009.01.02
        /t         <- day’s trades for syms n-z
    …
This segmentation clearly results in complete, disjoint subsets. It is not orthogonal to the partitioning because a single day's trades span multiple segments. It is easily generalizable to n segments by splitting the alphabet into n roughly equal portions.
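A hedged sketch of such an assignment (bucket is a helper of our own, not a kdb+ builtin; it assumes lower-case symbols) maps each symbol's first letter to one of n roughly equal alphabet ranges:
n:3
bucket:{[n;syms] (n*.Q.a?first each string syms) div 26}   / bucket 0..n-1 per symbol
bucket[n] `ibm`aapl`t`nvda                                 / 0 0 2 1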
Alternatively, we can create segments by splitting the daily slices into trades from NYSE and trades from NASDAQ.
/nyse              <- drive 1
    /2009.01.01
        /t         <- day’s trades for nyse
    /2009.01.02
        /t         <- day’s trades for nyse
    …
=============
/nasd              <- drive 2
    /2009.01.01
        /t         <- day’s trades for nasd
    /2009.01.02
        /t         <- day’s trades for nasd
    …
The segmentation is also not orthogonal to the partitioning since a single day's trades span multiple segments. It is clearly complete and disjoint and is easily generalizable to multiple exchanges—e.g., all Asian exchanges.
Finally, we provide an example of a non-uniform segmentation, in that some partitions span segments and others do not. The A segment contains trades from the beginning of day 1 until lunchtime of day 2. The B segment contains the trades from lunchtime of day 2 through the end of day 3. The C segment contains all trades from day 4.
/A                 <- drive 1
    /2009.01.01
        /t         <- entire day’s trades
    /2009.01.02
        /t         <- morning trades
=============
/B                 <- drive 2
    /2009.01.02
        /t         <- afternoon trades
    /2009.01.03
        /t         <- entire day’s trades
=============
/C                 <- drive 3
    /2009.01.04
        /t         <- entire day’s trades
This segmentation is disjoint and complete and it is not orthogonal to the partitioning.
1.4.2 Creating Segments
As with partitions, there is no one-size-fits-all utility to create segments. Instead, you write a q program that places the appropriate subset of each partition slice into the correct segment directory. You can create historical segments and partitions at the same time by including logic in your load script to extract slice subsets and place them in the appropriate directory on the appropriate drive.
Along with creating the partitions, you must also specify the locations of the segments in an ASCII text file par.txt located in the root. Each line of par.txt contains the path of one segment directory; symlinks are acceptable.
Important: The segment paths must not be under the root. Beginners sometimes make this mistake and the result is like listening to Vogon poetry.
We illustrate the process by creating segmented tables for each example in the previous section. Our intent is to demonstrate how to create the segments and see that execution of the query pieces the segment results back together. We run our simple queries without concern for placing the segments on multiple drives and running multiple slaves, as we are not (yet) considering performance.
1.4.2.1 Trades Segmented by Day of Week
Start with a fresh /db. This segmentation groups partitions in alternating days as an example of round-robining. It resides in the following directory structure:
/1
    /2009.01.01
        /t
    /2009.01.03
        /t
/2
    /2009.01.02
        /t
    /2009.01.04
        /t
The corresponding par.txt is:
/1
/2
A day’s trade table has the schema:
([] ti:`time$(); s:`sym$(); p:`float$())
Segment construction is simple: put successive days in alternating segments.
`:/1/2009.01.01/t/ set ([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`t; p:101 17f)
`:/2/2009.01.02/t/ set ([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`t; p:101.5 17.5)
`:/1/2009.01.03/t/ set ([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`t; p:103 16.5f)
`:/2/2009.01.04/t/ set ([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`t; p:102 17f)
`:/db/par.txt 0: ("/1"; "/2")
`:/db/par.txt
\l /db
select from t where date within 2009.01.01 2009.01.04
date       ti       s   p
-----------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:30:00 ibm 101.5
2009.01.02 09:31:00 t   17.5
2009.01.03 09:30:00 ibm 103
2009.01.03 09:31:00 t   16.5
2009.01.04 09:30:00 ibm 102
2009.01.04 09:31:00 t   17
1.4.2.2 Trades Segmented by Symbol Range
Start with a fresh /db. The segments for symbols a-m and n-z reside in the directory structure:
/am
    /2009.01.01
    /2009.01.02
/nz
    /2009.01.01
    /2009.01.02
The corresponding par.txt is:
/am
/nz
A day’s trade table has the schema:
([] ti:`time$(); sym:`sym$(); p:`float$())
Do not confuse the column sym with the enumeration sym or the file named sym. They are all distinct.
We make a utility function to extract trades by symbol range:
extr:{[t;r] select from t where (`$1#'string sym) within r}
We spread each day’s trades across the two segments based on values in the column sym of type symbol. Observe that we store sym along with the other columns.
t1:([] ti:09:30:00 09:31:00; sym:`:/db/sym?`ibm`t; p:101 17f)
`:/am/2009.01.01/t/ set extr[t1;`a`m]
`:/am/2009.01.01/t/
`:/nz/2009.01.01/t/ set extr[t1;`n`z]
`:/nz/2009.01.01/t/
t2:([] ti:09:30:00 09:31:00; sym:`:/db/sym?`ibm`t; p:101.5 17.5)
`:/am/2009.01.02/t/ set extr[t2;`a`m]
`:/am/2009.01.02/t/
`:/nz/2009.01.02/t/ set extr[t2;`n`z]
`:/nz/2009.01.02/t/
Now we create par.txt and map the table:
`:/db/par.txt 0: ("/am"; "/nz")
`:/db/par.txt
\l /db
select from t where date within 2009.01.01 2009.01.02
date       ti       sym p
-------------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:30:00 ibm 101.5
2009.01.02 09:31:00 t   17.5
1.4.2.3 Trades Segmented by Exchange
Start with a fresh /db. The exchange segments reside in the following directory structure:
/nyse
    /2009.01.01
        /t
    /2009.01.02
        /t
/nasd
    /2009.01.01
        /t
    /2009.01.02
        /t
The corresponding par.txt is:
/nyse
/nasd
The trade table has the schema:
([] ti:`time$(); s:`sym$(); p:`float$(); ex:`sym$())
Segment construction is completely analogous to that of the previous example.
extr:{[t;e] select from t where ex=e}
t1:([] ti:09:30:00 09:31:00; s:`:/db/sym?`ibm`aapl; p:101 17f; ex:`:/db/sym?`n`o)
`:/nyse/2009.01.01/t/ set extr[t1;`n]
`:/nyse/2009.01.01/t/
`:/nasd/2009.01.01/t/ set extr[t1;`o]
`:/nasd/2009.01.01/t/
t2:([] ti:09:30:00 09:31:00; s:`:/db/sym?`aapl`ibm; p:143 102f; ex:`:/db/sym?`o`n)
`:/nyse/2009.01.02/t/ set extr[t2;`n]
`:/nyse/2009.01.02/t/
`:/nasd/2009.01.02/t/ set extr[t2;`o]
`:/nasd/2009.01.02/t/
`:/db/par.txt 0: ("/nyse"; "/nasd")
`:/db/par.txt
\l /db
select from t where date within 2009.01.01 2009.01.02
date       ti       s    p   ex
-------------------------------
2009.01.01 09:30:00 ibm  101 n
2009.01.01 09:31:00 aapl 17  o
2009.01.02 09:31:00 ibm  102 n
2009.01.02 09:30:00 aapl 143 o
1.4.2.4 Trades Segmented across Partitions
Start again with a fresh /db. The segmentation layout is:
/A
    /2009.01.01
    /2009.01.02
/B
    /2009.01.02
    /2009.01.03
/C
    /2009.01.04
so par.txt is:
/A
/B
/C
Segment construction is straightforward.
t1:([] ti:09:30:00 12:31:00; s:`:/db/sym?`ibm`t; p:101 17f)
`:/A/2009.01.01/t/ set t1
`:/A/2009.01.01/t/
t2:([] ti:09:31:00 12:32:00; s:`:/db/sym?`ibm`t; p:102 18f)
`:/A/2009.01.02/t/ set select from t2 where ti<=12:00:00
`:/A/2009.01.02/t/
`:/B/2009.01.02/t/ set select from t2 where ti>12:00:00
`:/B/2009.01.02/t/
t3:([] ti:09:33:00 12:33:00; s:`:/db/sym?`ibm`t; p:103 19f)
`:/B/2009.01.03/t/ set t3
`:/B/2009.01.03/t/
t4:([] ti:09:34:00 12:35:00; s:`:/db/sym?`ibm`t; p:104 20f)
`:/C/2009.01.04/t/ set t4
`:/C/2009.01.04/t/
`:/db/par.txt 0: ("/A";"/B";"/C")
`:/db/par.txt
\l /db
select from t where date within 2009.01.01 2009.01.04
date       ti       s   p
---------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 12:31:00 t   17
2009.01.02 09:31:00 ibm 102
2009.01.02 12:32:00 t   18
2009.01.03 09:33:00 ibm 103
2009.01.03 12:33:00 t   19
2009.01.04 09:34:00 ibm 104
2009.01.04 12:35:00 t   20
1.4.3 Multiple Segmented Tables
Two tables that share the same partitioning are not required to share the same segmentation decomposition, but they often do. This is commonly the case with tables such as trades and quotes that are used together.
We demonstrate a shared segmentation for daily trades and quotes that incidentally addresses a problem of some level-two tick sources—e.g., OPRA data can contain more than 2 billion (i.e., 2×10⁹) records per day. Since a list in q cannot currently have more than 2 billion items, the resulting partial columns in the slice would be too large. Our solution is to segment by the first character of the symbol, dividing the alphabet into (roughly) equal ranges.
Note: As of this writing (Jul 2010), kdb+ does not perform map-reduce optimization for joins—e.g., lj, aj or wj. If you need to perform large joins, consider setting up a dedicated join server, as the performance characteristics of joins are significantly different from those of aggregations. By carefully constructing your segmentation and using peach, you can nudge kdb+ into parallel I/O and concurrent processing for joins.
Our actual segmentation layout is designed to allow parallel retrieval for aj and wj. We first observe that both perform an equijoin on a single symbol as well as the date virtual column. Also observe that they perform a non-equijoin on the time column, so we need all the time values for a given symbol and date in one segment. Consequently, if we segment by symbol range, we can parallelize aj or wj across symbols.
Here is a simple version of this scheme:
//drive1
    /a_m                   <- segment for first portion of alphabet
        /2009.01.01        <- the specific day
            /t             <- that day’s trades for symbols a-m
            /q             <- that day’s quotes for symbols a-m
        /2009.01.02        <- the specific day
            /t             <- that day’s trades for symbols a-m
            /q             <- that day’s quotes for symbols a-m
=================
//drive2
    /n_z                   <- segment for second portion of alphabet
        /2009.01.01        <- the specific day
            /t             <- that day’s trades for symbols n-z
            /q             <- that day’s quotes for symbols n-z
        /2009.01.02        <- the specific day
            /t             <- that day’s trades for symbols n-z
            /q             <- that day’s quotes for symbols n-z
=================
…
Here is the corresponding par.txt:
//drive1/a_m
//drive2/n_z
…
This segmentation is clearly disjoint and complete. From the above observations, we can use peach to achieve parallel I/O for aj and wj. One caution: were a query to return all quotes for a given day, the result would be too large to fit in a list if the (unsegmented) quote table exceeds 2 billion rows. You should guard against this in a real application.
Setting up the segments and partitions is a simple matter of getting the details right. In our simplified example, we pretend that the directories /am and /nz reside on different drives.
extr:{[t;r] select from t where (`$1#'string sym) within r}

/ day 1
t:([] ti:09:30:00 09:31:00; sym:`:/db/sym?`ibm`t; p:101 17f)
q:([] ti:09:29:59 09:29:59 09:30:00; sym:`:/db/sym?`ibm`t`ibm; p:100.5 17 101)
`:/am/2009.01.01/t/ set extr[t;`a`m]
`:/am/2009.01.01/t/
`:/nz/2009.01.01/t/ set extr[t;`n`z]
`:/nz/2009.01.01/t/
`:/am/2009.01.01/q/ set extr[q;`a`m]
`:/am/2009.01.01/q/
`:/nz/2009.01.01/q/ set extr[q;`n`z]
`:/nz/2009.01.01/q/

/ day 2
t:([] ti:09:30:00 09:31:00; sym:`:/db/sym?`t`ibm; p:17.1 100.9)
q:([] ti:09:29:59 09:29:59 09:30:00; sym:`:/db/sym?`t`ibm`t; p:17 100.8 17.1)
`:/am/2009.01.02/t/ set extr[t;`a`m]
`:/am/2009.01.02/t/
`:/nz/2009.01.02/t/ set extr[t;`n`z]
`:/nz/2009.01.02/t/
`:/am/2009.01.02/q/ set extr[q;`a`m]
`:/am/2009.01.02/q/
`:/nz/2009.01.02/q/ set extr[q;`n`z]
`:/nz/2009.01.02/q/

`:/db/par.txt 0: ("/am"; "/nz")
`:/db/par.txt
\l /db
dr:2009.01.01 2009.01.02
select from t where date within dr
date       ti       sym p
-----------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:31:00 ibm 100.9
2009.01.02 09:30:00 t   17.1

select from q where date within dr
date       ti       sym p
-----------------------------
2009.01.01 09:29:59 ibm 100.5
2009.01.01 09:30:00 ibm 101
2009.01.01 09:29:59 t   17
2009.01.02 09:29:59 ibm 100.8
2009.01.02 09:29:59 t   17
2009.01.02 09:30:00 t   17.1
The naïve way to perform an asof join loads all requisite data at once.
aj[`date`sym`ti;select from t where date within dr; select from q where date within dr]
date       ti       sym p
-----------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:31:00 ibm 100.8
2009.01.02 09:30:00 t   17.1
Provided multiple data channels and cores are available to kdb+, we start kdb+ with multiple slaves and mimic its map-reduce strategy to take advantage of parallel data load against our segmentation.
>q -s 2
KDB+ ...
\l /db
aj1:{[d] aj[`sym`ti;select from t where date=d; select from q where date=d]}
raze aj1 peach 2009.01.01 2009.01.02
date       ti       sym p
-----------------------------
2009.01.01 09:30:00 ibm 101
2009.01.01 09:31:00 t   17
2009.01.02 09:31:00 ibm 100.8
2009.01.02 09:30:00 t   17.1
1.4.4 Query Execution on Segmented Tables (Advanced)
As you would expect, query execution against a segmented table involves additional complexity beyond partitioned table execution. The goal is to scale out by taking advantage of parallel I/O and concurrent processing. We would ideally like to achieve 100% saturation of the I/O channels and 100% utilization of each core. How do we approach these levels on a kdb+ server?
1.4.4.1 Preliminaries
The prime directive in kdb+ design and tuning is that a vector calculation on in-memory data is much faster than retrieving that data from storage. This suggests our first two objectives:
(1) Maximize the number of independent I/O channels to retrieve data in parallel.
(2) Maximize server memory in order to allocate each slave thread as much memory as it needs.
Suppose we have satisfied these two objectives and we have storage devices attached to the kdb+ server over n independent I/O channels. Because kdb+ can process segments in parallel, we are led to our next observation.
(3) Create n segments that spread the table across the channels in order to maximize I/O parallelization.
The precise form of segmentation will depend on the actual data and queries, but we ignore this detail in this discussion.
To ensure that data can be processed from all n channels simultaneously and that no two threads contend for data, we are led to:
(4) Have (at least) n slave threads—henceforth abbreviated slaves.
1.4.4.2 Execution over Segments
Here we describe how kdb+ executes a query against a segmented table in our scenario of n segments and n slaves. Essentially, it decomposes the query into two steps via map-reduce:
- map: a revised form of the original query that executes on each segment
- reduce: aggregate the segment results
Note: The use of map-reduce allows kdb+ to perform preliminary calculations as close to the data as possible and to perform aggregation centrally at the last step.
To begin, kdb+ compares the query’s requisite partitions to the segment layout in par.txt and determines the footprint of the target partitions on each segment. The result is a nested list, each item being the partition list for one segment.
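To illustrate the idea only (this is not kdb+'s internal code, and the names are ours), here is how such a footprint might be computed for a hypothetical two-segment layout:
segs:`:/seg1`:/seg2                                      / hypothetical segment roots from par.txt
segdates:(2009.01.01 2009.01.03;2009.01.02 2009.01.04)   / partitions found under each root
need:2009.01.02 2009.01.03 2009.01.04                    / partitions the query requires
segs!segdates inter\: need                               / per-segment partition footprint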
To execute the map step, kdb+ creates a revised query containing the map sub-operation from the original query, which it dispatches to all n slaves via peach. Each slave is provided the partition list for one segment and is directed to compute the revised query for its segment. For example, the revised query for avg is:
- compute the sum and count of the sublist
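The following sketch mimics that decomposition on plain in-memory lists (chunks, m and r are names of our own, standing in for per-partition tp columns and the map and reduce steps); it is illustrative only, not kdb+'s internal code:
chunks:(1 2 3f;4 5f;6 7 8 9f)          / stand-ins for per-partition tp columns
m:{(sum x;count x)} each chunks        / map: (sum;count) per chunk
r:{(sum x[;0])%sum x[;1]}              / reduce: total of sums over total of counts
r m                                    / 5f
avg raze chunks                        / 5f, the same answer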
Armed with this knowledge, we examine execution within one slave, where the revised query is applied against a segment’s partition footprint. Recalling §1.3.4, kdb+ sequentially applies the map sub-operations of the original query across the targeted partitions to obtain partition result tables that it collects into a list representing one segment result.
Now we stand back and examine execution across the slaves by squinting to make partition detail disappear. At this level, the original query’s map step has n slaves retrieving segment data in parallel and concurrently calculating segment results. Once all slaves complete, the nested list of segment results is flattened and reordered by partition value. Finally, kdb+ employs the original query reduce step to combine the full list of ordered partition results into the query result table. Whew!
Note: Kdb+ treats a vanilla partitioned table (i.e., without a par.txt) as having a single segment. The astute reader will realize that the description in §1.3.4 is actually the degenerate case of this section.
1.4.4.3 Slaves and Cores
Executing the query via map-reduce provides good progress toward our original objective of I/O saturation. The slaves can load data from all n channels in parallel without contention. We now investigate channel and core utilization.
Note: Kdb+ will only use as many slaves to process a query as there are segments in the query footprint.
I/O-bound scenario: Assuming that the query has light calculation compared to data retrieval (common in kdb+), having n slaves on n cores is close to optimal: most of the time, all n slaves will be waiting for data. When a partition load completes, there will be a brief burst of computation, followed by another long load. So we conclude:
- n channels => n segments => n slaves => n cores
Balanced I/O-compute scenario: Now we consider the case in which the query’s calculation is intensive. While one slave is waiting on data, another slave on its core could be crunching; conversely, while one slave is crunching another slave on its core could be loading data. Therefore, to maximize channel and core utilization, we actually want 2n slaves on n cores. In view of the above note, we conclude that we should have 2n segments, two per channel.
- n channels => 2n segments => 2n slaves => n cores
On average, there will be one slave per core loading data and one slave per core crunching the data it has just loaded.
These conclusions rely on many implicit assumptions that we have glossed over. In practice, you should view them as guidelines, with an eye towards feeding data to kdb+ as fast as possible. The optimum configuration for your situation will depend on your particular query mix. For example, queries that do VWAP calculations are closer to the first scenario, whereas queries doing regression analysis are closer to the second.
A good strategy is to construct your initial configuration using one of the above scenarios. Then load a close approximation of your anticipated data and query mix, and simulate a realistic user load. Observe the I/O saturation and CPU utilization and adjust the number of slaves and cores allocated to the q process accordingly.
1.4.5 Examples
Our examples are based on simulated trade data randomly generated to match one month of US equity data spanning August and September 2009. The data is in the table trade, whose most significant columns time, sym, tp and ts represent arrival time, instrument symbol, trade price and trade size, respectively.
The trade data is partitioned by date. The following output is generated from a version of kdb+ that is instrumented to show the intermediate steps during query execution.
The first example query shows how the simple aggregate avg tp is decomposed into the pair sum and count in the map step, followed by division of the sum of the sums by the sum of the counts in the reduce step. Here the data is not segmented--i.e., there is no par.txt file.
The added instrumentation exposes 3 stages of execution of a query:
- Analyze query:
    - Decompose query into map and reduce components (if appropriate)
    - Determine and apply partition constraints
    - Map query onto segments and partitions (query plan)
- Execute map step, if appropriate
- Compute final result (reduce step)
q)select avg tp from trade where date in -3#date
"--- map/reduce: input aggregates, map query, reduce query ---"
(,`tp)!,(avg;`tp)
`0`1!((sum;($["f"];`tp));(#:;`i))
(,`tp)!,(%;(sum;`0);(sum;`1))
"--- query plan: segments/partitions ---"
(`trade;();()!();`0`1!((sum;($["f"];`tp));(#:;`i)))
2009.09.24 2009.09.25 2009.09.28
"--- partition query: query, segment, partition, result ---"
(`trade;();()!();`0`1!((sum;($["f"];`tp));(#:;`i)))
`:.
2009.09.24
+`0`1!(,1.419538e+10;,27914938)
"--- partition query: query, segment, partition, result ---"
(`trade;();()!();`0`1!((sum;($["f"];`tp));(#:;`i)))
`:.
2009.09.25
+`0`1!(,1.419318e+10;,24485503)
"--- partition query: query, segment, partition, result ---"
(`trade;();()!();`0`1!((sum;($["f"];`tp));(#:;`i)))
`:.
2009.09.28
+`0`1!(,1.388645e+10;,20162485)
"--- query plan result ---"
(+`0`1!(,1.419538e+10;,27914938);+`0`1!(,1.419318e+10;,24485503);
 +`0`1!(,1.388645e+10;,20162485))
"--- final result ---"
tp
--------
582.5979
Note first that `0 and `1 are used as intermediate columns. Observe that avg tp is decomposed into a map step
`0:sum "f"$tp and `1:count i
and a reduce step
tp:(sum `0)%sum `1.
Also observe that expressions are displayed in their parse tree format and that count is expressed as its k equivalent: the monadic form of #.
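As a check, recombining the per-partition sums and counts shown above reproduces the reduce step (the displayed intermediate values are rounded, so the agreement is only to display precision):
(1.419538e+10+1.419318e+10+1.388645e+10)%27914938+24485503+20162485
/ approximately 582.5979, matching the reported tp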
The next example uses par.txt to segment the data into four segments: /d/d1/data, /d/d2/data, /d/d3/data and /d/d4/data. It also uses a by date clause, which eliminates the need to break the query into map and reduce components (our data is partitioned by date). For clarity, we execute the query only on the last 16 partitions and on a subset of symbols. Observe that the "query plan" contains more information than that of the previous example: the query, a list of all target partitions, a list of all segments, and a nested list of the partitions belonging to each segment.
q)select avg tp by date from trade where date in -16#date,sym in syms
"--- query plan: segments/partitions ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
2009.08.18 2009.08.19 2009.08.20 2009.08.21 2009.08.24 2009.08.25 2009.08.26
2009.08.27 2009.08.28 2009.08.31 2009.09.01 2009.09.02 2009.09.03 2009.09.04
2009.09.08 2009.09.09
`:/d/d1/data`:/d/d2/data`:/d/d3/data`:/d/d4/data
((`:/d/d1/data;2009.08.21 2009.08.25 2009.09.02);
 (`:/d/d2/data;2009.08.18 2009.08.26 2009.09.03);
 (`:/d/d3/data;2009.08.19 2009.08.27 2009.08.31 2009.09.04 2009.09.08);
 (`:/d/d4/data;2009.08.20 2009.08.24 2009.08.28 2009.09.01 2009.09.09))
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.08.21
+(,`tp)!,,15.42632
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.08.25
+(,`tp)!,,15.04996
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.09.02
+(,`tp)!,,14.16648
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d2/data
2009.08.18
+(,`tp)!,,14.16883
...
(some output removed for brevity)
...
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.09.08
+(,`tp)!,,15.59198
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.20
+(,`tp)!,,15.2657
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.24
+(,`tp)!,,14.75603
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.28
+(,`tp)!,,14.37194
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.09.01
+(,`tp)!,,13.25797
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.09.09
+(,`tp)!,,14.98316
"--- query plan result ---"
(+(,`tp)!,,14.16883;+(,`tp)!,,15.05272;+(,`tp)!,,15.2657;+(,`tp)!,,15.42632;
 +(,`tp)!,,14.75603;+(,`tp)!,,15.04996;+(,`tp)!,,15.69218;+(,`tp)!,,15.53095;
 +(,`tp)!,,14.37194;+(,`tp)!,,14.32488;+(,`tp)!,,13.25797;+(,`tp)!,,14.16648;
 +(,`tp)!,,15.58938;+(,`tp)!,,16.1427;+(,`tp)!,,15.59198;+(,`tp)!,,14.98316)
"--- final result ---"
date      | tp
----------| --------
2009.08.18| 14.16883
2009.08.19| 15.05272
2009.08.20| 15.2657
2009.08.21| 15.42632
2009.08.24| 14.75603
2009.08.25| 15.04996
2009.08.26| 15.69218
2009.08.27| 15.53095
2009.08.28| 14.37194
2009.08.31| 14.32488
2009.09.01| 13.25797
2009.09.02| 14.16648
2009.09.03| 15.58938
2009.09.04| 16.1427
2009.09.08| 15.59198
2009.09.09| 14.98316
The above example was run with no slaves--i.e., on a single-threaded q process. Consequently, the queries run sequentially across segments and partitions.
Now observe what happens when we match the number of slaves to the number of segments in our database by invoking q with -s 4.
q)select avg tp by date from trade where date in -16#date,sym in syms
"--- query plan: segments/partitions ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
2009.08.18 2009.08.19 2009.08.20 2009.08.21 2009.08.24 2009.08.25 2009.08.26
2009.08.27 2009.08.28 2009.08.31 2009.09.01 2009.09.02 2009.09.03 2009.09.04
2009.09.08 2009.09.09
`:/d/d1/data`:/d/d2/data`:/d/d3/data`:/d/d4/data
((`:/d/d1/data;2009.08.21 2009.08.25 2009.09.02);
 (`:/d/d2/data;2009.08.18 2009.08.26 2009.09.03);
 (`:/d/d3/data;2009.08.19 2009.08.27 2009.08.31 2009.09.04 2009.09.08);
 (`:/d/d4/data;2009.08.20 2009.08.24 2009.08.28 2009.09.01 2009.09.09))
"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.08.21
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.20
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.08.19
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d2/data
2009.08.18
+(,`tp)!,,15.55121
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.08.27
+(,`tp)!,,15.47055
+(,`tp)!,,15.21819
"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d2/data
2009.08.26
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.24
+(,`tp)!,,14.81711
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d1/data
2009.08.25
+(,`tp)!,,14.92875
"--- partition query: query, segment, partition, result ---"
+(,`tp)!,,16.07275
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.08.31
+(,`tp)!,,15.55499
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.08.28
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d2/data
2009.09.03
+(,`tp)!,,13.43061
+(,`tp)!,,15.29159
+(,`tp)!,,12.64993
"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
`:/d/d4/data
2009.09.04
2009.09.01
+(,`tp)!,,176.6311
`:/d/d1/data
2009.09.02
+(,`tp)!,,151.7784
+(,`tp)!,,13.67089
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d4/data
2009.09.09
+(,`tp)!,,179.799
"--- partition query: query, segment, partition, result ---"
(`trade;,(in;`sym;`syms);(`symbol$())!`symbol$();(,`tp)!,(avg;`tp))
`:/d/d3/data
2009.09.08
+(,`tp)!,,193.7031
+(,`tp)!,,48.75286
"--- query plan result ---"
(+(,`tp)!,,15.47055;+(,`tp)!,,15.55121;+(,`tp)!,,15.21819;+(,`tp)!,,14.81711;
 +(,`tp)!,,16.07275;+(,`tp)!,,15.29159;+(,`tp)!,,15.55499;+(,`tp)!,,14.92875;
 +(,`tp)!,,12.64993;+(,`tp)!,,13.43061;+(,`tp)!,,13.67089;+(,`tp)!,,151.7784;
 +(,`tp)!,,176.6311;+(,`tp)!,,179.799;+(,`tp)!,,193.7031;+(,`tp)!,,48.75286)
"--- final result ---"
date      | tp
----------| --------
2009.08.18| 15.47055
2009.08.19| 15.55121
2009.08.20| 15.21819
2009.08.21| 14.81711
2009.08.24| 16.07275
2009.08.25| 15.29159
2009.08.26| 15.55499
2009.08.27| 14.92875
2009.08.28| 12.64993
2009.08.31| 13.43061
2009.09.01| 13.67089
2009.09.02| 151.7784
2009.09.03| 176.6311
2009.09.04| 179.799
2009.09.08| 193.7031
2009.09.09| 48.75286
Now the output from the four slaves running concurrently is interleaved. One slave executes for each segment, with each slave executing the query sequentially across the partitions of its segment. An appropriately configured server can take advantage of available I/O bandwidth to speed up query execution using segments and slaves.