Trigger Cloud Data Fusion job using Cloud Functions | Automated execution of CDAP pipeline on Google Cloud

Darpan Patel
4 min read · Mar 26, 2021

Over the last few years, there has been significant growth in data-based applications and use cases on Google Cloud Platform, and this growth is likely to continue. Google Cloud is one of the big three cloud providers, and its ever-expanding set of tools and solutions makes it a top choice for many enterprises. Google Cloud's BigQuery was also recently named a Leader in The Forrester Wave: Cloud Data Warehouse, Q1 2021. You can get the full report here.

In this blog, I will take you through the process of building an automation pipeline for your Data Warehousing solution on Google Cloud Platform.

Now, without further ado, let's get on to the solution for triggering a Data Fusion pipeline using a Cloud Function. I will try to cover as much as I can in relation to the scenario described below, and every component used is a service offered by GCP.

It is a very common scenario that a file lands in a landing area, typically a bucket in the cloud: Google Cloud Storage on GCP. We need to process this data by applying some transformations and load it into a warehouse so it can be used for insights or visualization. We already have data pipelines built in Data Fusion that run a series of transformations and load the results into the warehouse. Instead of batch scheduling the pipeline, we want it to be a bit more dynamic: the pipeline should sense that source data has arrived in the bucket, and that arrival should trigger execution of the job/pipeline. This makes the process more automated and resilient. With a batch-scheduled pipeline, if the file arrives later than the fixed time for some reason, loading and processing the data requires extra manual work. In cases where multiple such workflows exist, this solution can save organizations considerable time.

At a high level, the solution looks like the diagram below. A file lands in the Google Cloud Storage bucket, which in turn triggers a Cloud Function to execute Python code that invokes the Data Fusion pipeline via an API call.

Automated solution (high-level architecture)

For those less familiar with Google Cloud services, Cloud Data Fusion is a service offered on GCP for building data integration pipelines through a code-free web UI, without having to manage infrastructure. In short, it is an ETL tool similar to Informatica, but with the scale of Google Cloud infrastructure and none of the infrastructure management burden. Cloud Data Fusion is built on the open source project CDAP, so it also eliminates the vendor lock-in problem. Learn more about it on this link.

Cloud Functions is a serverless, event-driven compute platform that allows you to run code in response to events. It supports several languages, but for this particular solution we will use Python.

Before we proceed to the code, there are a few things you will need to do:

  • Create a service account that will be attached to the Cloud Function and assign it the role roles/datafusion.admin.
  • Create a VPC connector and select it in the VPC connector section while creating the Cloud Function, because by default a function can send requests to the internet but not to resources within your VPC.
  • Create the Cloud Function in the same region as your Data Fusion instance.
  • Make sure the trigger is set to the bucket where the file will be dropped and the event type is "finalize/create", so the job is triggered when a file arrives. (A sketch of the corresponding gcloud commands follows this list.)
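For reference, here is a minimal sketch of those setup steps using gcloud. The service account name (df-trigger-sa), connector name (df-connector), function name, runtime version and IP range are placeholders I chose for illustration, not values from the original setup, so adjust them to your environment:

# Create the service account and grant it the Data Fusion Admin role
gcloud iam service-accounts create df-trigger-sa --display-name="Data Fusion trigger SA"
gcloud projects add-iam-policy-binding <your-project-id> \
    --member="serviceAccount:df-trigger-sa@<your-project-id>.iam.gserviceaccount.com" \
    --role="roles/datafusion.admin"

# Create a VPC connector (the range must be an unused /28 in your network)
gcloud compute networks vpc-access connectors create df-connector \
    --region=<your-region> --network=default --range=10.8.0.0/28

# Deploy the function with a GCS "finalize" trigger, the service account and the connector
gcloud functions deploy trigger-datafusion \
    --runtime=python39 --entry-point=main --source=. \
    --trigger-resource=<your-landing-bucket> \
    --trigger-event=google.storage.object.finalize \
    --service-account=df-trigger-sa@<your-project-id>.iam.gserviceaccount.com \
    --vpc-connector=df-connector --region=<your-region>

With that setup in place, here is the Cloud Function code: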
import os
import requests

METADATA_URL = 'http://metadata.google.internal/computeMetadata/v1/'
METADATA_HEADERS = {'Metadata-Flavor': 'Google'}
# 'default' uses the service account attached to the function for the access token
SERVICE_ACCOUNT = 'default'


def get_access_token():
    # Request an access token for the attached service account from the metadata server.
    url = '{}instance/service-accounts/{}/token'.format(METADATA_URL, SERVICE_ACCOUNT)
    r = requests.get(url, headers=METADATA_HEADERS)
    print("Getting token")
    r.raise_for_status()
    # Extract the access token from the response.
    access_token = r.json()['access_token']
    return access_token


# This function maps the file path to a pipeline name, for the case where the
# bucket has multiple folders and a particular pipeline needs to be triggered.
def getWorkFlowName(path):
    head_tail = os.path.split(path)
    print(head_tail)
    if head_tail[0] == "file-name":
        return "pipeline-name-to-trigger"
    elif head_tail[0] == "another-file-name":
        return "Some-Other-Workflow-name"
    else:
        raise Exception("ERROR: No matching Workflow found for file: {0}".format(path))


def main(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    file = event
    print(f"Processing file: {file['name']}.")
    token = get_access_token()
    # The line below takes the file name from the folder path and passes it
    # as a runtime argument to the workflow in Data Fusion.
    param = {"file_name": os.path.split(file['name'])[1]}
    print(param)
    WKFLOW_NAME = getWorkFlowName(file['name'])
    CDAP_ENDPOINT = "cdap-endpoint-link"
    PIPELINE_URL = CDAP_ENDPOINT + "/v3/namespaces/default/apps/{}/workflows/DataPipelineWorkflow/start".format(WKFLOW_NAME)
    r = requests.post(PIPELINE_URL, json=param,
                      headers={"Authorization": "Bearer {}".format(token)})
    r.raise_for_status()
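One easy-to-miss detail: the requests library is not guaranteed to be pre-installed in the Cloud Functions Python runtime, so declare it in the requirements.txt deployed alongside main.py. A minimal example (the version pin is my assumption; use whichever recent release you prefer):

# requirements.txt
requests==2.25.1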

To get the CDAP endpoint URL, run the commands below in Cloud Shell.

export INSTANCE_ID=<datafusion-instance-name>
export CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe \
    --location=<your-region> --format="value(apiEndpoint)" \
    ${INSTANCE_ID})
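If you want to confirm the endpoint is reachable before wiring it into the function, a quick check (assuming your own user account has access to the Data Fusion instance) is to list the namespaces through the CDAP REST API:

curl -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "${CDAP_ENDPOINT}/v3/namespaces"

This should return a JSON array that includes the default namespace.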

You will also need to define a runtime argument in your Data Fusion pipeline to receive the name of the file that needs to be processed. In the code above, that is exactly what the param passed in the API request provides; the pipeline picks it up as an argument that you can reference in a plugin.
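To make that concrete, here is a hedged sketch of how such an argument is typically referenced as a macro in a GCS source plugin's configuration; the bucket, folder and file name below are placeholders of mine, not values from the original pipeline:

# In the GCS source plugin's "Path" field, reference the runtime argument as a macro:
#   gs://<your-landing-bucket>/incoming/${file_name}
# When the Cloud Function starts the workflow with {"file_name": "orders.csv"},
# Data Fusion substitutes the macro and the pipeline reads only that object.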

I hope this helps you. Feel free to drop me a message at my email, darpan.3073@yahoo.com, for any concerns or queries!


Darpan Patel

Technology enthusiast | Cloud and Data Engineer | Turning complex to simpler | Meditator | Curious | Learner | Helper