How to move daily data from Google Cloud Storage to BigQuery using Matillion ETL and Cloud Run


After several months of working on this task, I finally found an approach that gets it done in an efficient and fast way.

But first, let me start with the requirements we got from one of the companies at Intercorp.

We are heavy users of Google Cloud Platform here, so as a Data Engineer I have the pleasure of working with several of these companies to move their data entirely to Google Cloud.

So the requirements we got for this particular solution were the following ones:

  • The data arrives in a Google Cloud Storage bucket daily
  • We have to apply some transformations to this data and then load it into its corresponding tables in BigQuery

In order to accomplish this task, we are using several services at the same time:

  • Google Cloud Storage for daily records storage
  • BigQuery for data analytics
  • Cloud Run for a service API to get the last file in the bucket folder of Google Cloud Storage
  • Matillion for BigQuery for the data transformation

I will divide this post in two parts:

  • Part I: Cloud Run API to get the last file of Google Cloud Storage bucket
  • Part II: Moving the data from Google Cloud Storage to BigQuery with their respective transformations

Part I: Cloud Run API to get the last file of Google Cloud Storage bucket

You may be wondering why I’m talking about Cloud Run here. Let’s dive into why.

If you don’t know what Cloud Run is, I will give you a quick overview here:

It’s a platform to deploy entire applications on top of containers in a serverless fashion.

The official description of the service by the Google Cloud team is the following:

Cloud Run is a managed compute platform that enables you to run stateless containers that are invocable via web requests or Pub/Sub events. Cloud Run is serverless: it abstracts away all infrastructure management, so you can focus on what matters most — building great applications.

I have to say it: Cloud Run is one of my favorite services from the entire Google Cloud ecosystem, and I use it almost daily to build fast REST-based APIs on top of Python, Flask and Firestore.

But let’s stop the chit-chat and dive into the code.

In order to deploy a Cloud Run application on Google Cloud, you first need a Google Cloud account, a Google Cloud project, Python, and the Google Cloud SDK.

Let’s start with the Google Cloud SDK:

  • Download the SDK from this link and install it on your computer
  • If everything is OK, you can test it with the following command: gcloud --version

You should get something like this:

gcloud --version command | a-data-driven-guy.com

Then, you need to install Python. You can check your installation with the following command:

C:\Python64\python.exe --version 

Then, it’s time to code the “last file API” in Python. We will need four files for this:

  • app.py: the code of the Flask REST app
  • Dockerfile: the Docker file for the image to be built
  • sa.json: the Service Account file
  • requirements.txt: the dependencies file

In order to start coding, you will need some libraries first:

C:\Python64\python.exe -m pip install flask gunicorn google-cloud-storage

The code for app.py is the following one:

import logging
import os

from flask import Flask
from flask import request, render_template

from google.cloud import storage

app = Flask(__name__)
# The sa.json Service Account key must have read access to the bucket.
storage_client = storage.Client.from_service_account_json('sa.json')

@app.route('/')
# API Version 1.0
def index():
    """Welcome to Last File API Version 1.0."""
    # Assumes a templates/main.html file with a simple landing page.
    button_text = "Get Last File"
    return render_template('main.html', button_text=button_text)

@app.route("/last_file_m_individual/", methods=["GET"])
def list_m_individual_files():
    """Return the name of the last file in a GCS bucket folder."""
    bucket_name = request.args.get('bucketname')
    bucket_folder = request.args.get('bucketfolder')
    bucket = storage_client.get_bucket(bucket_name)
    files = bucket.list_blobs(prefix=bucket_folder)
    # Keep only blobs that look like files (their names contain a dot).
    file_list = [file.name for file in files if '.' in file.name]
    # list_blobs returns blobs in lexicographic order, so the last entry is
    # the most recent one when the file names are date-prefixed.
    last_blob_name = file_list[-1]
    # Strip the folder prefix and return only the file name.
    last_file = last_blob_name.split("/")[-1]
    return last_file

@app.errorhandler(500)
def server_error(e):
    logging.exception('An error occurred during a request.')
    return 'An internal error occurred.', 500

if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
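
Before building the container image, you can run the API locally to make sure everything works (this assumes the sa.json key file sits next to app.py):

C:\Python64\python.exe app.py

The app will listen on http://localhost:8080.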

Then the Dockerfile file:

# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.8-slim

# Copy local code to the container image.
COPY . ./

RUN pip install -r requirements.txt

ENV PYTHONUNBUFFERED True

# Run the web service on container startup using the gunicorn webserver.
# For environments with multiple CPU cores, you can add explicit workers and
# threads, for example:
# CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 app:app
CMD exec gunicorn --bind :$PORT app:app

And lastly, the requirements.txt file:

Flask
gunicorn
google-cloud-storage

Then, you will need a Service Account here (the sa.json file).

This Service Account must have the permissions to read the files in the bucket you will be querying from this API.
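
If you still need to create that Service Account, a minimal sketch with the gcloud CLI could look like this (the account name last-file-api is just an example, roles/storage.objectViewer grants read access to objects, and project-name should be replaced with your own project):

gcloud iam service-accounts create last-file-api
gcloud projects add-iam-policy-binding project-name --member="serviceAccount:last-file-api@project-name.iam.gserviceaccount.com" --role="roles/storage.objectViewer"
gcloud iam service-accounts keys create sa.json --iam-account=last-file-api@project-name.iam.gserviceaccount.com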

After this, you have to enable the Cloud Run API in the Google Cloud project you are using. In order to do that, you have to do the following:

Cloud Run API | a-data-driven-guy.com

Then, it’s time for enabling the API:

Cloud Run API enabling | a-data-driven-guy.com

If everything is OK, then you need to deploy your API to Cloud Run.

To do that, we will use three commands:

gcloud config set project project-name

This sets the Google Cloud project we are using for this solution.

Then, you need to build the Docker image with this command:

gcloud builds submit --tag gcr.io/project-name/service-name

This command will build the Docker image inside the Google Cloud infrastructure with all the requirements.

If everything is OK, it’s time to deploy the service:

gcloud run deploy service-name --image gcr.io/project-name/service-name --platform managed --region us-central1 --allow-unauthenticated

IMPORTANT: This command will deploy the application with unauthenticated access, so anyone with the URL can call it. This is recommended for development only. For production environments, it’s better to require an authentication token; but that will be covered in another post.

If everything is OK, you can visit: https://console.cloud.google.com/run and see if your service is deployed there.

Cloud Run will generate a random domain name for your app with the following pattern: https://service-name-randomlettersnumbers-region-.a.run.app

Then, to test the API, you can use this code:

import requests

# The path must match the route defined in app.py.
url = 'https://service-name-randomlettersnumbers-region-.a.run.app/last_file_m_individual/'
bucketname = 'bucketnameaa'
bucketfolder = 'bucketfolderaa'
url_for = '{0}?bucketname={1}&bucketfolder={2}'.format(url, bucketname, bucketfolder)
r = requests.get(url_for)
result = r.text
print(result)

The API should return the name of the last file in that particular folder of the bucket.

Now, let’s actually move the data from Google Cloud Storage to BigQuery

Part II: Move the daily data from Google Cloud Storage to BigQuery with Matillion

If you don’t know what Matillion is, here is a quick description:

Matillion ETL is an ETL/ELT tool built specifically for cloud database platforms including Amazon Redshift, Google BigQuery, Snowflake and Azure Synapse. It is a modern, browser-based UI, with powerful, push-down ETL/ELT functionality. With a fast setup, you are up and running in minutes.

You can deploy Matillion on Google Cloud using the Google Cloud Marketplace:

Matillion in the Google Cloud Marketplace | a-data-driven-guy.com

In Matillion, there are two types of jobs: the orchestration job and the transformation job.

In this case, we will use an Orchestration job to move the data from Google Cloud Storage to BigQuery.

In order to do that, open Matillion and create a new orchestration job. Here is an example:

An orchestration job inside Matillion | a-data-driven-guy.com

There are two main components here:

  • Python script
  • Google Cloud Storage Load

The Python Script component will query the API we built to get the last file of a folder inside the designated bucket, and it will populate a variable inside the Matillion environment.

Then, that variable will be used as a parameter for the Cloud Storage Load component.

Here’s an example of the Python Script component:

Python Script component | a-data-driven-guy.com
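
If the screenshot is hard to read, here is a minimal sketch of what that Python Script component could contain. The URL, bucket name and folder are placeholders from the earlier test snippet, and it assumes a job variable named prm_last_file_iden_party already exists in the environment; Matillion’s Python Script component exposes a context object with an updateVariable method for this (check the interpreter options available in your Matillion version):

# Minimal sketch for the Matillion Python Script component (Python 3 interpreter assumed).
# The URL, bucket name and folder below are placeholders for your own values.
import urllib.request

api_url = ('https://service-name-randomlettersnumbers-region-.a.run.app'
           '/last_file_m_individual/?bucketname=bucketnameaa&bucketfolder=bucketfolderaa')

# Call the Cloud Run API to get the name of the last file in the folder.
last_file = urllib.request.urlopen(api_url).read().decode('utf-8').strip()

# 'context' is provided by Matillion inside the Python Script component.
# Write the result into the job variable used by the Cloud Storage Load component.
context.updateVariable('prm_last_file_iden_party', last_file)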

Then, inside the Cloud Storage Load component, you need to use the variable prm_last_file_iden_party to get the last file, and upload it to the table in BigQuery.

Google Cloud Storage Load component | a-data-driven-guy.com

The key here is the Google Storage URL location. It must have the following format:

gs://bucketname/bucketfolder/$variablename

With this format, you will get the last file in the bucket folder, and upload it to the BigQuery table.
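
For example, if the daily files land in a folder called individual inside a bucket called daily-data-bucket (both names are just placeholders), the Google Storage URL Location would be:

gs://daily-data-bucket/individual/$prm_last_file_iden_party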

And voilà, that is how you can upload daily data from Google Cloud Storage to BigQuery using Matillion.

If you need any assistance with this, just send me an email and I will try to help you.
