Loading and transforming data to BigQuery at large scale

Adrian Witas
7 min read · Mar 4, 2020

Data Lake Platform

At Viant, the cloud ad server stack produces 1.7 M transaction log files daily, holding 70 billion records and taking up 80 TB of data. Each server rotates its log files every 3 minutes and uploads them to Cloud Storage.

Our goal was to ingest all log data into 100+ tables within a few minutes, in a timely and cost-effective manner, with the ability for various ETL processes to pick up incoming data incrementally every 15 minutes. Since we collect and analyze raw logs, we use time range decorators extensively to incrementally aggregate and transform incoming data.

BigQuery ecosystem

BigQuery is a cloud-managed data warehouse that processes large datasets with very high performance. It provides two native APIs to insert data:

Load Data is a free-of-charge service for flat-rate pricing projects, allowing 1,000 load jobs per destination table per day. In the most aggressive scenario, that makes it possible to load data every ~87 sec without exceeding the daily quota. This API supports compression and various data formats such as CSV, JSON, Avro, Parquet, and ORC.

Streaming API allows you to stream one record at a time, providing near real-time data availability. In practice, once the API call finishes, the data goes to a streaming buffer, which is instantly available to the query engine. At some point, extraction workers collect data from the streaming buffer and commit it to the destination table. Using the streaming API incurs additional charges.

On top of these BigQuery APIs, Google Cloud Platform offers a few managed services supporting data loading and synchronization:

BigQuery Data Transfer Service allows scheduling recurring data loads from various data sources, including Google Cloud Storage, AWS S3, Amazon Redshift, and many Software as a Service applications such as Google Ads and YouTube.

Google Cloud Dataflow is a cloud-managed data processing service for both batch and real-time data streaming applications. It allows you to clean, filter, and enrich data before it gets written to BigQuery. It utilizes Apache Beam pipelines to write ETL workflows.

Expectation

It goes without saying that the data ingestion process has to be transparent, robust, and reliable. Transparency addresses the visibility concern: at any given time, current, past, and scheduled jobs per destination table can be reported, including the end-to-end ingestion delay. Besides that, the data pumping process has to load every single data file unless the data is corrupted or the schema is incompatible. For example, if a load job inserts data from 2K files but a few files are invalid, those files have to be excluded automatically from the batch and reported in either the corrupted or the incompatible schema category. On top of that, errors need to be classified as recoverable or non-recoverable so they can be handled gracefully. For example, 503 Service Unavailable requires retries, but 500 Internal Error may require restarting the whole ingestion process.
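As an illustration of the recoverable versus non-recoverable split, the sketch below maps an HTTP error status to a follow-up action. Only the 503 and 500 cases come from the text above; the `Action` type, the `Classify` function, and the choice to treat every other error status as a failure to report are assumptions made for this example.

```go
package main

import (
	"fmt"
	"net/http"
)

// Action is what the loader does after a failed call (hypothetical type).
type Action int

const (
	Retry   Action = iota // recoverable: retry the same request
	Restart               // restart the whole ingestion process
	Fail                  // non-recoverable: report and stop
)

func (a Action) String() string {
	return [...]string{"retry", "restart", "fail"}[a]
}

// Classify maps an HTTP error status to an action.
func Classify(status int) Action {
	switch status {
	case http.StatusServiceUnavailable: // 503
		return Retry
	case http.StatusInternalServerError: // 500
		return Restart
	default: // assumption: anything else is reported as a failure
		return Fail
	}
}

func main() {
	for _, status := range []int{503, 500, 400} {
		fmt.Printf("%d -> %s\n", status, Classify(status))
	}
}
```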

Initially, we hoped to use the Transfer Service, but it quickly became apparent that its scheduling restriction (once a day) and other limitations made it a poor fit for our use case.

Cloud Dataflow would not address our case either, since we would have to redesign our application to use Google Pub/Sub with an Apache Beam streaming pipeline. On top of that, replaying past logs would be problematic, not to mention the hefty price tag for the amount of data in question.

What we were left with was exploring the direct BigQuery data pumping APIs. The Streaming API looked like a perfect fit, but besides the high cost, we quickly hit another snag. Since our post-processing jobs relied on time range decorators, streamed data was not visible to them until streaming buffer extraction took place, which in some cases took up to 90 minutes.

Eventually, we were left with only one option: the Load Data API.

Hacking

Initially, we built a small ETL framework in Go that converted our flat JSON log files stored in AWS S3 into a rich Avro destination format, distributing data dynamically to various tables based on data content criteria. While we developed it relatively quickly, maintaining and troubleshooting it posed a severe challenge in some cases over a year-long period. We could not tell what the end-to-end data ingestion delay was at any given time. What's worse, when BigQuery load job capacity was reduced or other underlying cloud infrastructure issues occurred, we were unable to pin the problem down quickly, resulting in numerous retries and additional cost. Finally, it was not cheap to run either: even with preemptible instances, the bill was up to $8K per day for the ETL cluster alone.

Serverless

Last year we became excited about event-driven serverless architecture. No more infrastructure management and scaling issues, just handling cloud events. Building a rule-driven data ingestion POC with Cloud Functions triggered by Google Cloud Storage and Stackdriver-based BigQuery events convinced us that serverless is the way to go.

Basic ingestion rule example

Before going into production, we still had to add extra functionality such as batching incoming data files and applying various post-load processing transformations, along with tons of e2e testing. End-to-end testing allowed us to address many challenges down the line and to refactor the loader and transformer numerous times, enhancing serverless idempotency. Over time we also decided to drop the reliance on Stackdriver notifications and rewrite event dispatching altogether to better control workload distribution.

Ingestion with post processing rule example

Here comes the surprise!

One of the unexpected outcomes of the development process was the realization of the sheer benefits of using temporary tables and ingestion-dedicated project(s). Instead of loading data directly to the destination table, you first load it to a temp table, followed by a free-of-charge copy operation or a sequence of SQL-based transformations all the way down to the destination table.

This approach not only eases pressure on the 100K job quota limit of destination projects, but also allows fine-grained ingestion workload control across various projects with just one deployment instance. Besides, we were also able to identify intermittent reductions in load job capacity: some load jobs with an average execution time under a minute were suddenly taking up to 4 hours to complete. Since our application creates data logs uniformly across the day, you would expect the same load characteristics all the time, especially since end users never run any custom SQL on the ingestion-dedicated project.
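The temp-table flow can be summarized as an ordered plan: load into a temp table, then either copy or run SQL into the destination. The sketch below only builds such a plan as data and does not call BigQuery. The `Step` type, the temp table naming scheme, and the final cleanup step are assumptions made for illustration, not the behavior of any specific tool.

```go
package main

import (
	"fmt"
	"strings"
)

// Step is one operation in a hypothetical ingestion plan.
type Step struct {
	Kind   string // "load", "copy", "query" or "drop"
	Source string
	Dest   string
}

// TempTable derives a temp table name from the destination table and batch ID.
func TempTable(tempDataset, dest, batchID string) string {
	name := dest[strings.LastIndex(dest, ".")+1:]
	return tempDataset + "." + name + "_" + batchID
}

// Plan loads sourceURI into a temp table, then moves the data to dest with a
// copy job, or with a SQL transformation when transform is true.
func Plan(sourceURI, tempDataset, dest, batchID string, transform bool) []Step {
	temp := TempTable(tempDataset, dest, batchID)
	move := "copy"
	if transform {
		move = "query"
	}
	return []Step{
		{Kind: "load", Source: sourceURI, Dest: temp},
		{Kind: move, Source: temp, Dest: dest},
		{Kind: "drop", Source: temp}, // assumed cleanup of the temp table
	}
}

func main() {
	for _, s := range Plan("gs://bucket/batch-42/*.avro", "temp", "mydataset.logs", "42", false) {
		fmt.Printf("%-5s %s -> %s\n", s.Kind, s.Source, s.Dest)
	}
}
```

Because only the final step touches the destination table, the load itself can run in a separate, ingestion-dedicated project.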

After raising the issue with the BigQuery team, we found that load jobs under the flat-rate pricing model use the ‘default-pipeline’ reservation, which is governed by a fair scheduler allocating resources among competing load jobs across various projects. To guarantee ingestion speed for critical data, we started using a project with slot reservations whenever capacity is reduced within default-pipeline.

Ingestion rule with project load balancer rule example.

More Transformation

One massive benefit of serverless is its event-driven nature. Instead of using a scheduler to pull data and detect increments, the loader sits at the beginning of the data processing pipeline. Once data arrives in the temporary table, it makes perfect sense to apply any required data transformation and enrichment.

Side input & transformation ingestion rule example

Stand-alone loader & transformer

While the serverless loader and transformer is our main production usage, we’ve also developed a stand-alone CLI that shares the same execution path with the serverless loader & transformer to handle storage events. Not only does it allow testing ingestion rules locally or running one-offs, but it can also be used to stream data from any file system implementing the Abstract File Storage interface.

Streaming CLI call example

Serverless efficiency

We developed a BigQuery loader and transformer with Go and serverless architecture. To revisit our initial goal: not only have we been able to ingest our logs in under 6 min at the 99th percentile, but we also reduced ingestion costs from $8K to $15 per day, bringing the annual ingestion cost from $2.9M down to $5.5K. Finally, we centralized and migrated all our ingestion pipelines, increasing efficiency, reliability, transparency, and visibility.
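The annual figures follow from multiplying the daily costs by 365 days, as this short check shows (the function name `AnnualCost` is an illustrative assumption):

```go
package main

import "fmt"

const daysPerYear = 365

// AnnualCost scales a daily cost in USD to a full year.
func AnnualCost(dailyUSD int) int { return dailyUSD * daysPerYear }

func main() {
	before, after := AnnualCost(8000), AnnualCost(15)
	fmt.Printf("before: $%d/year, after: $%d/year\n", before, after) // before: $2920000/year, after: $5475/year
	fmt.Printf("reduction: ~%dx\n", before/after)                    // reduction: ~533x
}
```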

Daily serverless loader & transformer cost break down

BqTail project

If you have faced any of the challenges discussed in this article, the BqTail loader and transformer is a perfect place to start.

BqTail is an open-source project, and contributors are welcome!
