F1: A Distributed SQL Database That Scales
Jeff Shute, Radek Vingralek, Bart Samwel, Ben Handy, Chad Whipkey, Eric Rollins, Mircea Oancea, Kyle Littlefield, David Menestrina, Stephan Ellner, John Cieslewicz, Ian Rae, Traian Stancescu, Himani Apte
Google, UW-Madison
Summary by: Zuyu Zhang
One-line Summary
F1 (named after the Filial 1 hybrid in genetics) is a distributed relational database system that provides the scalability of NoSQL systems, along with five novel features: “distributed query engine, consistent secondary indexes, asynchronous schema changes, optimistic transactions, and automatic history logging, tracking and publishing”.
Overview/Main Points
- Main Features
- Scalability
- Scale up by adding resources trivially and transparently, without data movement.
- Availability
- Survives datacenter outages, planned maintenance, schema changes, etc.
- Fault-tolerant
- Strong Consistency
- Usability
- Support complex SQL queries and joins among heterogeneous data sources (Spanner, Bigtable, CSV files, aggregated analytical data for AdWords)
- Indexes (secondary)
- Key Insights
- Explicit Data Clustering
- Tables form a relational hierarchical schema, and columns carry Protocol Buffer (structured) data types
- Improve data locality and reduce the number of RPCs for fetching remote data
- Parallelism
- Bound the number of RPCs per operation
- Batched reads
- Asynchronous reads
- Object Relational Mapping (ORM) library: a new, stripped-down API with explicit object loading and parallel, asynchronous read access. It offers neither implicit joins nor serial reads (for loops that issue one query per iteration), and it never implicitly traverses relationships between records; loading all children of an object is instead expressible as a single range read, thanks to hierarchically structured primary keys. A sketch of this client style follows.
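A minimal Python sketch of what this stripped-down client style looks like, against a hypothetical API (`db.range_read` and the key layout are illustrative assumptions, not F1's actual ORM interface):

```python
import asyncio

# Hypothetical client API in the spirit of the F1 ORM: loads are explicit,
# reads are batched/asynchronous, and relationships are never traversed
# implicitly one query at a time.

async def load_customer_subtree(db, customer_id):
    # Because child primary keys are prefixed with the parent key
    # (hierarchical clustering), the root row and all of its children
    # come back in ONE explicit range read, not N point lookups.
    return await db.range_read(start=(customer_id,), end=(customer_id + 1,))

async def load_dashboard(db, customer_ids):
    # Parallel, asynchronous reads: issue all loads at once rather than
    # looping with one query per iteration (which the API discourages).
    return await asyncio.gather(
        *(load_customer_subtree(db, cid) for cid in customer_ids))
```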
- Architecture
- F1 client library
- Interacts with users and supports tools like the command-line ad-hoc SQL shell.
- A full-fledged SQL interface, used for low-latency OLTP queries, large OLAP queries, and everything in between; it supports joining data from the Spanner data store with other sources, including Bigtable, CSV files, and the aggregated analytical data warehouse for AdWords.
- NoSQL interface: a key/value-based interface for accessing rows, used by the ORM layer under the hood and by some clients directly; supports batch retrieval of rows from multiple tables in a single call.
- Load Balancer
- Chooses an F1 server in a nearby datacenter to avoid unnecessary request latency.
- F1 Server
- Co-located in the same set of datacenters as the Spanner servers.
- Processes data retrieved from remote Spanner servers and coordinates query execution.
- No data storage. Therefore, scalable without data movement.
- Mostly stateless, except during pessimistic transactions that hold locks.
- A client is bound to one F1 server for the duration of such a transaction.
- Supports large-scale data processing through the MapReduce framework.
- F1 Master
- Maintains slave pool membership.
- Monitors slave process health.
- Distributes the list of available slaves to F1 servers.
- Slave Pool (shared)
- Executes parts of distributed query plans.
- Spanner servers
- Provide extremely scalable data storage, synchronous cross-datacenter replication, and strong consistency and ordering.
- Retrieve data from Colossus in the same datacenter on behalf of both F1 servers and MapReduce workers, and handle low-level storage issues like persistence, caching, replication, fault tolerance, data sharding and movement, location lookups, and transactions.
- Rows are partitioned into clusters called directories using ancestry relationships (groups → directories → fragments). Each group typically has one replica tablet per datacenter (groups may also include read-only replicas), and all tablets for a group store the same data. One replica tablet per group is elected as the Paxos leader, the entry point for all transactional activity for the group.
- What if the leader is not co-located with the F1 server serving the request? The F1 server must then talk to the remote leader replica. Each transaction costs at least two round trips to the leader (the reads, followed by the commit), and the commit itself includes an extra read by the F1 server (to get old values for ChangeHistory, index updates, optimistic xact TS verification, and referential integrity checks). Designating one datacenter as the preferred leader location reduces this total cost; a round trip to a remote central datacenter has a minimum latency of about 50 ms.
- Paxos-based synchronous cross-datacenter replication; relatively high commit latency (50-150 ms)
- Serializable pessimistic transactions using strict two-phase locking
- Multiple reads, taking S or X locks, followed by a single write that upgrades locks and atomically commits the xact.
- All commits are synchronously replicated using Paxos.
- Data updates across multiple groups (called participants) use two-phase commit (2PC) on top of Paxos, adding an extra network round trip and roughly doubling latency.
- 2PC scales well up to tens of participants, but suffers frequent aborts and significant latency with hundreds of participants.
- extremely scalable data storage; adding new servers results in data-redistribution (transparent to F1 servers)
- strong consistency
- ordering properties (TrueTime provides globally, totally ordered timestamps for xact commits)
- multi-versioned consistent reads, including snapshot reads without read locks.
- Guaranteed non-blocking, globally consistent reads use a global safe timestamp (typically lagging current time by 5-10 s), at or below which no in-flight or future xact can possibly commit.
- Colossus File System (the successor to GFS), not a globally replicated service.
- Data Model
- Hierarchically clustered schema (logically): the rows of a child table, whose primary key contains the parent table's primary key as a prefix (a foreign key), are physically interleaved under the corresponding parent (root) row in a single Spanner directory.
- + Reading a child table for a given root row becomes a single range read, rather than reading each row individually using an index (see the sketch after this block).
- + Most AdWords xacts update data for one directory (a root row and its clustered related tables) at a time
- Join
- Data is returned by Spanner in interleaved order (a pre-order depth-first traversal), sorted by primary key prefix.
- A join between rows of a parent and child table can use a simple ordered merge, since the row store returns both in primary key order.
- Joins between sibling tables: one join can use the cluster join, while the others must use an alternate algorithm.
- How are child rows kept sorted by primary key while supporting fast data modification, without moving the rest of the data?
- An F1 database has several root tables, and a completely flat MySQL-style schema is also supported.
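A small runnable sketch of why hierarchical clustering turns a child-table read into a single range read, using Python tuples as stand-ins for interleaved Spanner keys (the Customer → Campaign → AdGroup key shapes follow the paper's AdWords example; the in-memory row list is purely illustrative):

```python
from bisect import bisect_left

# Rows in interleaved (pre-order depth-first) primary-key order.
# Key shapes: Customer(CustomerId), Campaign(CustomerId, CampaignId),
# AdGroup(CustomerId, CampaignId, AdGroupId).
rows = [
    ((1,),      "Customer(1)"),
    ((1, 3),    "Campaign(1,3)"),
    ((1, 3, 5), "AdGroup(1,3,5)"),
    ((1, 3, 6), "AdGroup(1,3,6)"),
    ((1, 4),    "Campaign(1,4)"),
    ((2,),      "Customer(2)"),
    ((2, 5),    "Campaign(2,5)"),
]
keys = [k for k, _ in rows]

def read_customer_subtree(customer_id):
    # Every descendant shares the root row's key prefix, so the whole
    # subtree is one contiguous range: [(customer_id,), (customer_id + 1,)).
    lo = bisect_left(keys, (customer_id,))
    hi = bisect_left(keys, (customer_id + 1,))
    return rows[lo:hi]

# One range read returns Customer(1) and all of its campaigns and ad groups.
print(read_customer_subtree(1))
```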
- Protocol Buffers (PB)
- Structured data types in a given schema and the binary encoding format for RPCs and file formats.
- A PB-valued column actually stores a serialized copy of a protocol message; a "protocol message object" is the in-memory object representing the parsed message.
- Typed fields: required, optional, or repeated (variable-length arrays); messages can also be nested.
- Use repeated fields instead of child tables when the number of child records has a low upper bound, to avoid the complexity and overhead of storing and joining multiple child records.
- The entire PB is effectively treated as one blob by Spanner.
- PB valued columns could be logically read and written as atomic units
- "Many tables consist of just a single PB column. Other tables split their data across a handful of columns, partitioning the fields according to access patterns."
- Group together fields that are usually accessed together
- Separate fields with static and frequently updated data
- Allow specifying different read/write permissions per column
- Allow concurrent updates to different columns
- Indexing: transactional and fully consistent
- Local indexes (one physical storage layout), such as (CustomerId, Keyword): the index key has the root row's primary key as a prefix, so each index entry is stored in the same Spanner directory as its root row.
- Global indexes (the other physical storage layout), such as (Keyword): large, with high aggregate update rates; sharded across many directories and stored on multiple Spanner servers. Each shard of a global index that a xact updates becomes an extra 2PC participant, so such xacts inherit 2PC's scalability issues.
- Indexes are stored as separate tables in Spanner, keyed by the concatenation of the index key (scalar columns or PB fields, including repeated ones) and the indexed table's primary key.
- Locking Granularity
- row-level locking by default
- Increase concurrency by adding column-level lock columns (layered on the default row-level locking) for tables whose concurrent writers update disjoint sets of columns.
- Reduce concurrency by using a lock column in a parent table that covers columns in a child table, to avoid insertion phantoms for specific predicates or to enforce other business-logic constraints (e.g., a limit on keyword count per AdGroup, or keyword distinctness) correctly and more easily; see the sketch below.
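A sketch of the parent-table lock-column pattern against a hypothetical transaction API (`txn.read`, `txn.read_children`, `txn.write`, and the `KeywordLock` column are illustrative names, not F1's actual interface): re-writing a designated lock column on the parent AdGroup row forces any two xacts inserting keywords under the same AdGroup to conflict, so per-AdGroup constraints can be enforced without phantoms.

```python
MAX_KEYWORDS_PER_ADGROUP = 1000  # illustrative business constraint

def insert_keyword(txn, adgroup_key, keyword):
    # Reading the parent's lock column records its last-modification
    # timestamp in the transaction's read set (optimistic validation).
    lock_val = txn.read("AdGroup", adgroup_key, column="KeywordLock")

    existing = txn.read_children("Keyword", parent=adgroup_key)
    if len(existing) >= MAX_KEYWORDS_PER_ADGROUP:
        raise ValueError("keyword limit reached for this AdGroup")
    if keyword in existing:
        raise ValueError("duplicate keyword")

    txn.write("Keyword", adgroup_key + (keyword,), {"text": keyword})
    # Touch the covering lock column: two concurrent inserters under the
    # same AdGroup now conflict on the parent row, and one aborts/retries,
    # so no phantom keyword can slip past the checks above.
    txn.write("AdGroup", adgroup_key, {"KeywordLock": lock_val})
```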
- Transactions
- Multiple reads, with optionally a following single write (ACID, hard requirements on data integrity and full consistency).
- Snapshot transactions (default for SQL queries & MapReduces)
- Read-only xacts with repeatable data under a fixed Spanner snapshot timestamp (TrueTime).
- Globally consistent result.
- Optimistic transactions (default for F1 clients)
- arbitrarily long read phase (without Spanner locks) with a short write phase.
- Verification of row-level conflicts is based on each row's last modification timestamp (written automatically by both optimistic and pessimistic xacts), stored in a hidden lock column; the commit check is sketched after this list.
- The client sends the last modification TS back to an F1 server with the write operation.
- The F1 server then creates a short-lived pessimistic Spanner xact and re-reads the last modification TS of all read rows.
- If any TS differs, the xact aborts; otherwise, F1 sends the writes on to Spanner to finish the commit.
- + Tolerate misbehaved clients
- reads never hold locks and never block writes
- long-lasting xacts (killed if idle for more than 10 s, to avoid leaking locks), used for user interactions and for debugging a xact via single-stepping
- server-side retriability (retry transparently in the F1 server)
- server failover (the client keeps all state associated with the xact, and can retry against another F1 server after a failure or for better load balancing)
- speculative writes from reads outside the xact (possibly in a MapReduce).
- Cons
- Insertion phantoms: non-existent rows have no modification TS to validate, so predicated inserts can miss conflicts (mitigated by parent-table lock columns)
- low throughput under high contention (e.g., many clients concurrently incrementing a counter)
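A minimal runnable sketch of the optimistic commit check, with an in-memory class standing in for Spanner (in real F1 the last-modification timestamp lives in a hidden lock column and the re-read happens inside a short-lived pessimistic Spanner xact):

```python
import time

class Row:
    def __init__(self, value, last_modified=0.0):
        self.value = value
        self.last_modified = last_modified  # F1: hidden lock column

class ConflictError(Exception):
    pass

class Database:
    def __init__(self):
        self.rows = {}

    def read(self, key):
        row = self.rows[key]
        return row.value, row.last_modified

    def commit_optimistic(self, read_set, writes):
        # Re-read the last-modified timestamp of every row the client
        # read (lock-free) and compare against what the client saw.
        for key, seen_ts in read_set.items():
            current = self.rows.get(key)
            if current is not None and current.last_modified != seen_ts:
                raise ConflictError(f"row {key!r} changed since read")
        commit_ts = time.time()
        for key, value in writes.items():
            self.rows[key] = Row(value, commit_ts)

# Arbitrarily long read phase, then a single short write phase.
db = Database()
db.rows["budget"] = Row(100, last_modified=1.0)
value, ts = db.read("budget")
db.commit_optimistic(read_set={"budget": ts}, writes={"budget": value + 50})
```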
- Pessimistic transactions
- Along with a stateful communication protocol using S or X locks, these map directly onto Spanner xacts; they abort if the F1 server restarts.
- + Tolerate high contention, at the cost of throughput, via exclusive locks.
- If each commit takes 50 ms, at most 20 xacts per second are possible on a contended row.
- Application-level changes such as batching updates can mitigate the throughput limit.
- Cons
- Long-running xacts often cannot commit due to the idle-kill policy.
- Cannot be retried transparently; the locking side effects force the user's business logic to be re-run.
- Optimistic and pessimistic xacts can be mixed without losing ACID properties, while snapshot xacts are independent of any write xacts and are thus always consistent.
- SQL Query Processing
- All data is remote and arbitrarily partitioned, with few ordering properties, so F1 leans heavily on batching and pipelining. Scheduling multiple data accesses in parallel often yields near-linear speedup, because the data storage is distributed (at an even finer granularity in CFS) and contention is unlikely.
- Centralized execution (low latency): short OLTP-style queries run entirely on one F1 server.
- Distributed execution (high parallelism; always snapshot xacts): OLAP-style queries spread over the F1 slave pool. This allows maximum read-request concurrency and limits the space needed for row buffering, at the cost of losing the ordering properties of the input data.
- Distributed query plan overview: tens of plan parts organized as a DAG, with data flowing up from the leaves to a single root node (the query coordinator: the F1 server that received the query, which performs any final aggregation, sorting, or filtering and returns the result, except when partitioned consumers are used). All slaves working on one plan part execute the same query subplan.
- Example: a query that joins through a link table
- Lookup join operator: uses a secondary index key and a hash table for fast lookups, taking advantage of batching (see the sketch after this list).
- Read from the inner table using equality lookup keys.
- Retrieve rows from the outer table, extracting and deduplicating the lookup key values, until 50 MB worth of data or 100,000 unique lookup key values have been gathered.
- Perform a simultaneous lookup of all keys in the inner table.
- Return the requested data immediately in arbitrary order.
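A sketch of that batching loop (the thresholds match the paper's figures; `outer_rows`, `key_of`, and `inner_lookup` are stand-ins for F1's streaming reads):

```python
import sys

MAX_KEYS = 100_000        # unique lookup keys per batch (paper's limit)
MAX_BYTES = 50 * 1024**2  # ~50 MB of buffered outer rows (paper's limit)

def lookup_join(outer_rows, key_of, inner_lookup):
    """Yield (outer_row, inner_row) pairs in arbitrary order."""

    def flush(batch, keys):
        # One simultaneous, batched lookup of all unique keys in the
        # inner table, hashed for fast probing against the outer batch.
        inner = {}
        for key, inner_row in inner_lookup(keys):
            inner.setdefault(key, []).append(inner_row)
        for row in batch:
            for inner_row in inner.get(key_of(row), []):
                yield row, inner_row  # streamed immediately, unordered

    batch, keys, nbytes = [], set(), 0
    for row in outer_rows:
        batch.append(row)
        keys.add(key_of(row))          # deduplicate lookup key values
        nbytes += sys.getsizeof(row)
        if len(keys) >= MAX_KEYS or nbytes >= MAX_BYTES:
            yield from flush(batch, keys)
            batch, keys, nbytes = [], set(), 0
    if batch:
        yield from flush(batch, keys)
```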
- Repartition the data stream by a hash of the join keys: Spanner partitions data randomly and arbitrarily, so no ordering or co-partitioning properties exist. F1 does not worry about network capacity or concepts like rack affinity, thanks to scalable network switching hardware (which, in turn, limits the size of an F1 cluster to what the switching hardware supports).
- A subsequent distributed hash join repartitions the data stream by a hash of a different set of key fields.
- A distributed aggregation: aggregate locally inside a small buffer, then repartition the data by a hash of the grouping keys, and finally perform a full aggregation (spilling the hash table to disk if necessary); sketched below.
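A sketch of that two-stage aggregation, simulated in one process (in F1 the stages run on different slaves; the buffer size and partition count here are illustrative):

```python
from collections import defaultdict

NUM_PARTITIONS = 4         # illustrative
LOCAL_BUFFER_LIMIT = 1024  # illustrative small per-slave buffer

def local_preaggregate(rows):
    # Stage 1 (on each slave near the data): partial sums in a bounded
    # buffer, emitted and reset whenever the buffer fills.
    buf = defaultdict(int)
    for group_key, value in rows:
        buf[group_key] += value
        if len(buf) >= LOCAL_BUFFER_LIMIT:
            yield from buf.items()
            buf.clear()
    yield from buf.items()

def repartition(partials):
    # Shuffle partial aggregates by a hash of the grouping key so each
    # final aggregator owns a disjoint set of groups.
    parts = [[] for _ in range(NUM_PARTITIONS)]
    for key, value in partials:
        parts[hash(key) % NUM_PARTITIONS].append((key, value))
    return parts

def final_aggregate(partition):
    # Stage 2: full aggregation per partition (F1 spills this hash table
    # to disk if it outgrows memory).
    totals = defaultdict(int)
    for key, value in partition:
        totals[key] += value
    return dict(totals)

rows = [("us", 1), ("de", 2), ("us", 3), ("fr", 5), ("de", 4)]
result = {}
for part in repartition(local_preaggregate(rows)):
    result.update(final_aggregate(part))
print(result)  # {'us': 4, 'de': 6, 'fr': 5}
```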
- F1 SQL operators execute in memory, without checkpointing to disk, and stream data as much as possible. Failed queries are retried transparently to hide transient failures. Queries that run for up to an hour are sufficiently reliable, but longer ones may experience too many failures.
- Cluster join operator: a merge-join-like algorithm that buffers one row from each table and returns joined results from a single Spanner request, applicable as long as all joined tables fall on a single ancestry path in the table hierarchy. A join between siblings needs a cluster join plus an alternate join such as the lookup join, which constructs the join result piecemeal using bounded-size batches of lookup keys.
- Partitioned consumers: clients such as MapReduces can consume sharded streams of data from the same query in parallel, by requesting distributed data retrieval and then connecting to and retrieving data from all of the endpoints F1 returns. However, because the F1 query produces results for all readers in lock-step, slowness in one distributed reader slows all the others.
- Queries with PB valued columns (first class objects)
- Use path expressions to extract individual fields.
- Could process the entire protocol buffers.
- PROTO JOIN, a JOIN variant that joins by the implicit foreign key.
- Join a table with its virtual child table (PB repeated field) by the foreign key
- Filter the resulting combinations by the predicate inside the child table, and return requested field from that child table.
- In the read from the outer relation, retrieve the entire PB column containing the repeated field from Spanner; then, for each outer row, simply enumerate the repeated field instances in memory (parsing the PB contents) and join them to the outer row (see the sketch after this block).
- Support subqueries on repeated fields: each subquery iterates over repeated field values contained inside PB from the current row.
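A sketch of this in-memory evaluation, with dicts standing in for parsed protocol message objects (the `Whitelist`/`feature` names are modeled on the paper's example, and the SQL in the comment is approximate):

```python
# Each Customer row carries a PB column whose repeated "feature" field
# acts as a virtual child table, joined by an implicit foreign key.
customers = [
    {"CustomerId": 1,
     "Whitelist": {"feature": [
         {"feature_id": 302, "status": "ENABLED"},
         {"feature_id": 401, "status": "DISABLED"}]}},
    {"CustomerId": 2,
     "Whitelist": {"feature": [
         {"feature_id": 302, "status": "ENABLED"}]}},
]

# Roughly: SELECT c.CustomerId, f.feature_id FROM Customer AS c
#          PROTO JOIN c.Whitelist.feature AS f WHERE f.status = 'ENABLED'
def proto_join(rows, predicate):
    for row in rows:  # read the whole PB column once per outer row
        # Enumerate the repeated field instances in memory and join each
        # one back to its outer row; filter with the child predicate.
        for feature in row["Whitelist"]["feature"]:
            if predicate(feature):
                yield row["CustomerId"], feature["feature_id"]

print(list(proto_join(customers, lambda f: f["status"] == "ENABLED")))
# [(1, 302), (2, 302)]
```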
Schema Changes
- Asynchronous (on different servers at different times)
- fully non-blocking
- Batch schema changes in queue
- Enforce that all servers use at most two different active schemas at any time, by granting leases on the schema
- Subdivide each schema change into multiple phases in which consecutive phases are mutually compatible (for an index, roughly: delete-only → write-only → readable, with a backfill in between); see the sketch below
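A sketch of why the subdivision is safe for the index-addition case (the state names follow the asynchronous schema change design; the enforcement code is an illustrative assumption, not F1's implementation):

```python
from collections import namedtuple
from enum import Enum

Op = namedtuple("Op", ["kind", "key"])  # kind: "insert" | "delete"

class IndexState(Enum):
    ABSENT = 0       # index does not exist yet
    DELETE_ONLY = 1  # maintain deletions only; never add entries
    WRITE_ONLY = 2   # maintain inserts and deletes; not yet readable
    PUBLIC = 3       # fully usable (after a backfill pass)

def apply_op(op, state, index):
    # Any two ADJACENT states behave compatibly: a DELETE_ONLY server
    # never leaves an entry a WRITE_ONLY server would not expect. That
    # is why leasing schemas so at most two versions are active suffices.
    if op.kind == "delete" and state.value >= IndexState.DELETE_ONLY.value:
        index.discard(op.key)
    elif op.kind == "insert" and state.value >= IndexState.WRITE_ONLY.value:
        index.add(op.key)

index = set()
apply_op(Op("insert", "k1"), IndexState.DELETE_ONLY, index)  # ignored
apply_op(Op("insert", "k1"), IndexState.WRITE_ONLY, index)   # maintained
apply_op(Op("delete", "k1"), IndexState.DELETE_ONLY, index)  # honored
print(index)  # set(): no orphaned index entry despite mixed schemas
```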
Change History
- A ChangeBatch PB per updated root row is written into normal F1 tables as children of each root table, keyed by the associated root table key plus the xact commit TS, and containing the before and after values of changed columns for each updated row.
- A publish-and-subscribe system pushes notifications at root-row granularity so that subscribers can trace updates (and applications can do incremental processing). A notification is published in Spanner at least once after any series of changes to a root row. Subscribers remember a checkpoint (i.e., a high-water mark) for each root row and read all newer changes whenever they receive a notification. (Spanner's TS ordering property guarantees that every change is processed exactly once.)
- Caching: a client that maintains an in-memory cache of database state, distributed across multiple servers, can keep it fresh by comparing the commit TS of the last write visible for a given root row against its ChangeHistory checkpoint, reading any ChangeHistory records beyond that checkpoint, and applying those changes to its cache (sketched below).
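A sketch of the subscriber side (`read_change_history` and `apply_change` are stand-ins for a TS-ordered F1 range read over the ChangeBatch child table and for application logic):

```python
checkpoints = {}  # root row key -> commit TS high-water mark

def on_notification(root_key, read_change_history, apply_change):
    # Delivered at least once after any series of changes to root_key.
    # Reading in commit-TS order past the checkpoint makes duplicate
    # notifications harmless: every change is applied exactly once.
    high_water = checkpoints.get(root_key, 0)
    for change in read_change_history(root_key, after_ts=high_water):
        apply_change(change)               # incremental processing
        high_water = change["commit_ts"]   # advance the high-water mark
    checkpoints[root_key] = high_water
```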
Deployment and Results (latency & throughput)
- 5-way Paxos replication, with additional read-only replicas for snapshot reads and segregating OLTP and OLAP workloads.
- Read latencies of 5-10 ms, and commit latencies of 50-150 ms (a Paxos commit waits for acknowledgment from the two nearest replicas; 2PC for multi-group commits doubles the minimum latency). The AdWords web app averages about 200 ms per user transaction.
- Non-interactive apps that apply bulk updates optimize for throughput rather than latency, by structuring small xacts against single Spanner directories and using parallelism. (One app that updates billions of rows per day runs single-directory xacts of up to 500 rows each, at 500 xacts per second.) Throughput is also throttled to protect downstream Change History consumers that cannot process changes fast enough.
- For query processing, the focus is on functionality and parity, not on absolute query performance. Centralized queries run in less than 10 ms, and some applications issue 10k to 100k queries per second.
Relevance
Future work
- How can global indexes, which use 2PC, be made more scalable without compromising consistency?
- How can the slowdown of partitioned consumers, caused by the streaming nature of F1 queries and the horizontal dependency introduced by frequent hash repartitioning, be mitigated? (Use disk-backed buffering to break the dependency and allow clients to proceed independently.)
- How can queries involving PB field parsing (decoding) and selection be processed efficiently? (Push operations down into Spanner? Use other indexes in Spanner?)
- Add checkpointing of some intermediate results for distributed queries that execute longer than one hour, without hurting latency in the normal, failure-free case.
- How can the CPU efficiency of the F1 server be improved (it uses an order of magnitude more CPU than its MySQL counterpart), given that processing a query involves data that is compressed on disk and passes through several layers: decompressed, processed, recompressed, and sent over the network?