Building an ETL Pipeline with BigQuery and Dataflow

A comprehensive guide to the ETL process using GCP services


Overview

Modern data pipelines are essential for organizations to transform raw data into actionable insights. Following our previous post about BigQuery, this guide explores how to build a robust ETL (Extract, Transform, Load) pipeline using Google Cloud Platform’s managed services, focusing on moving data from MongoDB to BigQuery and finally to Google Sheets for analysis and visualization.

This comprehensive tutorial will guide you through building a fully automated data pipeline with the following flow:

MongoDB → BigQuery → Google Sheets

Understanding ETL

ETL (Extract, Transform, Load) is a three-phase process for integrating data from multiple sources into a single data repository:

  • Extract: Retrieving data from source systems (databases, APIs, files)
  • Transform: Converting data to appropriate format with operations like filtering, sorting, aggregating, joining, cleaning, and validation
  • Load: Writing the transformed data to a target destination (data warehouse, database, file)

Cloud-based ETL solutions like the one we're building offer advantages including scalability, reduced maintenance, cost-effectiveness, and integration with other cloud services.
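
To make the three phases concrete before we move to GCP-specific tooling, here is a toy, dependency-free Python sketch of the shape every pipeline in this guide follows; the data and function names are purely illustrative:

def extract():
    # Pull raw records from a source system (database, API, file)
    return [{"name": " Alice ", "value": "42"}, {"name": "Bob", "value": "7"}]

def transform(records):
    # Clean and convert each record into the target schema
    return [{"name": r["name"].strip(), "value": float(r["value"])} for r in records]

def load(rows):
    # Write the transformed rows to the destination (here, simply print them)
    for row in rows:
        print(row)

load(transform(extract()))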



GCP Services Overview

Our ETL pipeline leverages several key Google Cloud Platform services:

| Service | Role in Pipeline | Key Capabilities |
|---|---|---|
| Cloud Dataflow | Data processing engine | Batch/stream processing, auto-scaling, managed infrastructure |
| BigQuery | Data warehouse | Serverless storage and analytics, SQL interface, high performance |
| Cloud Functions | Serverless code execution | Event-driven processing, automatic scaling, pay-per-use |
| Cloud Scheduler | Job scheduling | Managed cron jobs, reliable triggering, time zone support |
| Cloud Storage | Function code storage | Highly durable object storage, global access, integration with other services |



Cloud Dataflow Deep Dive

What is Google Cloud Dataflow?

Google Cloud Dataflow is a fully managed service for executing Apache Beam pipelines within the Google Cloud Platform ecosystem. It handles both batch and streaming data processing scenarios with automatic resource management and dynamic work rebalancing.

Dataflow Architecture

When a Dataflow job is launched, the service:

  1. Takes your pipeline code (Apache Beam)
  2. Optimizes it into an execution graph
  3. Splits the workload into multiple steps
  4. Distributes these steps across Compute Engine instances
  5. Manages scaling, fault tolerance, and worker communication

This abstraction allows developers to focus on pipeline logic rather than infrastructure management.

Key Features and Benefits

  1. Unified Programming Model
    • Apache Beam SDK provides a single programming model for both batch and streaming
    • Same code can run on multiple execution engines
    • Supported languages include Java, Python, Go, and more
  2. No-Ops Data Processing
    • Fully managed service with automatic provisioning and scaling
    • Hands-off infrastructure management
    • Dynamic work rebalancing for optimal throughput
    • Built-in monitoring and logging
  3. Advanced Processing Capabilities
    • Native handling of late and out-of-order data
    • Exactly-once processing guarantees
    • Windowing for time-based aggregations (see the short sketch after the diagram below)
    • Watermarking for tracking event time progress
  4. Performance and Scalability
    • Horizontal scaling to handle varying workloads
    • Fusion optimization to minimize overhead
    • Auto-tuning of worker resources
    • Support for persistent disks and SSDs for performance-sensitive operations
graph TD
    A[Beam SDK Code] --> B[Dataflow Service]
    B --> C{Job Execution}
    C --> D[Worker Pool]
    D --> E[Worker 1]
    D --> F[Worker 2]
    D --> G[Worker N]
    B <-->|Metrics/Logs| H[Monitoring & Logging]
    B <-->|Status/Control| I[Job Management]
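
To illustrate the windowing capability mentioned in the feature list above, here is a minimal, standalone Beam snippet that counts events per key in 60-second fixed windows. It runs locally with the default DirectRunner; the keys, values, and timestamps are made up for the example:

import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
    (p
     | 'CreateEvents' >> beam.Create([
         ('clicks', 1, 10),   # (key, value, event time in seconds)
         ('clicks', 1, 20),
         ('clicks', 1, 70),
     ])
     # Attach event-time timestamps so windowing can group by event time
     | 'AddTimestamps' >> beam.Map(
         lambda e: window.TimestampedValue((e[0], e[1]), e[2]))
     | 'FixedWindows' >> beam.WindowInto(window.FixedWindows(60))
     | 'CountPerKey' >> beam.CombinePerKey(sum)
     | 'Print' >> beam.Map(print))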

Sample Dataflow Pipeline Code

Here’s a simplified example of a Python Dataflow pipeline that reads from MongoDB, performs transformations, and writes to BigQuery:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.mongodbio import ReadFromMongoDB
from apache_beam.io.gcp.bigquery import WriteToBigQuery

# Define pipeline options including GCP project and Dataflow runner
options = PipelineOptions([
    '--project=your-gcp-project',
    '--region=us-central1',
    '--runner=DataflowRunner',
    '--temp_location=gs://your-bucket/temp',
    '--staging_location=gs://your-bucket/staging'
])

# Schema for BigQuery table
schema = {
    'fields': [
        {'name': 'id', 'type': 'STRING'},
        {'name': 'name', 'type': 'STRING'},
        {'name': 'value', 'type': 'FLOAT'},
        {'name': 'timestamp', 'type': 'TIMESTAMP'}
    ]
}

# MongoDB connection URI
uri = "mongodb+srv://username:password@cluster.mongodb.net"

# Create and run the pipeline
with beam.Pipeline(options=options) as p:
    (p 
     | 'ReadFromMongoDB' >> ReadFromMongoDB(
         uri=uri,
         db='your_database',
         coll='your_collection')
     | 'TransformData' >> beam.Map(lambda doc: {
         'id': str(doc['_id']),
         'name': doc.get('name', ''),
         'value': float(doc.get('value', 0)),
         # Guard against documents that lack a created_at field
         'timestamp': doc['created_at'].timestamp() if doc.get('created_at') else None
     })
     | 'WriteToBigQuery' >> WriteToBigQuery(
         table='your-project:your_dataset.your_table',
         schema=schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    )



Cloud Functions and Scheduler

Cloud Functions

Google Cloud Functions is a lightweight, event-driven, serverless compute platform that allows you to create single-purpose, stand-alone functions that respond to cloud events without managing the underlying infrastructure.

When to Use Cloud Functions vs. Dataflow

In our ETL pipeline, we're using Cloud Functions for orchestration and simpler data movement tasks, while Dataflow would be better suited for:

  • Processing very large datasets (TB+)
  • Complex data transformations requiring parallel processing
  • Streaming data scenarios with windowing requirements
  • Pipelines that need sophisticated fault tolerance

Cloud Functions is ideal for event-triggered actions, lightweight processing, and gluing together different GCP services, as we're using it in this workflow.

Key Features

  • Event-driven execution triggered by HTTP requests, Pub/Sub messages, Cloud Storage events, and more
  • Automatic scaling with no servers to provision or manage
  • Pay only for the compute time your functions actually consume
  • Straightforward integration with other GCP services

Cloud Scheduler

Google Cloud Scheduler is a fully managed enterprise-grade cron job scheduler that allows you to schedule virtually any job, including batch jobs, big data jobs, cloud infrastructure operations, and more.

Key Features

  • Standard Unix cron syntax for defining schedules
  • Multiple target types: HTTP(S) endpoints, Pub/Sub topics, and App Engine applications
  • Configurable retry policies with exponential backoff
  • Time zone support and integrated logging via Cloud Logging

Example Code: MongoDB to BigQuery Function

Here’s a sample Cloud Function that extracts data from MongoDB and loads it into BigQuery:

import os
import pymongo
from google.cloud import bigquery
from datetime import datetime

def mongodb_to_bigquery(request):
    # MongoDB connection
    mongo_client = pymongo.MongoClient(os.environ.get('MONGODB_URI'))
    db = mongo_client[os.environ.get('MONGODB_DB')]
    collection = db[os.environ.get('MONGODB_COLLECTION')]
    
    # Get data from MongoDB
    cursor = collection.find({})
    
    # Transform data for BigQuery
    rows = []
    for document in cursor:
        # Convert MongoDB ObjectId to string
        document['_id'] = str(document['_id'])
        
        # Convert any datetime objects to string
        for key, value in document.items():
            if isinstance(value, datetime):
                document[key] = value.isoformat()
        
        rows.append(document)
    
    # BigQuery client (named separately to avoid shadowing the MongoDB client)
    bq_client = bigquery.Client()
    table_id = f"{os.environ.get('BIGQUERY_PROJECT')}.{os.environ.get('BIGQUERY_DATASET')}.{os.environ.get('BIGQUERY_TABLE')}"

    # Load data into BigQuery
    errors = bq_client.insert_rows_json(table_id, rows)
    
    if errors:
        return f"Encountered errors: {errors}", 500
    else:
        return f"Successfully loaded {len(rows)} rows to {table_id}", 200

Example Code: BigQuery to Google Sheets Function

Here’s a sample Cloud Function that extracts data from BigQuery and loads it into Google Sheets:

import os
import gspread
from google.cloud import bigquery
from google.oauth2 import service_account

def bigquery_to_sheets(request):
    # BigQuery setup
    bq_client = bigquery.Client()
    
    # Query data from BigQuery
    query = f"""
    SELECT *
    FROM `{os.environ.get('BIGQUERY_PROJECT')}.{os.environ.get('BIGQUERY_DATASET')}.{os.environ.get('BIGQUERY_TABLE')}`
    LIMIT 1000
    """
    
    query_job = bq_client.query(query)
    rows = query_job.result()
    
    # Transform data for Google Sheets
    headers = [field.name for field in rows.schema]
    data = [headers]
    for row in rows:
        data.append([str(row[field]) for field in headers])
    
    # Google Sheets setup
    credentials = service_account.Credentials.from_service_account_file(
        'bigquery.json',
        scopes=['https://www.googleapis.com/auth/spreadsheets']
    )
    gc = gspread.authorize(credentials)
    
    # Open the spreadsheet and write data
    sheet = gc.open_by_key(os.environ.get('SHEET_ID')).worksheet(os.environ.get('SHEET_NAME'))
    sheet.clear()
    sheet.update('A1', data)
    
    return f"Successfully updated Google Sheet with {len(data)-1} rows of data", 200



Complete ETL Workflow Implementation

Our complete ETL workflow follows this sequence:

flowchart TD
    Z["Enable API Service"] --> A["Create BigQuery Service Account and Grant Permission"]
    A --> B["Create Google Sheet and Grant Permission"]
    B --> C["Create BigQuery Dataset"]
    C --> D["Create Storage for Cloud-function"]
    D --> E1["mongodb → bigquery table workflow"]
    E1 --> F1["Compress Zip file (source code)"]
    E1 --> G1["Upload Zip file (source code) to bucket"]
    E1 --> H1["Create cloud function"]
    E1 --> I1["Create cloud scheduler"]
    D --> E2["bigquery table → googlesheet workflow"]
    E2 --> F2["Compress Zip file (source code)"]
    E2 --> G2["Upload Zip file (source code) to bucket"]
    E2 --> H2["Create cloud function"]
    E2 --> I2["Create cloud scheduler"]

Step-by-Step Implementation

1. Enable Required API Services

# Enable required APIs
gcloud services enable dataflow.googleapis.com
gcloud services enable bigquery.googleapis.com
gcloud services enable cloudfunctions.googleapis.com
gcloud services enable cloudscheduler.googleapis.com
gcloud services enable storage.googleapis.com
gcloud services enable sheets.googleapis.com

2. Set Up Service Accounts and Permissions

# Create service account for BigQuery operations
gcloud iam service-accounts create bigquery-sa \
    --display-name="BigQuery Service Account"

# Grant necessary permissions
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:bigquery-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"

gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:bigquery-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/bigquery.jobUser"

# Create and download key for the service account
gcloud iam service-accounts keys create bigquery.json \
    --iam-account=bigquery-sa@$PROJECT_ID.iam.gserviceaccount.com

3. Create BigQuery Dataset and Tables

# Create dataset
bq mk --dataset $PROJECT_ID:analytics_data

# Create table with schema
bq mk --table \
    $PROJECT_ID:analytics_data.mongodb_data \
    _id:STRING,name:STRING,value:FLOAT,timestamp:TIMESTAMP

4. Set Up Cloud Storage for Function Code

# Create buckets for function code
gsutil mb -l us-central1 gs://$PROJECT_ID-functions

5. Prepare and Deploy MongoDB to BigQuery Function

# Create function directory and files
mkdir -p mongodb-to-bigquery
cd mongodb-to-bigquery

# Create requirements.txt
cat > requirements.txt << 'EOF'
pymongo==4.3.3
google-cloud-bigquery==3.9.0
EOF

# Create main.py with the function code
# (Use the MongoDB to BigQuery function code from above)

# Zip the function code
zip -r mongodb-to-bigquery.zip main.py requirements.txt

# Upload to Cloud Storage
gsutil cp mongodb-to-bigquery.zip gs://$PROJECT_ID-functions/

# Deploy the function
gcloud functions deploy mongodb-to-bigquery \
    --runtime python39 \
    --trigger-http \
    --allow-unauthenticated \
    --entry-point=mongodb_to_bigquery \
    --source=gs://$PROJECT_ID-functions/mongodb-to-bigquery.zip \
    --set-env-vars=MONGODB_URI=mongodb+srv://username:password@cluster.mongodb.net,MONGODB_DB=your_database,MONGODB_COLLECTION=your_collection,BIGQUERY_PROJECT=$PROJECT_ID,BIGQUERY_DATASET=analytics_data,BIGQUERY_TABLE=mongodb_data

6. Prepare and Deploy BigQuery to Google Sheets Function

# Create function directory and files
mkdir -p bigquery-to-sheets
cd bigquery-to-sheets

# Create requirements.txt
cat > requirements.txt << 'EOF'
google-cloud-bigquery==3.9.0
gspread==5.7.2
google-auth==2.16.2
EOF

# Copy the service account key
cp ../bigquery.json .

# Create main.py with the function code
# (Use the BigQuery to Google Sheets function code from above)

# Zip the function code
zip -r bigquery-to-sheets.zip main.py requirements.txt bigquery.json

# Upload to Cloud Storage
gsutil cp bigquery-to-sheets.zip gs://$PROJECT_ID-functions/

# Deploy the function
gcloud functions deploy bigquery-to-sheets \
    --runtime python39 \
    --trigger-http \
    --allow-unauthenticated \
    --entry-point=bigquery_to_sheets \
    --source=gs://$PROJECT_ID-functions/bigquery-to-sheets.zip \
    --set-env-vars=BIGQUERY_PROJECT=$PROJECT_ID,BIGQUERY_DATASET=analytics_data,BIGQUERY_TABLE=mongodb_data,SHEET_ID=your-sheet-id,SHEET_NAME=Sheet1

7. Set Up Cloud Scheduler Jobs

# Create scheduler for MongoDB to BigQuery (runs daily at 1 AM)
gcloud scheduler jobs create http mongodb-to-bigquery-job \
    --schedule="0 1 * * *" \
    --uri="https://us-central1-$PROJECT_ID.cloudfunctions.net/mongodb-to-bigquery" \
    --http-method=GET \
    --time-zone="America/New_York"

# Create scheduler for BigQuery to Google Sheets (runs daily at 2 AM)
gcloud scheduler jobs create http bigquery-to-sheets-job \
    --schedule="0 2 * * *" \
    --uri="https://us-central1-$PROJECT_ID.cloudfunctions.net/bigquery-to-sheets" \
    --http-method=GET \
    --time-zone="America/New_York"



Infrastructure as Code (IaC) Implementation

For production environments, it’s recommended to use Infrastructure as Code tools to automate and version your ETL pipeline setup. Below are two approaches using popular IaC tools.

Terraform Implementation

Terraform Benefits for ETL Pipelines

  • Version Control: Track changes to your infrastructure over time
  • Repeatability: Consistently deploy the same environment across stages
  • Modularity: Reuse components across different pipelines
  • Dependency Management: Automatically handle resource dependencies
  • State Management: Track the actual state of resources vs. desired state

The complete implementation is available at: terraform-infra-gcp

somaz-bigquery-project/
├── README.md
├── bigquery-to-google-sheet
│   ├── bigquery.json
│   ├── main.py
│   ├── main.py.all_data
│   ├── main.py.date
│   ├── main.py.single_db
│   ├── main.py.time_UTC
│   └── requirements.txt
├── bigquery.tfvars
├── cloud-storage.tf
├── locals.tf
├── mongodb-bigquery-googlesheet-workflow.tf
├── mongodb-to-bigquery
│   ├── MongoDB_to_BigQuery
│   ├── main.py
│   ├── main.py.local
│   ├── main.py.single_db
│   └── requirement.txt
├── provider.tf
├── terraform-backend.tf
└── variables.tf

Key Terraform resource types used:

  • google_bigquery_dataset for the target analytics dataset
  • google_storage_bucket and google_storage_bucket_object for the function-code bucket and zipped source
  • google_cloudfunctions_function for the two ETL functions
  • google_cloud_scheduler_job for the cron triggers

Pulumi Implementation

Pulumi offers a more programmatic approach to infrastructure as code, allowing you to use familiar languages like Python:

The complete implementation is available at: pulumi-study

bigdata-flow-functions/
├── Pulumi.yaml
├── README.md
├── __main__.py
├── bigquery-to-google-sheet
│   ├── bigquery.json
│   ├── main.py
│   ├── main.py.all_data
│   ├── main.py.date
│   ├── main.py.single_db
│   ├── main.py.time_UTC
│   └── requirements.txt
├── bq_dataset.py
├── bq_sheet_archive.py
├── bq_sheet_function.py
├── config.py
├── mdb_bq_archive.py
├── mdb_bq_function.py
├── mongodb-to-bigquery
│   ├── main.py
│   ├── main.py.local
│   ├── main.py.single_db
│   └── requirements.txt
├── requirements.txt
├── scheduler_manager.py
├── storage.py
└── utils.py
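
To give a flavor of what those modules contain, here is a minimal Pulumi sketch using the pulumi_gcp provider; the resource names, zip path, and settings are illustrative assumptions rather than the repository's actual code:

import pulumi
import pulumi_gcp as gcp

# Bucket that holds the zipped Cloud Function source
bucket = gcp.storage.Bucket("functions-bucket", location="US")

archive = gcp.storage.BucketObject(
    "mongodb-to-bigquery-zip",
    bucket=bucket.name,
    source=pulumi.FileAsset("mongodb-to-bigquery.zip"),
)

# HTTP-triggered Cloud Function (1st gen, matching the gcloud commands above)
function = gcp.cloudfunctions.Function(
    "mongodb-to-bigquery",
    region="us-central1",
    runtime="python39",
    entry_point="mongodb_to_bigquery",
    source_archive_bucket=bucket.name,
    source_archive_object=archive.name,
    trigger_http=True,
    available_memory_mb=256,
)

# Daily Cloud Scheduler job that calls the function
job = gcp.cloudscheduler.Job(
    "mongodb-to-bigquery-job",
    region="us-central1",
    schedule="0 1 * * *",
    time_zone="America/New_York",
    http_target=gcp.cloudscheduler.JobHttpTargetArgs(
        uri=function.https_trigger_url,
        http_method="GET",
    ),
)

pulumi.export("function_url", function.https_trigger_url)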



Advanced Customization and Optimizations

Handling Different Data Types

The provided code examples can be customized for different MongoDB collections and data structures:
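
For example, MongoDB documents often contain types that BigQuery's JSON loader will not accept directly, such as ObjectId, Decimal128, nested documents, and binary data. The helper below is a sketch of how the transform step in either function could normalize them; the function names are ours, not part of the code above:

from datetime import datetime
from bson import ObjectId
from bson.decimal128 import Decimal128

def normalize_value(value):
    # Convert MongoDB-specific types into BigQuery-friendly JSON values
    if isinstance(value, ObjectId):
        return str(value)
    if isinstance(value, Decimal128):
        return float(value.to_decimal())
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, bytes):
        return value.hex()
    if isinstance(value, dict):
        return {k: normalize_value(v) for k, v in value.items()}
    if isinstance(value, list):
        return [normalize_value(v) for v in value]
    return value

def normalize_document(document):
    # Apply the conversion to every top-level field of a document
    return {key: normalize_value(value) for key, value in document.items()}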

Scaling Considerations

As your data volumes grow, consider these optimizations:

  1. BigQuery Partitioning: Partition tables by date for improved query performance and cost
       CREATE TABLE analytics_data.mongodb_data
       PARTITION BY DATE(timestamp)
       AS SELECT * FROM analytics_data.mongodb_data_temp
    
  2. Batch Processing: Process data in manageable chunks
       # Process in batches of 1000 documents
       # Note: skip() slows down at large offsets; for very large collections,
       # paginate on an indexed field such as _id instead.
       batch_size = 1000
       total_docs = collection.count_documents({})
       for i in range(0, total_docs, batch_size):
           cursor = collection.find({}).skip(i).limit(batch_size)
           # Process batch
    
  3. Error Handling: Implement robust error handling and retries
       from google.api_core.retry import Retry, if_exception_type

       @Retry(predicate=if_exception_type(Exception), deadline=60.0)
       def insert_with_retry(client, table_id, rows):
           errors = client.insert_rows_json(table_id, rows)
           if errors:
               raise Exception(f"Error inserting rows: {errors}")
           return len(rows)
    
  4. Monitoring: Set up alerts for pipeline failures
       from google.cloud import monitoring_v3
          
       def create_failure_alert(project_id):
           client = monitoring_v3.AlertPolicyServiceClient()
           # Create alert policy configuration
           # ...
    



Best Practices and Lessons Learned

Security Best Practices

  1. Principle of Least Privilege: Grant only required permissions to service accounts
  2. Secret Management: Store sensitive credentials securely using Secret Manager (see the sketch after this list)
  3. VPC Service Controls: Implement network security boundaries for sensitive data
  4. Audit Logging: Enable comprehensive logging for all data access
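
For example, rather than passing the MongoDB URI to the function as a plain environment variable, the function could fetch it from Secret Manager at startup. A minimal sketch, assuming the google-cloud-secret-manager client library and a secret named mongodb-uri (the secret name is hypothetical):

from google.cloud import secretmanager

def get_secret(project_id, secret_id, version="latest"):
    # Fetch the payload of a secret version as a string
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/{version}"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("UTF-8")

# Example: replace os.environ.get('MONGODB_URI') in the function above
# mongodb_uri = get_secret("your-gcp-project", "mongodb-uri")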

Performance Optimization

  1. BigQuery Best Practices:
    • Use partitioning and clustering
    • Filter on partitioned columns
    • Select only needed columns
    • Use approximate aggregation functions when possible
  2. Cloud Functions Performance:
    • Increase memory allocation for CPU-intensive tasks
    • Keep functions stateless and idempotent
    • Reuse connections to external services (see the sketch after this list)
  3. Cost Optimization:
    • Set up budget alerts
    • Use BigQuery flat-rate pricing for predictable workloads
    • Schedule less frequent updates for non-critical data
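
As a concrete example of connection reuse, clients created at module scope in a Cloud Function survive across invocations of a warm instance, so the function does not reconnect on every request. A minimal sketch based on the MongoDB-to-BigQuery function above:

import os
import pymongo
from google.cloud import bigquery

# Created once per function instance and reused across warm invocations
bq_client = bigquery.Client()
mongo_client = pymongo.MongoClient(os.environ.get('MONGODB_URI'))

def mongodb_to_bigquery(request):
    # Reuses the module-level clients instead of reconnecting each time
    db = mongo_client[os.environ.get('MONGODB_DB')]
    collection = db[os.environ.get('MONGODB_COLLECTION')]
    # ... extract, transform, and load as shown earlier ...
    return "ok", 200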



Next Steps and Future Improvements

This ETL pipeline can be extended in several ways:

  1. Real-time Processing: Implement streaming data ingestion using Pub/Sub and Dataflow (a brief preview sketch follows this list)
  2. Data Quality Checks: Add validation steps to ensure data integrity
  3. Advanced Analytics: Implement BigQuery ML for predictive analytics
  4. Visualization: Connect to Data Studio or Looker for rich dashboards
  5. CI/CD Pipeline: Set up automated testing and deployment
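
As a brief preview of the streaming option, the sketch below reads JSON messages from a Pub/Sub topic and appends them to an existing BigQuery table. The topic and table names are placeholders, and the table is assumed to already exist:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery

# streaming=True switches the pipeline into streaming mode
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (p
     | 'ReadFromPubSub' >> ReadFromPubSub(topic='projects/your-gcp-project/topics/your-topic')
     | 'ParseJson' >> beam.Map(lambda msg: json.loads(msg.decode('utf-8')))
     | 'WriteToBigQuery' >> WriteToBigQuery(
         table='your-project:your_dataset.streaming_table',
         create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,  # table must already exist
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))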

In future articles, we’ll explore these advanced topics in greater detail.



References