Scheduled Emailing of BigQuery Data

Darpan Patel
4 min read · May 21, 2021

Of all the major public clouds, Google Cloud Platform (GCP) is known for providing the same infrastructure and technology that Google itself runs its services on. We have seen significant growth in the adoption of Data Analytics, Machine Learning and AI use cases on GCP, thanks to the performance and out-of-the-box solutions provided on top of its serverless Data Warehouse product, none other than BigQuery, complemented with GPU/TPU-powered VMs.

In this blog post, I am going to describe one simple use of BigQuery data. Often, business or analyst teams need basic reports from the data sitting in the Data Warehouse. Or you may need to send a report to an external vendor, at regular intervals, built from data you have matched and stored in a BQ table. The steps are simple, right? Run a query to get the required report, export the result to CSV, and send it via email. But I don’t like repetitive work and prefer to automate those steps, so that, without having to keep track of time and tasks, I can spend that time writing blogs like this one! ;)

Let’s look at how you can use GCP to achieve this objective natively. You may have guessed the right way to automate it, i.e., write some code. We will use the Python BigQuery client library to run the query and write the result into a CSV file. Now, how do we send an email? Use the free or paid tier of the SendGrid API. Then deploy the combined code as a Cloud Function. Simple enough.

There are two approaches you can take, depending on your requirements and how frequently the email needs to go out.

1. In this approach, you define a CRON schedule in Cloud Scheduler that publishes to a Cloud Pub/Sub topic, which in turn triggers its subscriber, a Cloud Function in our case, to execute the code. This sends the email at the fixed frequency defined in the scheduler (a programmatic setup sketch follows the second approach below).
(Diagram: send the email at a fixed time)

2. In this approach, you send the email more dynamically, whenever data is inserted into a particular table. Cloud Logging records each and every activity for us, so create a filter that identifies the insert event for that table, then use the filter to create a logging sink (log router) that publishes to a Cloud Pub/Sub topic. Pub/Sub in turn triggers the Cloud Function to execute the code.

(Diagram: send the email on a data-insert job)
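If you prefer to wire up the first approach from code instead of the console, here is a minimal sketch using the google-cloud-pubsub and google-cloud-scheduler client libraries (neither is needed by the Cloud Function itself, so they are not in the requirements.txt shown later). The project ID, region, topic name, job name and CRON expression below are placeholders of my own choosing; adjust them to your setup.

# Minimal setup sketch for approach 1 (assumptions: google-cloud-pubsub and
# google-cloud-scheduler are installed; all names below are placeholders).
from google.cloud import pubsub_v1
from google.cloud import scheduler_v1

PROJECT_ID = "<proj-id>"
REGION = "us-central1"        # any region supported by Cloud Scheduler
TOPIC_ID = "monthly-report"   # hypothetical topic name

# 1. Create the Pub/Sub topic the Cloud Function will subscribe to.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
publisher.create_topic(request={"name": topic_path})

# 2. Create a Cloud Scheduler job that publishes to that topic on a CRON schedule.
scheduler = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{PROJECT_ID}/locations/{REGION}"
job = scheduler_v1.Job(
    name=f"{parent}/jobs/monthly-report-trigger",  # hypothetical job name
    schedule="0 9 1 * *",                          # 09:00 on the 1st of every month
    time_zone="Etc/UTC",
    pubsub_target=scheduler_v1.PubsubTarget(
        topic_name=topic_path,
        data=b"send monthly report",               # payload is ignored by our function
    ),
)
scheduler.create_job(parent=parent, job=job)

Deploying the Cloud Function with a Pub/Sub trigger on the same topic then completes the pipeline; you can just as well create the topic and the job from the console or gcloud.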

Below is the Python code that you can use in your environment with a few little tweaks. I have imported some extra libraries, but you can remove them from the code. You will need a SendGrid API key, which you can create by signing up on their website; the page where you can create it is https://app.sendgrid.com/guide/integrate/langs/python

# using SendGrid's Python Library
# https://github.com/sendgrid/sendgrid-python
import os
import csv
import json
import base64
import urllib.request as urllib
from datetime import date, datetime, timezone, timedelta

import tenacity
from dateutil import parser
from google.cloud import bigquery
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import (Mail, Attachment, FileContent, FileName,
                                   FileType, Disposition, ContentId)

client = bigquery.Client()

# Pull the last 30 days of data from each table.
last_mth_table1_sql = """
SELECT * from `<proj-id>.<dataset>.<table-1>`
where Date between PARSE_DATE('%Y%m%d',FORMAT_DATE('%Y%m%d',DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)))
and PARSE_DATE('%Y%m%d',FORMAT_DATE('%Y%m%d',DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)));
"""

last_mth_table2_sql = """
SELECT * from `<proj-id>.<dataset>.<table-2>`
where Date between PARSE_DATE('%Y%m%d',FORMAT_DATE('%Y%m%d',DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)))
and PARSE_DATE('%Y%m%d',FORMAT_DATE('%Y%m%d',DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)));
"""


def main(data, context):
    # list of (email, name) pairs to whom you want to send the report
    to_emails = [
        ('bob.user1@xyz.com', 'user1, Bob'),
        ('tom.user2@abc.com', 'user2, Tom')
    ]

    message = Mail(
        from_email='youremail@domain.com',  # use your from_email address
        to_emails=to_emails,
        subject='Monthly Reports',
        html_content='Hi,<br/><br/> Please see the attached monthly Reports for the so and so.<br/><br/> Thank you')

    # Run both queries and wait for the results.
    last_mth_table1_job = client.query(last_mth_table1_sql)
    last_mth_table1_rows = last_mth_table1_job.result()

    last_mth_table2_job = client.query(last_mth_table2_sql)
    last_mth_table2_rows = last_mth_table2_job.result()

    # Write the first result set to a CSV under /tmp (the writable path in Cloud Functions).
    with open('/tmp/table1.csv', 'w') as file:
        writer = csv.writer(file)
        writer.writerow(['col1', 'col2', 'col3', 'coln'])
        writer.writerows(last_mth_table1_rows)

    # Read it back and attach it, base64-encoded as SendGrid expects.
    with open('/tmp/table1.csv', 'rb') as f:
        encoded = base64.b64encode(f.read())
    attachment1 = Attachment()
    attachment1.file_content = FileContent(encoded.decode())
    attachment1.file_type = FileType('text/csv')
    attachment1.file_name = FileName('file1.csv')
    attachment1.disposition = Disposition('attachment')

    # Same steps for the second table.
    with open('/tmp/table2.csv', 'w') as file:
        writer = csv.writer(file)
        writer.writerow(['col1', 'col2', 'col3', 'coln'])
        writer.writerows(last_mth_table2_rows)

    with open('/tmp/table2.csv', 'rb') as f:
        encoded = base64.b64encode(f.read())
    attachment2 = Attachment()
    attachment2.file_content = FileContent(encoded.decode())
    attachment2.file_type = FileType('text/csv')
    attachment2.file_name = FileName('file2.csv')
    attachment2.disposition = Disposition('attachment')

    message.attachment = [attachment1, attachment2]

    try:
        sg = SendGridAPIClient(os.environ.get('SENDGRID_API_KEY'))
        response = sg.send(message)
        print(response.status_code)
        print(response.body)
        print(response.headers)
    except Exception as e:
        print(e)

    return
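Before deploying, you can smoke-test the function locally. This assumes the snippet is appended to the same file, that your machine has Google application-default credentials for BigQuery, and that SENDGRID_API_KEY is exported in your shell; the function ignores its Pub/Sub arguments, so passing None is enough for a quick run.

# Local smoke test (assumption: appended to the same file as main above,
# with BigQuery application-default credentials and SENDGRID_API_KEY set).
if __name__ == "__main__":
    assert os.environ.get("SENDGRID_API_KEY"), "export SENDGRID_API_KEY first"
    main(None, None)  # the function never reads the Pub/Sub payload or context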

The requirements.txt file is below:

sendgrid==6.1.0
google-cloud-bigquery==1.22.0
DateTime==4.3
python-dateutil==2.8.1
tenacity==6.0.0

If you want to follow the logging-router path and identify insert events on a particular table, you can use the filter below to create the logging sink:

resource.type="bigquery_dataset"
resource.labels.dataset_id="<your-dataset-name>"
protoPayload.resourceName="projects/<your-project-id>/datasets/<your-dataset-name>/tables/<your-table-name>"
protoPayload.authenticationInfo.principalEmail="<your-service-account-which-inserted-data>"
protoPayload.methodName="google.cloud.bigquery.v2.JobService.InsertJob"
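When the sink fires, the matched LogEntry arrives as base64-encoded JSON in the data field of the Pub/Sub message that triggers your function. Below is a small sketch of how the entry point for this second approach could decode it and confirm which table was written to before sending the report; the key names follow the standard LogEntry layout used in the filter above, but verify them against your own logs.

# Sketch of the entry point for approach 2: decode the forwarded LogEntry and
# check the insert details before running the same query-to-CSV-to-SendGrid logic.
import base64
import json

def main(event, context):
    log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    payload = log_entry.get("protoPayload", {})
    print("Insert detected on:", payload.get("resourceName"))
    print("Inserted by:", payload.get("authenticationInfo", {}).get("principalEmail"))
    # ...then reuse the query, CSV and SendGrid steps from the function above...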

I have written another blog on automating the unzipping of Cloud Storage files as they arrive, here.

If you have DF pipelines and want to trigger them as soon as the source data is available in GCS, you can find that here.

I hope this simple piece helps you. Feel free to drop me a message at darpan.3073@yahoo.com for any concerns or queries!

