I created the timescale gem and wrote an introductory post on how to use it with Ruby.

Now it’s time to learn more about the continuous aggregates feature. According to the Timescale website:

Continuous aggregates are designed to make queries on very large datasets run faster. TimescaleDB continuous aggregates use PostgreSQL materialized views to continuously and incrementally refresh a query in the background, so that when you run the query, only the data that has changed needs to be computed, not the entire dataset.

Continuous aggregates are a core feature of TimescaleDB. If you’re already using TimescaleDB, you’ll probably find an excellent opportunity to use them.

We’re going to use the time_bucket function explored in the previous post, this time applying it to the candlestick pattern.

Candlesticks are graphical representations of price movements for a given period. They are commonly formed by a financial instrument’s opening, high, low, and closing prices.
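
To make the definition concrete, here is a minimal sketch in plain Ruby that reduces the prices seen during one period to a candlestick. The candlestick helper is hypothetical and not part of the timescale gem; it assumes the prices arrive in chronological order.

```ruby
# Hypothetical helper, not part of the timescale gem: reduce the prices
# observed during one period, in chronological order, to a candlestick.
def candlestick(prices)
  {
    open: prices.first,  # first price of the period
    high: prices.max,    # highest price of the period
    low: prices.min,     # lowest price of the period
    close: prices.last   # last price of the period
  }
end

candle = candlestick([83.0, 83.05, 82.98, 83.02])
puts candle.inspect
```

The rest of this post computes the same four values in the database, one candlestick per time bucket.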

Migration for the hypertable creation

Let’s continue with a minimal migration system to prove the concept before we jump into a more advanced migration scenario. In this example, we’ll start with creating the ticks table representing stock market events and then group the events by minute to show the candlestick pattern.

ActiveRecord::Base.connection.instance_exec do
  # Start from scratch; cascade also drops views that depend on the table.
  drop_table :ticks, if_exists: true, force: :cascade

  hypertable_options = {
    time_column: 'time',
    chunk_time_interval: '1 day',
    compress_segmentby: 'symbol',
    compress_orderby: 'time',
    compression_interval: '7 days'
  }

  create_table :ticks, hypertable: hypertable_options, id: false do |t|
    t.timestamp :time
    t.string :symbol
    t.decimal :price
    t.integer :volume
  end
end

The previous code only creates the hypertable; the continuous aggregate steps are covered later.

Note that the drop_table statement uses force: :cascade so that it also destroys the dependent view if it exists. This example is for testing purposes only, as it drops and recreates the table every time you run it.

ActiveRecord model for the hypertable

And here is the ActiveRecord model simplified for the example.

class Tick < ActiveRecord::Base
  self.table_name = 'ticks'
  self.primary_key = nil

  acts_as_hypertable time_column: 'time'
end

The only option we need to set for now is time_column, but you can override any option described in the official documentation.

Generating data

In this example, we’re not going to connect to a market data stream. Instead, we’ll generate some fake data just to understand how to use the feature.

Let’s create fake data for the FAANG stocks.

Let’s define some helper variables that can help to generate the data:

FAANG = %w[META AMZN AAPL NFLX GOOG]
OPERATION = [:+, :-]
RAND_VOLUME = -> { (rand(10) * rand(10)) * 100 }

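Clarifying step by step:

  1. FAANG holds the stock symbols we generate ticks for.
  2. OPERATION holds the two operators used to move a price randomly up or down.
  3. RAND_VOLUME is a lambda that returns a random volume in multiples of 100.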
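
As a small illustration of how these helpers behave on their own, the sketch below applies one random step to a price and draws one volume. The starting price of 100.0 and the step of 0.05 are arbitrary values picked for the example.

```ruby
OPERATION = [:+, :-]
RAND_VOLUME = -> { (rand(10) * rand(10)) * 100 }

# `send` calls the method named by the symbol, so this is 100.0 + 0.05
# or 100.0 - 0.05, depending on which operator `sample` picked.
next_price = 100.0.send(OPERATION.sample, 0.05).round(2)

# rand(10) returns an integer from 0 to 9, so the volume is a multiple
# of 100 between 0 and 8100 (9 * 9 * 100).
volume = RAND_VOLUME.call

puts "price: #{next_price}, volume: #{volume}"
```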

Now defining the fake data:

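def generate_fake_data(total: 100)
  previous_price = {}
  time = Time.now
  (total / FAANG.size).times.flat_map do
    # Advance the clock by 0 to 9 seconds per round of symbols.
    time += rand(10)
    FAANG.map do |symbol|
      if previous_price[symbol]
        # Move the previous price up or down by at most 0.09.
        price = previous_price[symbol].send(OPERATION.sample, rand(10) / 100.0).round(2)
      else
        # First tick of a symbol: start between 50 and 149.
        price = 50 + rand(100)
      end
      payload = { time: time, symbol: symbol, price: price, volume: RAND_VOLUME.() }
      previous_price[symbol] = price
      payload
    end
  end
end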

It generates an array of payloads ready to be inserted.

The last step in generating the data is combining the generated data with the insert command. Again, we can go with insert_all, which persists the whole batch in a single INSERT statement and skips model validations and callbacks.

batch = generate_fake_data total: 50
ActiveRecord::Base.logger = nil
Tick.insert_all(batch, returning: false)
ActiveRecord::Base.logger = Logger.new(STDOUT)

To confirm the data looks realistic, take a look at the generated prices:

FAANG.inject({}) do |h, s|
  h[s] = Tick.where(symbol: s).order(:time).pluck(:price).map(&:to_f)
  h
end

The results will be a hash like this:

{"META"=>[142.0, 141.98, 141.96, 141.98, 141.98, 141.98, 141.96, 141.96, 141.97, 141.97],
 "AMZN"=>[103.0, 103.0, 103.01, 103.0, 103.01, 103.0, 103.0, 103.01, 103.03, 103.05],
 "AAPL"=>[82.0, 81.99, 81.98, 81.96, 81.96, 81.96, 81.97, 81.99, 81.97, 81.97],
 "NFLX"=>[60.0, 59.99, 60.0, 59.99, 59.99, 60.01, 60.0, 60.02, 60.02, 60.02],
 "GOOG"=>[148.0, 147.99, 147.98, 148.0, 148.01, 148.02, 148.03, 148.02, 148.02, 148.01]}
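
One way to quantify how much the prices move is to measure the largest jump between consecutive ticks. The sketch below does that in plain Ruby over three of the series from the hash above, copied in by hand; the max_step name is just for this example.

```ruby
# Prices copied from the sample output above (first three symbols only).
prices = {
  "META" => [142.0, 141.98, 141.96, 141.98, 141.98, 141.98, 141.96, 141.96, 141.97, 141.97],
  "AMZN" => [103.0, 103.0, 103.01, 103.0, 103.01, 103.0, 103.0, 103.01, 103.03, 103.05],
  "AAPL" => [82.0, 81.99, 81.98, 81.96, 81.96, 81.96, 81.97, 81.99, 81.97, 81.97]
}

# Largest absolute move between two consecutive ticks, per symbol.
max_step = prices.transform_values do |series|
  series.each_cons(2).map { |a, b| (b - a).abs.round(2) }.max
end

puts max_step.inspect
```

With a step of rand(10) / 100.0, no move should ever exceed 0.09, so a larger value here would point to a problem in the generator.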

The data varies very slowly, as expected. Now, feel free to change the fake data generation to cover a longer period. Generating 10k ticks covers a few hours.

batch = generate_fake_data total: 10_000

Querying the candlestick

Now it’s time to focus on the Tick model again and add an ohlc scope that returns the candlesticks from ActiveRecord. As it returns attributes that the model doesn’t know about, let’s declare them so the values are easier to read later.

class Tick < ActiveRecord::Base
  # skipping previous code for readability
  %w[open high low close].each { |name| attribute name, :decimal }

  scope :ohlc, -> (timeframe = '1m') do
    select("time_bucket('#{timeframe}', time) as time,
      symbol,
      FIRST(price, time) as open,
      MAX(price) as high,
      MIN(price) as low,
      LAST(price, time) as close,
      SUM(volume) as volume").group("1,2")
  end
end
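
To see what the query computes without a database, here is a rough in-memory equivalent in plain Ruby. It is only an illustration: the ohlc method below is a hypothetical stand-in that assumes each tick is a hash with :time, :symbol, :price and :volume, and it floors times to fixed-width buckets the way time_bucket does for a one-minute interval.

```ruby
# Hypothetical in-memory version of the ohlc query, for illustration only.
# Assumes each tick is a hash with :time, :symbol, :price and :volume.
def ohlc(ticks, bucket_seconds = 60)
  # Same role as time_bucket + GROUP BY: floor the time to the start of
  # its bucket and group by [bucket, symbol].
  groups = ticks.group_by do |t|
    [Time.at(t[:time].to_i / bucket_seconds * bucket_seconds).utc, t[:symbol]]
  end

  groups.map do |(bucket, symbol), rows|
    rows = rows.sort_by { |t| t[:time] }
    prices = rows.map { |t| t[:price] }
    { time: bucket, symbol: symbol,
      open: prices.first,                     # FIRST(price, time)
      high: prices.max,                       # MAX(price)
      low: prices.min,                        # MIN(price)
      close: prices.last,                     # LAST(price, time)
      volume: rows.sum { |t| t[:volume] } }   # SUM(volume)
  end
end

start = Time.utc(2022, 9, 20, 12, 0, 0)
ticks = [
  { time: start + 5,  symbol: "GOOG", price: 83.0,  volume: 100 },
  { time: start + 30, symbol: "GOOG", price: 83.05, volume: 200 },
  { time: start + 65, symbol: "GOOG", price: 83.02, volume: 300 }
]
candles = ohlc(ticks)
candles.each { |c| puts c.inspect }
```

The first two ticks fall into the 12:00 bucket and the third into the 12:01 bucket, so the result has two candlesticks for GOOG.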

Testing the data:

Tick.where(symbol: "GOOG").ohlc
[#<Tick:0x00007f9d62fdbb58 time: 2022-09-20 12:00:00 UTC, symbol: "GOOG", volume: 18600, open: 0.83e2, high: 0.8305e2, low: 0.8298e2, close: 0.8305e2>,
 #<Tick:0x00007f9d62fdba90 time: 2022-09-20 12:01:00 UTC, symbol: "GOOG", volume: 6000, open: 0.8305e2, high: 0.8307e2, low: 0.8305e2, close: 0.8306e2>,
 #<Tick:0x00007f9d62fdb9c8 time: 2022-09-20 12:02:00 UTC, symbol: "GOOG", volume: 55400, open: 0.8307e2, high: 0.8314e2, low: 0.8307e2, close: 0.8313e2>,
 #<Tick:0x00007f9d62fdb900 time: 2022-09-20 12:03:00 UTC, symbol: "GOOG", volume: 29300, open: 0.8315e2, high: 0.832e2, low: 0.8315e2, close: 0.832e2>,
 ... # more records here ]

Now testing with one-hour intervals:

Tick.order('time').where(symbol: "GOOG").ohlc('1h')
# => [#<Tick:0x00007f9d62f2ae98 time: 2022-09-20 12:00:00 UTC, symbol: "GOOG", volume: 1617800, open: 0.83e2, high: 0.8325e2, low: 0.829e2, close: 0.83e2>,
#   #<Tick:0x00007f9d62f2add0 time: 2022-09-20 13:00:00 UTC, symbol: "GOOG", volume: 1634600, open: 0.83e2, high: 0.832e2, low: 0.8261e2, close: 0.8261e2>,
#   #<Tick:0x00007f9d62f2ad08 time: 2022-09-20 14:00:00 UTC, symbol: "GOOG", volume: 783300, open: 0.826e2, high: 0.826e2, low: 0.8231e2, close: 0.8249e2>]

It seems like everything is working as expected, and the last step is finally to create the continuous aggregates 🎉

Creating the continuous aggregates

Every time the ohlc scope is invoked, it queries the raw data from the ticks table and groups it again. So let’s create a materialized view that does the same for us and continuously aggregates the values.

Continuous aggregates persist the data from closed buckets and only compute the data for the candlesticks that are still open. For example, if you’re processing buckets of one hour, it materializes the data from previous hours and only computes the data from the current hour.
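
The idea can be pictured with a toy model in plain Ruby. This is not how TimescaleDB is implemented; the MinuteVolume class and its methods are made up for the illustration. It stores one total per closed one-minute bucket and aggregates only the rows that were not materialized yet at read time.

```ruby
# Toy model, not TimescaleDB internals: minute totals are stored for closed
# buckets, and only rows that are still pending are aggregated on read.
class MinuteVolume
  def initialize
    @materialized = {}  # bucket start (epoch seconds) => total volume
    @pending = []       # raw [epoch_seconds, volume] rows not materialized yet
  end

  def insert(epoch_seconds, volume)
    @pending << [epoch_seconds, volume]
  end

  # Plays the role of the background refresh: persist every bucket that
  # ended before `now` and keep only the still-open rows as raw data.
  def refresh(now)
    open_bucket = now / 60 * 60
    closed, @pending = @pending.partition { |time, _| time < open_bucket }
    closed.each do |time, volume|
      bucket = time / 60 * 60
      @materialized[bucket] = @materialized.fetch(bucket, 0) + volume
    end
  end

  # Reading combines the stored buckets with an on-the-fly aggregate of
  # the pending rows.
  def totals
    @pending.each_with_object(@materialized.dup) do |(time, volume), acc|
      bucket = time / 60 * 60
      acc[bucket] = acc.fetch(bucket, 0) + volume
    end
  end
end

view = MinuteVolume.new
view.insert(0, 100)
view.insert(59, 200)
view.insert(61, 50)
view.refresh(90)   # bucket 0 is closed, bucket 60 is still open
view.insert(75, 25)
puts view.totals.inspect
```

After the refresh, the total for bucket 0 is read from storage, while the total for bucket 60 is computed from the two pending rows.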

ActiveRecord::Base.connection.instance_exec do
  create_continuous_aggregates('ohlc_1m', Tick.ohlc('1m'), with_data: true)
end

The create_continuous_aggregates method is available in the connection scope, the same scope Rails migrations run in. In the background, the following query is executed:

CREATE MATERIALIZED VIEW ohlc_1m
WITH (timescaledb.continuous) AS
SELECT time_bucket('1m', time) as time,
      symbol,
      FIRST(price, time) as open,
      MAX(price) as high,
      MIN(price) as low,
      LAST(price, time) as close,
      SUM(volume) as volume FROM "ticks" GROUP BY 1,2
WITH DATA;

The last parameter, with_data, automatically processes the data already present. If it’s false, refresh_continous_aggregates can do the job later. If you have a lot of data to process and you’re adding this to a Rails migration, it may be a good idea to process it in a background job so it doesn’t block your deployment.
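
As a rough sketch of the background-job idea, the class below builds a call to TimescaleDB’s refresh_continuous_aggregate SQL procedure for a given time window. RefreshOhlcJob and RecordingConnection are hypothetical names, and the job is written as a plain Ruby class so it runs without Rails; in an application it could be an ActiveJob that receives ActiveRecord::Base.connection. It deliberately uses raw SQL rather than the gem helper mentioned above.

```ruby
# Hypothetical job, sketched as a plain Ruby class so it runs without Rails.
class RefreshOhlcJob
  TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

  # `connection` is anything that responds to `execute(sql)`, such as
  # ActiveRecord::Base.connection.
  def perform(connection, from:, to:)
    connection.execute(refresh_sql(from, to))
  end

  # Builds the call to TimescaleDB's refresh procedure for the window.
  def refresh_sql(from, to)
    format("CALL refresh_continuous_aggregate('ohlc_1m', '%s', '%s')",
           from.getutc.strftime(TIME_FORMAT), to.getutc.strftime(TIME_FORMAT))
  end
end

# Stand-in connection that records the SQL instead of running it.
class RecordingConnection
  attr_reader :statements

  def initialize
    @statements = []
  end

  def execute(sql)
    @statements << sql
  end
end

conn = RecordingConnection.new
RefreshOhlcJob.new.perform(conn, from: Time.utc(2022, 9, 20), to: Time.utc(2022, 9, 21))
puts conn.statements.first
```

Refreshing one bounded window per job keeps each run short, which matters when the hypertable already holds a lot of history.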

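To query the view from ActiveRecord, declare a read-only model for it:

class Ohlc1m < ActiveRecord::Base
  self.table_name = 'ohlc_1m'
  attribute :time, :datetime
  attribute :symbol, :string
  %w[open high low close volume].each { |name| attribute name, :decimal }

  # The view is maintained by TimescaleDB, so records must not be written.
  def readonly?
    true
  end
end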

Comparing performance here is unfair as we don’t have enough data to see much difference. Still, testing with 10k ticks, querying the pre-processed data is already almost four times faster:

Tick.where(symbol:"AAPL").ohlc('1m') # Tick Load (13.8ms) 
Ohlc1m.where(symbol:"AAPL").all      # Ohlc1m Load (3.6ms)

Scenic support

If you’re using scenic views, continuous aggregates work smoothly too. The scenic gem doesn’t support the WITH clause in views, so the Timescale gem adds this support: it dumps the views with the WITH (timescaledb.continuous) clause, which the official gem skips.

Extra resources

Download the code from this tutorial here and test it yourself!

Here are a few extra resources that can be useful if you’re hacking TimescaleDB with Ruby.

  1. Using TimescaleDB gem with Ruby
  2. Official Gem documentation
  3. Timescale Gem GitHub project
  4. Official examples from source code

Also, there is a fantastic video from Ryan Booz that dives deeper into Continuous Aggregates and how it compares to PostgreSQL materialized views.


Hello there, my name is Jônatas Davi Paganini and this is my personal blog.
I'm a developer advocate at Timescale and I also have a few open source projects on GitHub.
