The Microsoft Bing platform has built one of the largest distributed storages for Bing web search data, using its home grown ObjectStore service. The system hosts hundreds of petabyte data and processes hundreds of millions lookups per sec. Open source RocksDB is used as the storage engine. Multiple techniques are applied to efficiently store and process the massive data with sub-second data freshness. This blog will present those techniques and the results in production.
Web Search Data Scenario
In early 2018, the team started the effort to build a new platform based on ObjectStore with RocksDB for all Microsoft Bing web document storage and processing pipelines. The new platform is not just a replacement for the old one which served Bing offline data processing workloads for over 10 years, but also with a massive increase of data volume and improved freshness. It hosts hundreds of petabytes of data and handles 10s of billions of document processing per day. This new platform became the largest RocksDB deployment in Microsoft.
The following diagram shows a simplified architecture of web data platform. The processing of a document starts in Process Controller when it picks which documents should currently be processed. The first interaction is with Crawler where a subset of these documents is recrawled to get the latest content. Then documents are assigned to a Document Processing node which first reads the prior state of the document from Web Data table. Then it generates the in-index representation of the document and updates processing metadata to help in future reprocessing. All the newly created data is stored back into Web Data table. Index Building service then reads the in-index version of document from Web Data table and creates a merged index that is served to users. Injections (of offline generated data) are written directly to the Web Data table and picked up by Index Building service to add directly into merged index. Messaging Service is used to send information between different documents (e.g. Anchor text collection from source page to target page). Lookups of any document state/data are possible from Web Data table at any time and are used in web partner pipelines and for debugging.
At the same time, it does not mean we should use different machines for deploying two types of services, instead, physically co-hosting CPU-intensive computing services with IO-intensive table services can greatly improve resource utilization. Networking costs for transferring data between storage and compute can be also reduced with affinitization.
The following illustration shows cohosted Document Process Server and ObjectStore Table Server. Coprocessors are user-defined functions that are hosted and executed by Table Server, which allows running user code close to data. In the design, some light-weighted processing logics are executed in Coprocessors, like close-to-data filtering to find candidate documents to process.
Column store supports up to 64k of predefined columns with Bond as schematization provider.
Web documents can be classified as relatively hot or cold by update frequency, then stored into different column groups correspondingly. Each column group maps to a separated LSM-tree with a path assigned either on SSD or HDD.
Different from separating hot and cold data by multiple tables, single table with multiple column groups supports in row transaction across storages naturally, which allows documents switching between hot and cold storage easily. Also, batch accessing multiple column data for a document across storages is much cheaper.
Here is a Column Store performance comparison of sequential scan on HDD without cache. Tests were done after ingested 500K rows, and each row has 2 columns with 100KB average size. Test compares read throughputs by:
Using column group prevents reading unnecessary data and provides better performance.
Here is the illustration for RocksDB stack:
There are 3 major techniques to improve performance: JBOD, applying per storage compaction limiter and using disk prioritization management.
JBOD supports on RocksDB is done by overriding file system APIs from JBOD environment wrapper. The performance benefits on HDD are significant as IOPS is the bottleneck of HDD for most scenarios, by using JBOD, IOPS for individual disk can be used separately.
In a RocksDB random overwrite test with 3 HDDs, JBOD got more than 2x write throughputs compared with RAID0, and the later test using 9 HDDs JBOD shows almost linear improvements along with number of disks.
Column group is built on top of RocksDB column family (CF), and all the CFs for the same DB instance share components like WAL, flush, compaction schedulers and thread pool, etc. After we extend RocksDB to support configure CFs with different working paths across storages, we found a performance issue that SSD and HDD didn’t work well together, slow HDD compactions exhaust the compaction thread pool and block SSD compactions.
Enlarging thread pool is not an option, as HDD becomes much less performant with high concurrent compactions bounded by IOPS. Compaction limiter is designed as a sharable object across CF, controls the outstanding compaction tasks. Global control for max outstanding compaction tasks on individual drive is done by sharing per-drive compaction limiter across CFs of all DB instances. Comparing with uncontrolled mix compactions, a fine-tuned compaction limiter gave us 80% combined ingestion throughput gain. This perf number was measured with 3 HDDs as JBOD and 2 SSDs as RAID0.
Disk IO manager maintenances IO task as multi-level feedback queue with FCFS (first come first serve) as the algorithm for each level, and IO requests are pushed into the queue if predefined max outstanding IO or outstanding bytes are reached. When IO task completes, the next task is picked from highest priority queue by FCFS followed by lower priority queues until outstanding task limit is reached again. To prevent starvation for lower priority queue, the task will be promoted to higher queue after a certain waiting time.
There are also tremendous improvements done at data replication protocol. We will talk about them in the next blog post.
- Burton (Pu) Li, Jason (Zengzhong) Li, Max Sigalov, Dafan Liu, Knut Magne Risvik
The following diagram shows a simplified architecture of web data platform. The processing of a document starts in Process Controller when it picks which documents should currently be processed. The first interaction is with Crawler where a subset of these documents is recrawled to get the latest content. Then documents are assigned to a Document Processing node which first reads the prior state of the document from Web Data table. Then it generates the in-index representation of the document and updates processing metadata to help in future reprocessing. All the newly created data is stored back into Web Data table. Index Building service then reads the in-index version of document from Web Data table and creates a merged index that is served to users. Injections (of offline generated data) are written directly to the Web Data table and picked up by Index Building service to add directly into merged index. Messaging Service is used to send information between different documents (e.g. Anchor text collection from source page to target page). Lookups of any document state/data are possible from Web Data table at any time and are used in web partner pipelines and for debugging.
Compute and Storage Fabric
The capability of growing computing power and storage independently is highly demanded. Also having separate development and release cycles between computing and storage services are critical, hence the decoupling of storage and computing becomes the key design principle.At the same time, it does not mean we should use different machines for deploying two types of services, instead, physically co-hosting CPU-intensive computing services with IO-intensive table services can greatly improve resource utilization. Networking costs for transferring data between storage and compute can be also reduced with affinitization.
The following illustration shows cohosted Document Process Server and ObjectStore Table Server. Coprocessors are user-defined functions that are hosted and executed by Table Server, which allows running user code close to data. In the design, some light-weighted processing logics are executed in Coprocessors, like close-to-data filtering to find candidate documents to process.
Data Model
When we prototyped the web data project, a series of tables came up naturally to present lifespan of web documents, from crawled raw documents to processed documents. Some common data from different tables are duplicated, so to achieve consistency under concurrent data processing tasks, transaction support across tables is required. We settled on a Bigtable-like data model by joining all sub-tables by primary key as column groups (will be explained later), which could be used by many applications and pipelines. It improved resource efficiency by avoiding redundant data, and reduced management costs by using fewer tables. Also, when doing batch update on multiple datasets for a document, it uses cheap in-row transaction instead of distributed transactions across tables.Column Store
Web Data Table is persisted and served as Column Store, which is a distributed NoSQL database built on RocksDB supporting column-oriented operations. It provides an efficient way of accessing a portion of the data within a record and guarantees atomicity when multiple columns are updated or read across column groups. Partitioning is done by splitting ranges of URL hash. Logical storage hierarchy is shown as below:Column store supports up to 64k of predefined columns with Bond as schematization provider.
- Column family (different from column family of RocksDB’s terminology, term CF will be used for RocksDB column family in this document to avoid ambiguity) is a special column that includes a set of sub-columns indexed by a string key (e.g.: K1, K2, K3…). The number of sub-columns is arbitrary and can be different for different records. It is commonly used for storing various number of same typed facets for a record.
- Column group is set of columns with the same data locality.
Hot and cold data separation using column groups
Distributed storage engines are often optimized for high QPS and low latency or high throughputs and low-cost. Most popular engines can be configured towards either one of the characteristics. However, there are two major challenges to support web documents processing:- Hot documents can be refreshed in seconds, but the cold ones can be still for years, also the characteristics for the same documents can change dramatically over time.
- System cost is impossible to ignore when solving problems at web-scale.
Web documents can be classified as relatively hot or cold by update frequency, then stored into different column groups correspondingly. Each column group maps to a separated LSM-tree with a path assigned either on SSD or HDD.
Different from separating hot and cold data by multiple tables, single table with multiple column groups supports in row transaction across storages naturally, which allows documents switching between hot and cold storage easily. Also, batch accessing multiple column data for a document across storages is much cheaper.
Columnar processing
A major portion of document processing is done sequentially, which requires enormous range scans on some column data. Joining columns frequently accessed together as a separated column group enables efficient columnar processing.Here is a Column Store performance comparison of sequential scan on HDD without cache. Tests were done after ingested 500K rows, and each row has 2 columns with 100KB average size. Test compares read throughputs by:
- Read one of two columns by filter on the column Id.
- Assigning columns to two column groups and read on one column group.
Column Filter | Column Group | |
Sequential Scan | 15.6 MB/s | 22.0 MB/s |
Column Groups in RocksDB
Column groups are mapped to RocksDB’s CFs, which physically splits database into separated LSM-trees, at the same time atomic accesses are supported across CFs.Here is the illustration for RocksDB stack:
There are 3 major techniques to improve performance: JBOD, applying per storage compaction limiter and using disk prioritization management.
JBOD on RocksDB
JBOD (stands for Just-a-Bunch-Of-Disks) is one of the fundamental technologies we picked for Web Data table. Compared with RAID0, it gives the ability for software stack to access all disks independently. While RAID0 has lower development costs, throughput and IOPS capacity are much less than JBOD.JBOD supports on RocksDB is done by overriding file system APIs from JBOD environment wrapper. The performance benefits on HDD are significant as IOPS is the bottleneck of HDD for most scenarios, by using JBOD, IOPS for individual disk can be used separately.
In a RocksDB random overwrite test with 3 HDDs, JBOD got more than 2x write throughputs compared with RAID0, and the later test using 9 HDDs JBOD shows almost linear improvements along with number of disks.
3 HDDs RAID0 | 3 HDDs JBOD | 9 HDDs JBOD | |
Write throughputs | 40 MB/s | 100 MB/s | 285 MB/s |
Compaction Concurrent Limiter
RocksDB ingestion speed is limited by background compaction throughput. Writes will be throttled eventually if compaction cannot catch up with user write.Column group is built on top of RocksDB column family (CF), and all the CFs for the same DB instance share components like WAL, flush, compaction schedulers and thread pool, etc. After we extend RocksDB to support configure CFs with different working paths across storages, we found a performance issue that SSD and HDD didn’t work well together, slow HDD compactions exhaust the compaction thread pool and block SSD compactions.
Enlarging thread pool is not an option, as HDD becomes much less performant with high concurrent compactions bounded by IOPS. Compaction limiter is designed as a sharable object across CF, controls the outstanding compaction tasks. Global control for max outstanding compaction tasks on individual drive is done by sharing per-drive compaction limiter across CFs of all DB instances. Comparing with uncontrolled mix compactions, a fine-tuned compaction limiter gave us 80% combined ingestion throughput gain. This perf number was measured with 3 HDDs as JBOD and 2 SSDs as RAID0.
Disk Prioritization Management
To achieve better serving performance and IO utilization, Disk IO manager is introduced as a common layer on top of file system API, and it controls and prioritizes IO based on current outstanding requests. For example, RocksDB compaction writes yield to flush write, compaction reads yield to user read when disk is busy. Disk reads and writes for remote data recovery yield to all the other types of IO, and they are marked as the lowest priority.Disk IO manager maintenances IO task as multi-level feedback queue with FCFS (first come first serve) as the algorithm for each level, and IO requests are pushed into the queue if predefined max outstanding IO or outstanding bytes are reached. When IO task completes, the next task is picked from highest priority queue by FCFS followed by lower priority queues until outstanding task limit is reached again. To prevent starvation for lower priority queue, the task will be promoted to higher queue after a certain waiting time.
Summary
In this blog post, we introduced RocksDB usage in Microsoft Bing web data scenario which to our knowledge is one of the largest windows RocksDB deployment in industry. To meet the scalability and agility requirements, we built the storage fabric independent of computing fabric on top of ObjectStore table service. Several key techniques are applied, including column-based processing, hot-cold data tiering, JBOD, separate compaction filter between HDD and SSD, and disk prioritization, to make the system efficient and performant.There are also tremendous improvements done at data replication protocol. We will talk about them in the next blog post.
- Burton (Pu) Li, Jason (Zengzhong) Li, Max Sigalov, Dafan Liu, Knut Magne Risvik