A Practical Comparison of TimescaleDB and InfluxDB

14 days ago

Most finance, stock market, and signal analysis software projects deal with the time-series domain extensively. There is a lot of specialized time-series databases that make an engineer's life easier when dealing with data whose primary feature is being timestamped. If you haven't ever dealt with these, trying to decide which solution out of many to choose can be daunting.

When I had this problem choosing a time-series database for my algorithmic trader project, I hated to see all the effort go to waste (yeah, I'm really good at doing that donkey with two haystacks thing) and decided to document the process in this post. This should be helpful for you if you're just starting with specialized time series databases and would like input from people who had the same problem.

Why and What

Let's define the use case so that you can compare it to yours and decide if the results are going to be useful for you.

I need to be able to use the database with an asynchronous Python backend, be able to deploy a containerized version to a VM running Dokku, write many datapoints from multiple clients and select data by datetime intervals and various custom tags (for example a session ID that the data belongs to), or currency tags since I will be storing datapoints like stock exchange trades.

Considering the above, my database shortlist boiled down to TimescaleDB vs InfluxDB, both excellent products with wide adoption and corporate backing.

This is by no means an expert-level tutoring material for someone who wants comprehensive insight into TimescaleDB or InfluxDB. It's more of an insight into a thought process of how you eyeball a solution without any previous experience with the tech behind it, and probably a starting point of sorts if you want to get going with a proper time-series database for an early-stage project and don't have a lot of time to spend analyzing all the alternatives.

Data Generation

The source of my data is a generator function that produces "ticks". A tick is a data point representing a trade of a financial asset that happened at some point in time and that has some data associated to it, usually the price at which the trade was executed and the volume of the asset. For example, if someone bought two bitcoins at 6500 US dollars per bitcoin at January 1 05:00 2020, the asset of this rather hipster trade would be BTCUSD, the volume would be 2, and the price point 6500. Such data is a typical return value of many live APIs that provide access to various markets worldwide. My generator for these ticks looks like this:

def generate_data(start_time: datetime) -> Generator[dict, None, None]:
    initial_price = 10.0
    initial_timestamp = start_time.timestamp()
    time_increment = 0
    skip_points_between = (initial_timestamp + 7 * 60, initial_timestamp + 20 * 60)
    volume = 0.4
    while True:
        price = initial_price + round(math.sin(time_increment / 1000) * 3, 2)
        current_timestamp = initial_timestamp + time_increment
        time_increment += 0.1
        if current_timestamp > skip_points_between[0] and current_timestamp < skip_points_between[1]:
            continue
        raw_point = dict(
            session_id=123,
            timestamp=current_timestamp,
            price=price,
            volume=volume,
            funds=price * volume,
            label="BTCUSD"
        )
        yield raw_point
utils.py

Let's point out some important properties of the datapoints yielded by this generator:

Part 1: Install and Deployment

It's no use trying out a system that you can't deploy, so let's start with how to connect a DB container to an app in production context.

Deployment: TimescaleDB

TimescaleDB is deployed to Dokku just like any Postgres database: with the dokku-postgres plugin. There are two tweaks to be made. First is specifying a custom image that contains TimescaleDB:

export POSTGRES_IMAGE="timescale/timescaledb"
export POSTGRES_IMAGE_VERSION="latest-pg12"
dokku postgres:create testdb
Actually executable TimescaleDB install script

The second tweak is fixing a certificate issue mentioned on Github. You need to copy certificates from another, standard Postgres container created with dokku postgres:create as described in the issue link above. After this, you can use the DSN string to connect to the database as per standard Dokku Postgres setup process.

Deployment: InfluxDB

There is no actively developed plugin for InfluxDB for Dokku. What I did was to use a ready-made image for InfluxDB 2.0 and deploy it as a separate Dokku application. Then, Docker options can be specified for Dokku to set up a docker network link for any app that wants to talk to the influxDB container.

One way to do this is to use the --link Docker option that will pass a link argument whenever the service named (this is basically what Dokku service plugins like dokku-postgres do):

dokku apps:create influxdb-test
sudo docker pull quay.io/influxdb/influxdb:2.0.0-beta
sudo docker tag quay.io/influxdb/influxdb:2.0.0-beta dokku/influxdb-test
dokku tags:deploy influxdb-test latest
dokku docker-options:add testapp run "--link  testdb:testdb"
Example, not actually executable

Since linking containers is a deprecated flag in Docker, another way to do this is using bridge networks, a Dokku command set updated in version 0.20+, to connect containers to a separate network along the lines of:

dokku apps:create influxdb-test
dokku apps:create app-test
...
dokku network:create influxdb-bridge
dokku network:set influxdb-test attach-post-deploy influxdb-bridge
dokku network:set app-test attach-post-deploy influxdb-bridge

# Influxdb should be accessible via http://influxdb-test.web:5000
Not tested, meant for your own further research

Deployment: Results

I didn't really like the TimescaleDB certificates hack; someone researching this from scratch might lose a lot of time figuring out why their TimescaleDB setup doesn't work before discovering the issue. Having a ready-to-go Dokku plugin for TimescaleDB beats the need to deploy and setup a separate application for InfluxDB, but this is a point for TimescaleDB only if you're using something in the ballpark of Dokku. Production deployment category ends in a tie for these two.

Part 2: Local Development

I'musing docker-compose to run local development. Service definitions turn out to be simple for both contenders:

  influxdb:
    image: quay.io/influxdb/influxdb:2.0.0-beta
    ports:
      - 9999:9999
    command: influxd run --bolt-path /var/lib/influxdb2/influxd.bolt --engine-path /var/lib/influxdb2/engine --store bolt
    volumes:
      - influxdb:/var/lib/influxdb2
For InfluxDB
  timescaledb:
    image: timescale/timescaledb:latest-pg12
    environment:
      - POSTGRES_PASSWORD=postgres
    ports:
      - 5433:5432
    volumes:
      - timescaledb-data:/var/lib/postgresql/data
For TimescaleDB

Getting information on having InfluxDB to persist data via Docker volumes was a bit tricky and included for posterity; see Community link. For TimescalDB this is trivial as it's an extension over Postgres.

Local Development: Results

This is a clean-cut and expected TimescalDB win in my book because it needed much less time to figure out, mostly simply due to being Postgres based.

Part 3: Writing Many Datapoints

Both databases contain extensive tooling for batch writes. Batch writing is a complex and interesting use case for a multitude of online and offline workflows like ETL (extract-transform-load), with a lot of stuff to go wrong and lots on configuration to get right. What I have in mind as a goal of this comparison isn't very conducting to batch writes since I want data to be available immediately upon acquisition from a third-party API, and unpredictable network waits can negate any speed boosts from writing data in bulk. Therefore, I'm going to only focus on getting many singular write events to work concurrently and on developer experience, i.e. how much effort it is to get a working solution.

Writing Many Datapoints: TimescaleDB

Getting writes to work with TimescaleDB consists of two steps, with the Python quickstart as primary source:

Creating a hypertable that contains tick datapoints goes like this:

async def test_timescaledb():
    CONNECTION = "postgres://postgres:postgres@timescaledb:5432/template1"
    conn = await asyncpg.connect(CONNECTION)
    create_db = "CREATE DATABASE testdb;"
    try:
        await conn.execute(create_db)
        print('Database created')
    except asyncpg.exceptions.DuplicateDatabaseError:
        print('Database already exists')
    await conn.close()
    CONNECTION = "postgres://postgres:postgres@timescaledb:5432/testdb"
    conn = await asyncpg.connect(CONNECTION)
    query_create_table = """CREATE TABLE IF NOT EXISTS ticks (
                                            timestamp TIMESTAMPTZ NOT NULL,
                                            session_id INTEGER,
                                            data_type INTEGER,
                                            label VARCHAR(50),
                                            price DOUBLE PRECISION,
                                            volume DOUBLE PRECISION,
                                            funds DOUBLE PRECISION
                                            );"""

    try:
        await conn.execute("DROP TABLE ticks;")
        print('Dropped table `ticks`')
        await conn.execute(query_create_table)
        print('Created table `ticks`')
    except asyncpg.exceptions.DuplicateTableError:
        print('table already exists')
    query_create_hypertable = "SELECT create_hypertable('ticks', 'timestamp');"
    try:
        await conn.execute(query_create_hypertable)
        print('created hypertable')
    except Exception:
        pass

    index_command = """CREATE UNIQUE INDEX unique_ticks
        ON ticks(timestamp, session_id, data_type, label, funds);"""
    try:
        await conn.execute(index_command)
        print('Created unique index')
    except Exception:
        print('Index not created')
test_timescaledb.py

This is adapted from the TimescaleDB Quickstart and was relatively straightforward to figure out, with two extra steps beyond typical Postgres initialization where you don't have any migrations and just recreate raw database (by connecting to template1, a service DB) and its tables:

The asynchronous test for write is more of a concurrency scaffolding than actual code that writes to database:

    iterations = 100000

    async def feed_data(q):
        data_source = generate_data(datetime.datetime(2020, 1, 1))
        nonlocal iterations
        counter = iterations
        print('starting feeding data')
        for raw_data in data_source:
            counter -= 1
            if counter < 0:
                print('finishing feeding data')
                break
            await q.put(raw_data)

    async def write_data(q):
        conn = await asyncpg.connect(CONNECTION)
        try:
            while True:
                datapoint = await q.get()
                datapoint['timestamp'] = datetime.datetime.fromtimestamp(datapoint['timestamp']).replace(tzinfo=pytz.UTC)
                vals = [(key, f'${i + 1}', value) for i, (key, value) in enumerate(datapoint.items())]
                keys, formats, values = zip(*vals)
                keys = ", ".join(keys)
                formats = ", ".join(formats)
                try:
                    await conn.execute(f'''
                        INSERT INTO ticks({keys}) VALUES({formats})
                        ON CONFLICT (timestamp, session_id, data_type, label, funds) DO UPDATE
                        SET price=EXCLUDED.price,
                            volume=EXCLUDED.volume;
                    ''', *values)
                except asyncpg.exceptions.UniqueViolationError:
                    pass
                q.task_done()
        except asyncio.CancelledError:
            await conn.close()
            raise

    start = timer()
    ticks_q = Queue(50000)
    producer = asyncio.create_task(feed_data(ticks_q.async_q))
    consumers = [asyncio.create_task(write_data(ticks_q.async_q)) for _ in range(90)]

    await producer
    print('---- done producing')

    await ticks_q.async_q.join()
    for c in consumers:
        c.cancel()

    print('---- done consuming')
    end1 = timer()
    elapsed_clean = round(end1 - start, 4)

    ticks_q = Queue(50000)
    producer = asyncio.create_task(feed_data(ticks_q.async_q))
    consumers = [asyncio.create_task(write_data(ticks_q.async_q)) for _ in range(90)]
    await producer
    print('---- done producing')

    await ticks_q.async_q.join()
    for c in consumers:
        c.cancel()

    end2 = timer()
    elapsed_upsert = round(end2 - end1, 4)
test_timescaledb.py

What happens here is:

The amount of coroutines executed concurrently needs to stay below 100 because that is Postres' default connections limit. Optimizations like changing the database defaults are beyond our scope here, so we'll have to make do with this.

Writing Many Datapoints: InfluxDB

Instead of the run-of-the-mill SQL database and table creation of TimescaleDB, InfluxDB is normally set up via a web interface onboarding that creates initial organization, user, and bucket. The onboarding process creates a token to authorize requests to the DB with. It was a bit of a mind twister to get it to execute automatically; my initial attempt was to run influx setup while influxd was running in the background, and then bring influxd to foreground. This works but you need to insert something like sleep 25 in order to let influxd initialize properly. This isn't remotely how things should be done when working with dockerized stack.

A better solution is to call api/v2/setup and provide it with a predefined token, which isn't really shown in the docs:

async def test_influxdb():
    url = "http://influxdb:9999"
    bucket = "ticks"
    org = "livewater"
    token = "token"
    async with aiohttp.ClientSession() as session:
        payload = {
            "bucket": bucket,
            "org": org,
            "username": "influx",
            "password": "influxdb",
            "token": token,
        }
        async with session.post(f"{url}/api/v2/setup", json=payload) as resp:
            result = await resp.json()
    auth = result.get("auth")
    token = auth["token"] if auth else token
test_influxdb.py

From here on, implementation is a relatively smooth sailing with an occasional hiccup like having to init client and write_api inside the thread to avoid concurrent write problems:

    iterations = 100000
    async def feed_data(q):
        data_source = generate_data(datetime.datetime(2020, 1, 1))
        nonlocal iterations
        counter = iterations
        print('starting feeding data')
        for raw_data in data_source:
            counter -= 1
            if counter < 0:
                print('finishing feeding data')
                break
            await q.put(raw_data)
        return None

    def write_data(q, i):
        nonlocal token
        client = InfluxDBClient(url="http://influxdb:9999", token=token)
        write_api = client.write_api(write_options=ASYNCHRONOUS)
        for datapoint in iter(q.get, None):
            datapoint['timestamp'] = datetime.datetime.fromtimestamp(datapoint['timestamp']).replace(tzinfo=pytz.UTC)
            p = influxdb_client.Point("tick") \
                .time(datapoint['timestamp']) \
                .tag("session_id", datapoint['timestamp']) \
                .tag("label", datapoint['label']) \
                .tag("data_type", datapoint['data_type']) \
                .tag("funds", datapoint['funds']) \
                .field("price", datapoint['funds']) \
                .field("volume", datapoint['volume'])
            write_api.write(bucket=bucket, org=org, record=p)
            q.task_done()
        write_api.__del__()

    TCOUNT = 20
    loop = asyncio.get_running_loop()
    start = timer()
    ticks_q = Queue(50000)
    producer = asyncio.create_task(feed_data(ticks_q.async_q))
    executor = concurrent.futures.ThreadPoolExecutor(TCOUNT)
    consumers = [loop.run_in_executor(executor, write_data, ticks_q.sync_q, x) for x in range(TCOUNT)]

    await producer
    print('---- done producing')
    for _ in consumers:
        await ticks_q.async_q.put(None)
    await asyncio.wait({*consumers})

    print('---- done consuming')
    end1 = timer()
    elapsed_clean = round(end1 - start, 4)
test_influxdb.py

The main thing to know about concurrent writes to InfluxDB are:

Writing Many Datapoints: Results

100.0000k datapoints, write from scratch: 65.4766(s) elapsed.
100.0000k datapoints, on-conflict upsert: 54.4164(s) elapsed.
TimescaleDB Results
100.0000k datapoints, write from scratch: 44.3193(s) elapsed.
100.0000k datapoints, on-conflict upsert: 47.4069(s) elapsed.
InfluxDB Results

This is weak InfluxDB win on execution time and a tie on developer experience. Switching to batch execution for InfluxDB is as easy as changing one identifier; if you're not familiar with async execution in Python, you might want to brush up on that, but otherwise, for TimescaleDB, it was nice to be able to just use tested and tried asyncpg, and for InfluxDB, the line protocol and clean implementation of tags seem very natural to use. Or maybe it's just me, I love tags.

Part 4: Reading Aggregated Data

To test reading the timestamped datapoints, I wanted to implement the basic function that a lot of financial software might have: aggregating the tick data in our datapoints into OHLCV candles. A candle is a data structure that contains, at a predefined time step, the price of an asset at the start of an interval between two step points, prices at the highest and lowest points within the interval, closing price, and sum of sizes of all ticks that belong to the interval. I gave myself around half a day's worth of time to research and implement this for each database.

Reading Aggregated Data: TimescaleDB

This is the all the code that's needed to construct an OHLCV aggregation:

    query = """
        SELECT
            time_bucket('{minutes} minutes', timestamp) AS time,
            first(price, timestamp) as open,
            max(price) as high,
            min(price) as low,
            last(price, timestamp) as close,
            sum(volume) as volume
        FROM ticks
        WHERE session_id = 123
        GROUP BY time
        ORDER BY time ASC;
    """
    rows30 = await conn.fetch(query.format(minutes=30))
    for row in rows30:
		print(f"\t\
Time {row['time']}\t\
O{row['open']}\t\
C{row['close']}\t\
V{row['volume']}\t\
")
    await conn.close()
test_timescaledb.py

I have included some typical constraints that come in handy in real code, like filtering by session ID. The aggregation results can be read directly from rows, which is, again, basically how you would use asyncpg with a normal Postgres backend. Research and implmentation took a fraction of the time alloted to the task, which bodes well for DX going forward with the product in a larger project.

2020-01-01 00:00:00+00:00       O10.0   H13.0   L10.0   C12.92  V4080
2020-01-01 00:30:00+00:00       O12.92  H12.92  L8.67   C8.67   V7200
2020-01-01 01:00:00+00:00       O8.67   H8.67   L7.0    C7.68   V7200
2020-01-01 01:30:00+00:00       O7.68   H12.38  L7.68   C12.38  V7200
2020-01-01 02:00:00+00:00       O12.38  H13.0   L11.24  C11.24  V7200
2020-01-01 02:30:00+00:00       O11.24  H11.24  L7.07   C7.07   V7200
1min OHLCV aggregation: 0.3316(s) elapsed.
5min OHLCV aggregation: 0.1265(s) elapsed.
30min OHLCV aggregation: 0.1239(s) elapsed.
TimescaleDB aggregated read results

This printout shows several important points about our implementation:

Reading Aggregated Data: InfluxDB

In this part I ran into an unpleasant surprise. In the time alloted for the task, I couldn't figure out a proper custom aggregation expression written in InfluxDB's query language, Flux. The non-finished query looks something like this:

    query = f"""
        open = from(bucket:"{bucket}")
        |> range(start: 0, stop: now())
        |> filter(fn: (r) =>
            r._measurement == "tick" and
            r._field == "price"
        )
        |> aggregateWindow(
            every: 30m, fn: first
        )
        |> map(fn: (r) =>
            ({{r with _field: "open"}})
        )
        |> yield(name: "first")
        volume = from(bucket: "{bucket}")
        |> range(start: 0, stop: now())
        |> filter(fn: (r) =>
            r._measurement == "tick" and
            r._field == "volume"
        )
        |> aggregateWindow(
            every: 30m, fn: sum
        )
        |> map(fn: (r) =>
            ({{r with _field: "volume"}})
        )
        |> yield(name: "volume")
        union(tables: [open, volume])
        |> yield()
    """
test_influxdb.py – broken code, for illustration only

The idea seems to be to either union separate aggregations or write your own custom aggregation but both getting the logic right and figuring out whatever is going on with the Flux language implementation examples proved to be not possible within the given time bounds of half a day's work.

I'm sure there is rational reasoning behind needing to learn a new language in order to be able to use InfluxDB 2.0 but that's beyond the scope of this comparison. It's certainly possible to just use the older versions for current projects in production.

Note: for versions 1.8 and lower, this read task would probably result in an implementation similar to TimescaleDB since InfluxDB <1.8 supports an SQL-like query language.

Reading Aggregated Data: Results

In the context of our target of a quick and good enough implementation, this results in TimescaleDB win due to significantly more complexity involved in doing the same task with InfluxDB.

Results

Disclaimer: this is not a rigorous benchmark, see considerations in the "Why and What" section above.

My research for my specific needs resulted in a win for TimescaleDB from the developer experience (DX) point of view. I would argue that DX accounts for what matters the most at the early stages of project development. It's pretty old to say that you want to iterate quickly and be productive from the get go (who doesn't, regardless of project stage or size?), but what really makes the difference here is a developer's luxury to afford to be sure that the new library or product won't throw them curveballs as they start using it in more advanced ways. In my eyes, TimescaleDB, empowered by Postgres' maturity, does exactly that. This little test drive allowed me to put my mind at peace about which DB backend to go forward with, and provided a fun experience in writing practical reports about tooling research. Hope it was useful for you!