With a better mastery of Cloud Functions, you can trigger a Dataprep job via API when a file lands in a Cloud Storage bucket
Ever dreamt about automating your entire data pipeline to load your data warehouse? Without automation, each user needs to manually upload their data, then manually start a transformation job or wait for a scheduled task to run at a specific time. This is tedious and resource-intensive.
After reading this article, you will be able to drag and drop a file in a folder, get your entire data pipeline executed and loaded in your data warehouse, and have up-to-date data in your reports and dashboards with a few simple clicks.
To illustrate this concept, we will take the example of a retail store that wishes to update its database daily with new customers’ contact information and occupation. The new customer file is uploaded manually into a folder at the end of each day; from there, we want the data to be updated automatically and enriched from a Google Sheet that is maintained at a regional level.
This article is a step-by-step guide that will walk you through the process of triggering a Cloud Dataprep job when a new file appears in a Cloud Storage folder. This is made possible by a Cloud Function that monitors file arrival and calls the Cloud Dataprep API to start a specific job from a Dataprep flow.
1. Getting Started
To help you apply the process described below, we have provided all the resources you need to try it out. Here on GitHub, you will find the following:
- The source files: Customers.csv and the Google Sheet with the enriched data
- The Cloud Dataprep recipe that you can import as a new flow
- The Python code for the Cloud Function
All you need to give it a shot is a valid Google account and access to Cloud Dataprep, Cloud Functions, and BigQuery.
2. Create a Cloud Dataprep flow with a Dataset as a Parameter
First, you need to create a Cloud Dataprep flow that accepts a source dataset defined as a parameter. The source dataset name is defined as a variable, and its value will be supplied at runtime through an Application Programming Interface (API) call from Cloud Functions. You can define a source dataset as a variable/parameter either when you create the source dataset or when you replace a source in an existing Dataprep flow.
In this existing flow, which combines a Google Sheet with a CSV file, we want to replace the CSV source file with a variable.
You must select the particular folder that you want to monitor and define the name of the file as a variable. You reach this page when you import a new dataset and click the “Create Dataset with Parameters” button. You can also reach it from the flow view by selecting the source dataset and choosing “Replace with Dataset with Parameters” from the menu.
You need to click the dataset name and select “Add Variable” for the source dataset.
In this example, the Cloud Storage folder is /landingzone. The variable is named FileName; this is also the parameter that will be set via the API call.
In this example, the Dataprep flow contains only one parameter. However, you can have multiple parameters for inputs and outputs, or even parameters defined within a data transformation recipe.
You can select the Parameter tab in the flow view to see and edit all the flow parameters.
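To make the idea of a parameterized dataset concrete, here is a conceptual sketch in Python. It is not how Cloud Dataprep is implemented; the function name resolve_dataset_path, the default value, and the second file name are hypothetical and only illustrate that the folder stays fixed while the FileName variable is filled in at runtime.

```python
def resolve_dataset_path(folder, defaults, overrides=None):
    """Return the dataset path after applying runtime overrides.

    folder    -- fixed folder being monitored, e.g. "/landingzone"
    defaults  -- default value of each variable, e.g. {"FileName": "Customers.csv"}
    overrides -- values supplied at runtime (for instance through an API call)
    """
    values = dict(defaults)
    values.update(overrides or {})
    return "{}/{}".format(folder.rstrip("/"), values["FileName"])


# Without an override, the default value of the variable is used.
print(resolve_dataset_path("/landingzone", {"FileName": "Customers.csv"}))

# With an override, the runtime value replaces the default (hypothetical file name).
print(resolve_dataset_path("/landingzone",
                           {"FileName": "Customers.csv"},
                           {"FileName": "Customers_new.csv"}))
```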
3. Configuring a Cloud Dataprep Recipe to be called via API
We want to trigger the “Customers Exceptions” recipe each time a file is uploaded to the landingzone folder. As such, we need to call the Cloud Dataprep REST API for this particular recipe.
To do so, select the corresponding recipe and note the recipe ID shown in the recipe URL.
The Cloud Dataprep API documentation can be found here. We will use the /v4/jobgroups endpoint with a POST request.
In our case, the recipe ID is 1427932. The request body for our API call will look like this:
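{
  "wrangledDataset": {
    "id": 1427932
  },
  "runParameters": {
    "overrides": {
      "data": [
        {
          "key": "FileName",
          "value": "override value (string)"
        }
      ]
    }
  }
}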
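In Python, the request body can be assembled as a dictionary and serialized to JSON. The sketch below is a minimal illustration, not the code from the repository: the helper name build_job_request is hypothetical, and the wrangledDataset and runParameters field names reflect our understanding of the Cloud Dataprep API, so confirm them against the API documentation before relying on them.

```python
import json


def build_job_request(recipe_id, file_name):
    """Build the JSON body for a POST request to the /v4/jobgroups endpoint.

    Field names are assumed from the Cloud Dataprep API documentation.
    """
    return {
        # The recipe to run, identified by the ID found in the recipe URL.
        "wrangledDataset": {"id": recipe_id},
        # Override the FileName flow parameter with the file that just arrived.
        "runParameters": {
            "overrides": {
                "data": [{"key": "FileName", "value": file_name}]
            }
        },
    }


body = build_job_request(1427932, "Customers.csv")
print(json.dumps(body, indent=2))
```

Building the body from a function keeps the recipe ID and the file name as the only two values that change from one call to the next.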
The next thing you need is an Access Token for the Dataprep user who will make the API call. The token ensures that the user is authenticated and authorized to make the call. To get one, go to the user settings for Cloud Dataprep and generate a new token if you have not already done so.
NOTE: You need to be the Google Project owner to be able to generate the Access Token. You can use the account of the project owner or a personal Gmail account when trying it out.
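Once you have the token, it is sent with each API call in an HTTP header. The sketch below assumes the token is passed as a Bearer token in the Authorization header, which is our understanding of how the Dataprep API expects it. The helper name build_auth_headers and the DATAPREP_TOKEN environment variable are hypothetical.

```python
import os


def build_auth_headers(token):
    """Return the HTTP headers that carry the Dataprep Access Token."""
    if not token:
        raise ValueError("A Dataprep Access Token is required")
    return {
        "Content-Type": "application/json",
        # Assumption: the API expects the token as a Bearer token.
        "Authorization": "Bearer " + token,
    }


# DATAPREP_TOKEN is a hypothetical variable name; the fallback is a placeholder.
token = os.environ.get("DATAPREP_TOKEN", "xxxxxxxxxxxxxxx")
headers = build_auth_headers(token)
print(sorted(headers))
```

Reading the token from an environment variable is one way to keep it out of the source code; hardcoding it in the function also works for a quick trial.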
4. Create the Cloud Function to monitor the file arrival and call the Cloud Dataprep API
We can now create the Cloud Function that will be triggered when a new file appears in the Cloud Storage folder.
To create a Cloud Function from the Google Cloud console, click here.
The trigger type must be “Cloud Storage” and the type of object event upon which the function will be triggered is “Finalize/Create.”
You also need to specify the Cloud Storage bucket that you want to trigger the function from. In our example, we will use Python as the Runtime.
The Cloud Function code follows this logic:
- Retrieve the file name from the event object.
- Check whether the file is in the folder to monitor (landingzone in our example).
- Call the Cloud Dataprep REST API endpoint with the recipe ID and with the file name as the value of the FileName parameter.
- Pass the Dataprep user's Access Token in the API call so that Cloud Dataprep can authenticate it.
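The logic above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the repository: the API host in DATAPREP_ENDPOINT, the request body field names, and the Bearer header are assumptions to check against the Dataprep API documentation; the function name and the send parameter are hypothetical; and the standard library's urllib is used here only to avoid extra dependencies.

```python
import json
import os
import urllib.request
from types import SimpleNamespace

DATAPREP_ENDPOINT = "https://api.clouddataprep.com/v4/jobGroups"  # assumed host
DATAPREP_AUTH_TOKEN = "xxxxxxxxxxxxxxx"  # placeholder: your Access Token
DATAPREP_RECIPE_ID = 99999999            # placeholder: your recipe ID
MONITORED_FOLDER = "landingzone"


def post_json(url, headers, payload):
    """Send a JSON POST request and return (status code, response text)."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status, response.read().decode("utf-8")


def dataprep_job_gcs_trigger(event, context, send=post_json):
    """Start the Dataprep job when a file is finalized in the monitored folder."""
    # event["name"] is the object path in the bucket, e.g. "landingzone/Customers.csv".
    folder, file_name = os.path.split(event["name"])
    if (context.event_type != "google.storage.object.finalize"
            or folder != MONITORED_FOLDER):
        print("Ignored: {}".format(event["name"]))
        return None

    payload = {
        "wrangledDataset": {"id": DATAPREP_RECIPE_ID},
        "runParameters": {
            "overrides": {"data": [{"key": "FileName", "value": file_name}]}
        },
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + DATAPREP_AUTH_TOKEN,
    }
    status, text = send(DATAPREP_ENDPOINT, headers, payload)
    # Printed output ends up in the Cloud Functions logs.
    print("Status code: {}".format(status))
    print("Result: {}".format(text))
    return status


# Local dry run with a stand-in sender, so no real API call is made.
fake_context = SimpleNamespace(event_type="google.storage.object.finalize")
dataprep_job_gcs_trigger(
    {"name": "landingzone/Customers.csv"},
    fake_context,
    send=lambda url, headers, payload: (201, json.dumps(payload)),
)
```

The optional send argument exists only so the function can be exercised locally with a stand-in; when deployed, Cloud Functions calls it with the event and context alone and the real POST request is sent. Error handling for failed API calls is left out of this sketch.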
If you want to learn more about Cloud Functions and Cloud Storage management, I recommend that you go through this tutorial.
We have provided the Python code here. You will need to edit it by replacing the highlighted values with the ones that you retrieved in your Cloud Dataprep project (sections 2 and 3 of this blog):
datataprep_auth_token = 'xxxxxxxxxxxxxxx'
dataprep_jobid = 99999999
if context.event_type == 'google.storage.object.finalize' and newfilepath == 'landingzone':
You then need to deploy the Cloud Function. It will run and wait for files to be uploaded to trigger the Cloud Dataprep job. You can learn more about deploying and executing a Cloud Function here.
5. Testing the end-to-end process
You are now ready to test the end-to-end data pipeline by adding a new file in the Google Cloud bucket and monitoring it to ensure that the Cloud Dataprep job is executed.
In the Google Cloud console, navigate to Storage > Browser, open the appropriate Cloud Storage bucket, and upload the Customers.csv file into the landingzone folder.
In the Cloud Dataprep Jobs window, refresh until the “Customers Exceptions” job starts, then wait for it to finish and load the BigQuery table.
You can verify that the job is processing the proper file through this window.
Lastly, you can confirm proper execution details (API call with the parameter and Cloud Dataprep job status) by reviewing the Google Cloud Functions logs located here.
After following this step-by-step guide, you now have the fundamental principles required to automate an end-to-end data pipeline triggered when a file is uploaded in a particular folder.
You have learned about:
- Cloud Dataprep parameters
- Cloud Dataprep API
- Cloud Functions with storage monitoring and calling an API
- Monitoring the Cloud Function and Cloud Dataprep executions, including the API call and its parameter
You are now ready to automate your flow based on an event. You can also automate Cloud Dataprep with an external scheduler if you want to run it as one step of a broader enterprise data pipeline. Take a look at this blog post to learn how to use Cloud Composer (Google Cloud's managed workflow orchestration service) for scheduling Cloud Dataprep jobs.