Design a Database for Billions of Rows

The problem

You are building an analytics platform for a mobile gaming company. Every user action -- every tap, swipe, purchase, and level completion -- is recorded as an event. Each event has a timestamp, a user ID, a device type, and a variable set of attributes that differ by event type (a purchase event has price and item_id; a level-completion event has score and time_spent).

The numbers are large and growing fast:

  • 2 billion new events per day (roughly 25,000 writes per second sustained, with spikes to 100K/sec)
  • 500 TB of total data and growing
  • Events are append-only -- once written, they are never updated
  • Reads query by user ID and time range: "give me all events for user X in the last 7 days"
  • The schema varies between event types -- some events have 5 attributes, others have 50

A traditional relational database cannot handle this. A single MySQL instance tops out at a few terabytes and cannot sustain 25K writes/sec. Sharding a relational database is possible but painful -- you lose joins, transactions become distributed, and schema changes require coordinating across hundreds of shards.

You need a database designed from the ground up for this workload: write-heavy, append-mostly, read-by-row-key, variable schema, petabyte scale.

Key requirements to identify

| Requirement | What it implies |
| --- | --- |
| Billions of rows, petabyte scale | Must partition data across many machines automatically |
| Write-heavy, append-mostly | Optimize the write path -- sequential I/O, no random seeks on write |
| Variable schema | A column-family or wide-column model, not a rigid relational schema |
| Read by row key + time range | Row key design is critical -- must support efficient range scans |
| No updates to existing data | Immutable on-disk structures (SSTables) work perfectly |
| High availability | Replication across nodes and racks; tolerate machine failures |

The design approach

The write path is the first thing to optimize. Writing directly to sorted on-disk files would require random I/O (inserting into the middle of a sorted structure). Instead, buffer writes in an in-memory sorted structure (a MemTable), and when it reaches a threshold, flush it as a sorted, immutable file (an SSTable) to disk. Writes to the MemTable are sequential and fast. A write-ahead log (WAL) ensures durability if the process crashes before the flush.
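The write path above can be sketched in a few lines of Python. This is a minimal illustration, not a production design: the class and file names are invented for the example, a real MemTable would use a skip list or balanced tree rather than a dict, and a real WAL would use a binary, checksummed record format rather than JSON lines. The key ideas it demonstrates are the ordering (WAL append first, memory second) and the sorted, immutable flush.

```python
import json
import os

class MemTable:
    """In-memory write buffer backed by a write-ahead log (sketch only)."""

    def __init__(self, wal_path, flush_threshold=4):
        self.entries = {}                  # key -> value; sorted at flush time
        self.flush_threshold = flush_threshold
        self.wal_path = wal_path
        self.wal = open(wal_path, "a")

    def put(self, key, value):
        # 1. Append to the WAL first -- sequential I/O, survives a crash.
        self.wal.write(json.dumps([key, value]) + "\n")
        self.wal.flush()
        # 2. Then update the in-memory table.
        self.entries[key] = value
        # Signal the caller when it is time to flush.
        return len(self.entries) >= self.flush_threshold

    def flush_to_sstable(self, sstable_path):
        # Write keys in sorted order: the resulting file is an immutable SSTable.
        with open(sstable_path, "w") as f:
            for key in sorted(self.entries):
                f.write(json.dumps([key, self.entries[key]]) + "\n")
        self.entries.clear()
        # The flushed data is durable on disk, so the old WAL can be discarded.
        self.wal.close()
        os.remove(self.wal_path)
        self.wal = open(self.wal_path, "a")
```

On recovery after a crash, a real engine would replay the surviving WAL records into a fresh MemTable before accepting new writes.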

Reads are trickier. The data you need might be spread across the MemTable and dozens of SSTables on disk. You need to merge results from all of them. To avoid checking every SSTable for a key that does not exist in it, attach a Bloom filter to each SSTable -- a probabilistic structure that tells you "definitely not here" or "maybe here" in constant time.
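A Bloom filter is small enough to sketch directly. The sizes below are illustrative; real SSTables choose the bit-array size and hash count from the expected key count and a target false-positive rate. The guarantee to verify: a key that was added always answers "maybe here", while a key that was never added usually answers "definitely not here".

```python
import hashlib

class BloomFilter:
    """Probabilistic membership: 'definitely not here' or 'maybe here'."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = 0                      # a Python int used as a bit array

    def _positions(self, key):
        # Derive k hash positions by salting one cryptographic hash.
        for i in range(self.num_hashes):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.size

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        # False => the key is certainly absent; True => check the SSTable.
        return all(self.bits & (1 << pos) for pos in self._positions(key))
```

On the read path, the engine consults each SSTable's filter first and only touches disk for the tables that answer "maybe".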

Over time, you accumulate many small SSTables. Compaction merges them into fewer, larger files, discarding tombstones (delete markers) and reducing read amplification.

For the data model, think in terms of column families. Each row has a row key (e.g., userID), and each column family groups related columns. Within a column family, columns are sorted, and you can have millions of columns per row. This is ideal for time-series data: use userID as the row key and timestamp as the column name within a column family.
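The wide-row layout described above can be sketched with a sorted column list per row. The zero-padding width and the in-memory representation are assumptions made for the example; the point is that naming columns by a fixed-width timestamp makes lexicographic order equal time order, so a time-range query becomes one contiguous slice of the row.

```python
from bisect import bisect_left

def column_name(timestamp_ms):
    # Zero-pad so that string sort order matches numeric time order.
    return f"{timestamp_ms:013d}"

class WideRow:
    """One row (e.g. one user): millions of sorted timestamp columns."""

    def __init__(self):
        self.columns = []                  # sorted list of (column_name, value)

    def insert(self, timestamp_ms, value):
        name = column_name(timestamp_ms)
        idx = bisect_left(self.columns, (name,))
        self.columns.insert(idx, (name, value))

    def range_scan(self, start_ms, end_ms):
        """All events with start_ms <= timestamp < end_ms, as one slice."""
        lo = bisect_left(self.columns, (column_name(start_ms),))
        hi = bisect_left(self.columns, (column_name(end_ms),))
        return self.columns[lo:hi]
```

With userID as the row key, "all events for user X in the last 7 days" is a single row lookup followed by one such slice -- no scatter-gather across the row.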

Think first
Before reading the solution, consider: how would you design the row key for time-series event data? What happens if you use just the user ID -- will the data be distributed evenly? What if some users generate 1000x more events than others?

How the industry solved it

Two systems pioneered this design -- Google's BigTable and Amazon's Dynamo -- and one open-source system, Apache Cassandra, brought it to the masses:

Google's BigTable

Google built BigTable to store web indexing data, Google Earth imagery, and Google Analytics events -- all workloads with the same characteristics as yours. BigTable introduced the sorted-string-table (SSTable) storage engine and the column-family data model that became the foundation for an entire class of databases.

Start here: BigTable Introduction

BigTable's architecture separates the storage layer (GFS) from the compute layer (tablet servers). A master server assigns tablets (contiguous ranges of rows) to tablet servers. Each tablet server manages reads and writes for its assigned tablets, using MemTables and SSTables.

Deep dive: Partitioning and Architecture | SSTable | BigTable Components

Apache Cassandra

Cassandra combines BigTable's storage engine with Dynamo's distribution model. It uses SSTables and MemTables for the storage layer but replaces BigTable's single-master tablet assignment with Dynamo-style consistent hashing and gossip-based membership. This makes it fully decentralized -- no single point of failure.
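Dynamo-style consistent hashing can be sketched as a sorted ring of tokens with virtual nodes. This is a simplified illustration: the hash choice, token width, and vnode count are assumptions, and it omits replication (Cassandra also walks the ring clockwise to place copies on the next distinct nodes) and gossip. It shows the core property: each row key deterministically maps to the first node clockwise from its token.

```python
import hashlib
from bisect import bisect

class ConsistentHashRing:
    """Dynamo-style token ring with virtual nodes (sketch only)."""

    def __init__(self, nodes, vnodes=64):
        self.ring = []                     # sorted (token, node) pairs
        for node in nodes:
            for v in range(vnodes):
                # Virtual nodes smooth out the load across the ring.
                self.ring.append((self._token(f"{node}#{v}"), node))
        self.ring.sort()

    @staticmethod
    def _token(key):
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def owner(self, row_key):
        # First virtual node clockwise from the key's token (wrapping at the end).
        idx = bisect(self.ring, (self._token(row_key),)) % len(self.ring)
        return self.ring[idx][1]
```

Because ownership depends only on token positions, adding or removing one node moves only the keys in the ring segments adjacent to its virtual nodes, not the whole keyspace.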

Start here: Cassandra Introduction

Key differences between BigTable and Cassandra

| Aspect | BigTable | Cassandra |
| --- | --- | --- |
| Architecture | Single master assigns tablets | Fully decentralized (Dynamo-style ring) |
| Consistency | Strong (single master decides) | Tunable (quorum-based) |
| Failure model | Master is a potential bottleneck | No single point of failure |
| Storage | GFS (distributed file system) | Local disk per node |
| Open source | No (proprietary to Google) | Yes (Apache project) |

Deep dive into Cassandra's write and read paths: Write Operation | Read Operation

Key patterns used

| Pattern | Why it is needed | Reference |
| --- | --- | --- |
| Write-Ahead Log | Durability for in-memory MemTable -- recover writes after a crash | Pattern |
| Bloom Filters | Skip SSTables that definitely do not contain the requested key | Pattern |
| Consistent Hashing | Distribute rows across nodes (Cassandra) | Pattern |
| Gossip Protocol | Membership and failure detection (Cassandra) | Pattern |
| Quorum | Tunable consistency for reads and writes (Cassandra) | Pattern |
| Heartbeat | Detect failed tablet servers (BigTable) | Pattern |
| Checksum | Verify data integrity on disk | Pattern |

Compaction and tombstones

Since SSTables are immutable, deletes are handled by writing a tombstone marker. During compaction, tombstones suppress the deleted data in older SSTables. Understanding compaction strategies (size-tiered vs. leveled) is critical for production tuning.
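The core of a compaction pass is a merge in which newer versions shadow older ones and tombstones suppress deleted keys. The sketch below assumes SSTables are passed newest-first and represents each as a dict; a real compactor streams sorted files with a k-way merge and only drops a tombstone after a grace period, in case an older SSTable elsewhere still holds the deleted value.

```python
# Sentinel delete marker; a real engine stores a flagged cell with a timestamp.
TOMBSTONE = object()

def compact(sstables):
    """Merge SSTables (ordered newest first) into one, dropping tombstones."""
    merged = {}
    for table in sstables:
        for key, value in table.items():
            # setdefault keeps the first (i.e. newest) version of each key.
            merged.setdefault(key, value)
    # Emit sorted, live data only: tombstones and shadowed values disappear.
    return {k: v for k, v in sorted(merged.items()) if v is not TOMBSTONE}
```

Size-tiered compaction applies this merge to SSTables of similar size; leveled compaction bounds how many SSTables can overlap a key range, trading write amplification for cheaper reads.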

Deep dive: Compaction | Tombstones

Design Challenge

Variation: Design a time-series metrics database

You are building a monitoring system that ingests 1 million metrics per second from 100,000 servers. Each metric has a name, a set of tags (key-value pairs), a timestamp, and a numeric value. Queries are always by metric name + tag combination + time range. Data older than 90 days can be downsampled to hourly aggregates.