Rollup And Pre-Aggregates

While TSDB is designed to store original, full resolution data as long as there is space, queries for wide time ranges or over many tag combinations can be quite painful. Such queries can take a long time to complete or, in the worst case, kill TSDs with out-of-memory exceptions. As of OpenTSDB 2.4, a set of new APIs allow for storing and querying lower resolution data to answer such queries much quicker. This page will give you an overview of what rollups and pre-aggregates are, how they work in TSDB and how best to use them. Look in the API's section for specific implementation details.

Note

OpenTSDB does not itself calculate and store rollup or pre-aggregated data. There are multiple ways to compute the results but they all have benefits and drawbacks depending on the scale and accuracy requirements. See the Generating Rollups and Pre-Aggregates section discussing how to create this data.

Example Data

To help describe the lower resolution data, lets look at some full resolution (also known as raw data) example data. The first table defines the time series with a short-cut identifier.

Series ID Metric Tag 1 Tag 2 Tag 3
ts1 system.if.bytes.out host=web01 colo=lga interface=eth0
ts2 system.if.bytes.out host=web02 colo=lga interface=eth0
ts3 system.if.bytes.out host=web03 colo=sjc interface=eth0
ts4 system.if.bytes.out host=web04 colo=sjc interface=eth0

Notice that they all have the same metric and interface tag, but different host and colo tags.

Next for some data written at 15 minute intervals:

Series ID 12:00 12:15 12:30 12:45 13:00 13:15 13:30 13:45
ts1 1 4 -3 8 2 -4 5 2
ts2 7 2 8 -9 4   1 1
ts3 9 3 -2 -1 6 3 8 2
ts4   2 5 2 8 5 -4 7

Notice that some data points are missing. With those data sets, lets look at rollups first.

Rollups

A "rollup" is defined, in OpenTSDB, as a single time series aggregated over time. It may also be called a "time-based aggregation". Rollups help to solve the problem of looking at wide time spans. For example, if you write a data point every 60 seconds and query for one year of data, a time series would return more than 525k individual data points. Graphing that many points could be pretty messy. Instead you may want to look at lower resolution data, say 1 hour data where you only have around 8k values to plot. Then you can identify anomalies and drill down for finer resolution data.

If you have already used OpenTSDB to query data, you are likely familiar with downsamplers that aggregate each time series into a smaller, or lower resolution, value. A rollup is essentially the result of a downsampler stored in the system and called up at will. Each rollup (or downsampler) requires two pieces of information:

  • Interval - How much time is "rolled" up into the new value. For example, 1h for one hour of data or 1d for a day of data.
  • Aggregation Function - What arithmetic was performed on the underlying values to arrive at the new value. E.g. sum to add all of the values or max to store the largest.

Warning

When storing rollups, it's best to avoid functions such as average, median or deviation. When performing further downsampling or grouping aggregations, such values become meaningless. Instead it's much better to always store the sum and count from which, at least, the average can be computed at query time. For more information, see the section below.

The timestamp of a rolled-up data point should snap to the top of the rollup interval. E.g. if the rollup interval is 1h then it contains 1 hour of data and should snap to the top of the hour. (As all timestamps are written in Unix Epoch format, defined as the UTC timezone, this would be the start of an hour UTC time).

Rollup Example

Given the series above, lets store the sum and count with an interval of 1h.

Series ID 12:00 13:00
ts1 SUM 10 5
ts1 COUNT 4 4
ts2 SUM 8 6
ts2 COUNT 4 3
ts3 SUM 9 19
ts3 COUNT 4 4
ts4 SUM 9 16
ts4 COUNT 3 4

Notice that all timestamps align to the top of the hour regardless of when the first data point in the interval "bucket" appears. Also notice that if a data point is not present for an interval, the count is lower.

In general, you should aim to compute and store the MAX, MIN, SUM and COUNT for each time series when storing rollups.

Averaging Rollup Example

When rollups are enabled and you request a downsampler with the avg function from OpenTSDB, the TSD will scan storage for SUM and COUNT values. Then while iterating over the data it will accurately compute the average.

The timestamps for count and sum values must match. However, if the expected count value for a sum is missing, the sum will be kicked out of the results. Take the following example set from above where we're now missing a count data point in ts2.

Series ID 12:00 13:00
ts1 SUM 10 5
ts1 COUNT 4 4
ts2 SUM 8 6
ts2 COUNT 4  

The resulting avg for a 2h downsampling query would look like this:

Series ID 12:00
ts1 AVG 1.875
ts2 AVG 2

Pre-Aggregates

While rollups help with wide time span queries, you can still run into query performance issues with small ranges if the metric has high cardinality (i.e. the unique number of time series for the given metric). In the example above, we have 4 web servers. But lets say that we have 10,000 servers. Fetching the sum or average of interface traffic may be fairly slow. If users are often fetching the group by (or some think of it as the spatial aggregate) of large sets like this then it makes sense to store the aggregate and query that instead, fetching much less data.

Unlike rollups, pre-aggregates require only one extra piece of information:

  • Aggregation Function - What arithmetic was performed on the underlying values to arrive at the new value. E.g. sum to add all of the time series or max to store the largest.

In OpenTSDB, pre-aggregates are differentiated from other time series with a special tag. The default tag key is _aggregate (configurable via tsd.rollups.agg_tag_key). The aggregation function used to generate the data is then stored in the tag value in upper-case. Lets look at an example:

Pre-Aggregate Example

Given the example set at the top, we may want to look at the total interface traffic by colo (data center). In that case, we can aggregate by SUM and COUNT similarly to the rollups. The result would be four new time series with meta data like:

Series ID Metric Tag 1 Tag 2
ts1' system.if.bytes.out colo=lga _aggregate=SUM
ts2' system.if.bytes.out colo=lga _aggregate=COUNT
ts3' system.if.bytes.out colo=sjc _aggregate=SUM
ts4' system.if.bytes.out colo=sjc _aggregate=SUM

Notice that these time series have dropped the tags for host and interface. That's because, during aggregation, multiple, different values of the host and interface have been wrapped up into this new series so it no longer makes sense to have them as tags. Also note that we injected the new _aggregate tag in the stored data. Queries can now access this data by specifying an _aggregate value.

Note

With rollups enabled, if you plan to use pre-aggregates, you may want to help differentiate raw data from pre-aggregates by having TSDB automatically inject _aggregate=RAW. Just configure the tsd.rollups.tag_raw setting to true.

Now for the resulting data:

Series ID 12:00 12:15 12:30 12:45 13:00 13:15 13:30 13:45
ts1' 8 6 5 -1 6 -4 6 3
ts2' 2 2 2 2 2 1 2 2
ts3' 9 5 3 1 14 8 4 9
ts4' 1 2 2 2 2 2 2 2

Since we're performing a group by aggregation (grouping by colo) we have a value for each timestamp from the original data set. We are not downsampling or performing a rollup in this situation.

Warning

As with rollups, when writing pre-aggregates, it's best to avoid functions such as average, median or deviation. Just store sum and count

Rolled-up Pre-Aggregates

While pre-aggregates certainly help with high-cardinality metrics, users may still want to ask for wide time spans but run into slow queries. Thankfully you can roll up a pre-aggregate in the same way as raw data. Just generate the pre-aggregate, then roll it up using the information above.

Generating Rollups and Pre-Aggregates

Currently the TSDs do not generate the rollup or pre-aggregated data for you. The primary reason for this is that OpenTSDB is meant to handle huge amounts of time series data so individual TSDs are focused on getting their data into storage as quickly as possible.

Problems

Because of the (essentially) stateless nature of the TSDs, they likely won't have the full set of data available to perform pre-aggregates. E.g., our sample ts1 data may be written to TSD_A while ts2 is written to TSD_B. Neither can perform a proper group-by without reading the data out of storage. We also don't know at what time we should perform the pre-aggregation. We could wait for 1 minute and pre-aggregate the data but miss anything that came in after that minute. Or we could wait an hour and queries over the pre-aggregates won't have data for the last hour. And what happens if data comes in much later?

Additionally for rollups, depending on how users write data to TSDs, for ts1, we may receive the 12:15 data point on TSD_A but the 12:30 value arrives on TSD_B so neither has the data required for the full hour. Time windowing constraints also apply to rollups.

Solutions

Using rollups and pre-aggregates require some analysis and a choice between various trade-offs. Since some OpenTSDB users already have means in place for calculating this kind of data, we simply provide the API to store and query. However here are some tips on how to compute these on your own.

Batch Processing

One method that is commonly used by other time series databases is to read the data out of the database after some delay, calculate the pre-aggs and rollups, then write them. This is the easiest way of solving the problem and works well at small scales. However there are still a number of issues:

  • As data grows, queries for generating the rollups grow as well to the point where the query load impacts write and user query performance. OpenTSDB runs into this same problem when data compactions are enabled under HBase.
  • Also as data grows, more data means the batch processing time takes longer and must be sharded across multiple workers which can be a pain to coordinate and troubleshoot.
  • Late or historical data may not be rolled up unless some means of tracking is in place to trigger a new batch on old data.

Some methods of improving batch processing include:

  • Reading from replicated systems, e.g. if you setup HBase replication, you could have users query the master system and aggregations read from the replicated store.
  • Read from alternate stores. One example is to mirror all data to another store such as HDFS and run batch jobs against that data.

Queueing on TSDs

Another option that some databases use is to queue all of the data in memory in the process and write the results after a configured time window has passed. But because TSDs are stateless and generally users put a load balancer in front of their TSDs, a single TSD may not get the full picture of the rollup or pre-agg to be calculated (as we mentioned above). For this method to work, upstream collectors would have to route all of the data required for a calculation to a specific TSD. It's not a difficult task but the problems faced include:

  • Having enough RAM or disk space to spool the data locally on for each TSD.
  • If a TSD process dies, you'll either loose the data for the aggregation or it must be bootstrapped from storage.
  • Whenever the aggregation calculations are taking place, overall write throughput of the raw data can be affected.
  • You still have the late/historical data issue.
  • Since TSDB is JVM based, keeping all of that data in RAM and then running GC will hurt. A lot. (spooling to disk is better, but then you'll hit IO issues)

In general, queueing on a writer is a bad idea. Avoid the pain.

Stream Processing

A better way of dealing with rollups and pre-aggregates is to route the data into a stream processing system where it can be processed in near-real-time and written to the TSDs. It's similar to the Queuing on TSDs option but using one of the myriad stream processing frameworks (Storm, Flink, Spark, etc.) to handle message routing and in-memory storage. Then you simply write some code to compute the aggregates and spit the data out after a window has passed.

This is the solution used by many next-generation monitoring solutions such as that at Yahoo!. Yahoo is working to open source their stream processing system for others who need monitoring at massive scales and it plugs neatly into TSDB.

While stream processing is better you still have problems to deal with such as:

  • Enough resources for the stream workers to do their job.
  • A dead stream worker requires bootstrapping from storage.
  • Late/historical data must be handled.

Share

If you have working code for calculating aggregations, please share with the OpenTSDB group. If your solution is open-source we may be able to incorporate it in the OpenTSDB ecosystem.