This article explains how to use Cloud Composer to automate Cloud Dataprep flow migration between two workspaces. You can use this process in your Cloud Data Warehouse project to promote flows across development, test, and production environments, following what is known as a Continuous Integration and Continuous Delivery (CI/CD) pipeline in agile development.
At a high level, the pipeline exports a flow from the development workspace, saves it in Google Cloud Storage (GCS), imports it into the production workspace, and replaces the development data sources with their production equivalents.
On GitHub you can download the example Cloud Composer Directed Acyclic Graph (DAG) file that we explain throughout this article.
We will be using a simple flow, with one Cloud Dataprep input and one recipe, as our example. You’ll be able to extend this framework to support your specific environments.
Before creating this CI/CD pipeline, you should ensure that you have performed the following steps in your Google Cloud Platform (GCP) environment:
- You have Cloud Dataprep Premium enabled in each of your projects. Cloud Dataprep Premium is needed to benefit from the APIs needed in the automation process.
- You have “Owner” privileges on each of the GCP projects where Cloud Dataprep Premium is enabled. Using the Cloud Dataprep Premium APIs requires “Owner” privileges.
- You have enabled Cloud Composer in one of your projects. Note that Cloud Composer does not necessarily need to reside in the same project as Cloud Dataprep Premium.
2. Configure the variables in Cloud Composer
We need to set up four variables in Cloud Composer. These variables allow you to pass static values into a DAG.
First, we need to store the ID for the flow that we will be exporting from the development environment. From the Cloud Dataprep Premium UI, open the flow that you want to export. The flow ID appears in the URL bar.
Copy this ID. Open the Cloud Composer Admin/Variables interface and create a new variable containing this value.
Next, we need to store an ID for the dataset in the production environment that will be associated with your flow. As part of a CI/CD pipeline, we want to fully automate the process of preparing the production flow for deployment. This means replacing the references to datasets in the development environment with dataset references in the production environment. Open your Cloud Dataprep production project and navigate to the source dataset that you want to use. The dataset ID appears in the URL bar.
Copy the dataset ID. Open the Cloud Composer Admin/Variables interface and create a new variable containing this value. If you have multiple source datasets, you should store the IDs for each dataset as an array object inside a single Cloud Composer variable.
The final two variables that we need to store in Cloud Composer are the API access tokens for the Cloud Dataprep development and production projects. You can find instructions for creating API access tokens in the API Access Token documentation. Store each of these access tokens in Cloud Composer in this format: Bearer <token>
3. Set up an HTTP connection in Cloud Composer
In Cloud Composer, you need to create a new HTTP connection to point to the Cloud Dataprep API endpoint. Ensure that the connection points to https://api.clouddataprep.com.
Developing the Cloud Composer DAG
The full DAG includes seven tasks that export the flow from the development environment, save the flow in GCS, import the flow into the production environment, and programmatically identify the correct data sources in the production environment that should be associated with the flow.
1. Define variable values by their IDs
As the first step in the DAG, pass in each of the variables that you defined in Cloud Composer. This allows you to invoke the variables in each task in your DAG.
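A minimal sketch of this step follows. The variable keys (`dev_flow_id`, `prod_dataset_ids`, `dev_token`, `prod_token`) are hypothetical names chosen for illustration; use whichever keys you created in the Admin/Variables interface. The lookup function is passed in as a parameter so that the sketch runs on its own. Inside a DAG file you would typically pass Airflow's `Variable.get`.

```python
import json
from typing import Callable, Dict, List, NamedTuple


class PipelineSettings(NamedTuple):
    dev_flow_id: str
    prod_dataset_ids: List[int]
    dev_headers: Dict[str, str]
    prod_headers: Dict[str, str]


def load_settings(get_variable: Callable[[str], str]) -> PipelineSettings:
    """Read the four Cloud Composer variables through `get_variable`.

    All variable keys used here are hypothetical names for this sketch.
    """
    # Dataset IDs are stored as a JSON array so that a single variable can
    # hold several production datasets, for example "[111, 222]".
    dataset_ids = json.loads(get_variable("prod_dataset_ids"))
    if not isinstance(dataset_ids, list):
        dataset_ids = [dataset_ids]

    def headers(token_key: str) -> Dict[str, str]:
        # The token variables already hold the full "Bearer <token>" string.
        return {"Authorization": get_variable(token_key)}

    return PipelineSettings(
        dev_flow_id=str(get_variable("dev_flow_id")),
        prod_dataset_ids=[int(value) for value in dataset_ids],
        dev_headers=headers("dev_token"),
        prod_headers=headers("prod_token"),
    )
```

In a DAG file, `from airflow.models import Variable` followed by `settings = load_settings(Variable.get)` would read the stored values.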
2. Create the task to export the flow and store it in a zip file in GCS
The first task that we want to perform is exporting the flow from the development project. To do this, you will invoke the getFlowPackage API: https://api.trifacta.com/dataprep-premium/index.html#operation/getFlowPackage
Cloud Composer automatically provisions a GCS bucket for maintaining copies of the DAGs and for storing any input/output data from executed pipelines. Inside the Composer environment, the bucket's data folder is available at the local path /home/airflow/gcs/data. The example task writes the API response, which is the zip file containing the exported flow, to this folder.
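The export step could be sketched as follows, using only the Python standard library. The `/v4/flows/{id}/package` path and the `flow.zip` file name are assumptions of this sketch, not values taken from the example DAG; confirm the path against the getFlowPackage documentation linked above.

```python
import os
import urllib.request

API_BASE = "https://api.clouddataprep.com"
DATA_DIR = "/home/airflow/gcs/data"  # local path of the Composer data folder


def build_export_request(flow_id, auth_header, base_url=API_BASE):
    """Build the GET request that downloads the flow package.

    The URL path is an assumption; check the getFlowPackage reference.
    """
    url = "{}/v4/flows/{}/package".format(base_url, flow_id)
    return urllib.request.Request(
        url, headers={"Authorization": auth_header}, method="GET"
    )


def save_package(zip_bytes, data_dir=DATA_DIR, filename="flow.zip"):
    """Write the downloaded zip bytes to the data folder and return the path."""
    os.makedirs(data_dir, exist_ok=True)
    path = os.path.join(data_dir, filename)
    with open(path, "wb") as handle:
        handle.write(zip_bytes)
    return path


def export_flow(flow_id, auth_header, data_dir=DATA_DIR):
    """Download the flow package and store it; usable as a Python task."""
    request = build_export_request(flow_id, auth_header)
    with urllib.request.urlopen(request) as response:
        return save_package(response.read(), data_dir)
```

Keeping the request builder separate from the network call makes it possible to check the URL and headers without contacting the API.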
Note: As an extension to this task, you could potentially move the exported flow from GCS into a Git environment.
3. Create the task to import the flow to the production environment
Once you have saved the flow zip file to GCS, you need to invoke the importPackage API to import the flow zip file into the production Cloud Dataprep environment. Instructions for using this API can be found here: https://api.trifacta.com/dataprep-premium/index.html#operation/importPackage
As an output, this API call returns metadata about the imported flow, including the ID for the flow in the new environment. This metadata is critical for the next steps in this pipeline. Our example DAG saves the API response to XCom, which allows for communication between tasks in the pipeline.
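One way to build the upload with the standard library is sketched here. The `/v4/flows/package` path and the `data` form field name are assumptions; verify both against the importPackage documentation linked above. The response is returned as parsed JSON without further interpretation, because its exact layout should be taken from that documentation.

```python
import json
import urllib.request
import uuid

API_BASE = "https://api.clouddataprep.com"


def build_import_request(zip_bytes, auth_header, base_url=API_BASE):
    """Build a multipart/form-data POST that uploads the flow zip file."""
    boundary = uuid.uuid4().hex
    body = b"\r\n".join([
        ("--" + boundary).encode("ascii"),
        # The form field name "data" is an assumption of this sketch.
        b'Content-Disposition: form-data; name="data"; filename="flow.zip"',
        b"Content-Type: application/zip",
        b"",
        zip_bytes,
        ("--" + boundary + "--").encode("ascii"),
        b"",
    ])
    headers = {
        "Authorization": auth_header,
        "Content-Type": "multipart/form-data; boundary=" + boundary,
    }
    return urllib.request.Request(
        base_url + "/v4/flows/package", data=body, headers=headers, method="POST"
    )


def import_flow(zip_path, auth_header):
    """Upload the zip file and return the parsed JSON metadata."""
    with open(zip_path, "rb") as handle:
        request = build_import_request(handle.read(), auth_header)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

If `import_flow` is used as the callable of an Airflow `PythonOperator`, its return value is pushed to XCom, where downstream tasks can pull it.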
4. Create the task to retrieve the list of data sources for the imported flow
When you import your flow into a new environment, the flow retains all of the references to the data sources from the original environment. This means that without replacing the data sources, the imported flow will not be able to execute. We need to replace each of the original data sources in the flow with the corresponding data sources in the new environment.
As a first step, you need to invoke the getFlowInputs API. This will return information about all of the data sources currently associated with the newly imported flow. Note that even though the data sources are invalid, they retain the correct connections with downstream objects.
The example DAG passes the flow ID into the getFlowInputs API endpoint. The API call that you made in step 3 returned this flow ID as part of its response.
Refer to the API documentation (https://api.trifacta.com/dataprep-premium/index.html#operation/getFlowInputs) for details about the response from this call. Once again, we need to retain this response for use in downstream tasks.
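As an illustration, the request and a small helper for reading the response might look like this. The `/v4/flows/{id}/inputs` path and the top-level `data` key are assumptions of this sketch; the `parsingRecipe.id` field is the one used later in step 7.

```python
import urllib.request

API_BASE = "https://api.clouddataprep.com"


def build_flow_inputs_request(flow_id, auth_header, base_url=API_BASE):
    """Build the GET request for the inputs of the imported flow.

    The URL path is an assumption; check the getFlowInputs reference.
    """
    url = "{}/v4/flows/{}/inputs".format(base_url, flow_id)
    return urllib.request.Request(
        url, headers={"Authorization": auth_header}, method="GET"
    )


def parsing_recipe_ids(inputs_payload):
    """Collect parsingRecipe.id for every data source in the response.

    Assumes the data sources are listed under a top-level "data" key.
    Entries without a parsingRecipe are skipped.
    """
    return [
        item["parsingRecipe"]["id"]
        for item in inputs_payload.get("data", [])
        if item.get("parsingRecipe")
    ]
```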
5. Create the task to retrieve the list of nodes for the imported flow
Trifacta’s API for replacing data sources requires you to know the ID for the recipe directly connected to the data source that you want to replace. In order to determine this recipe ID, you need to connect information returned from two API calls: flow nodes and flow edges.
The first set of information that you need to return is the flow nodes data. Flow nodes assign unique IDs to all of the individual objects in a flow. Invoke the listFlows API with an embedded resource named flownodes to retrieve a list of all the nodes in a flow.
Documentation for the listFlows API can be found here: https://api.trifacta.com/dataprep-premium/index.html#operation/listFlows
This call returns a JSON object that lists the nodes in the flow, each with its unique ID.
Store this response for use in a downstream task.
6. Create the task to retrieve the list of edges for the imported flow
Next, we need to return information about flow edges, which identify how each node in a flow is connected to other nodes. Flow edges contain an input node ID, and an output node ID. The flow edges information is the final detail that we need to know in order to connect our data source to a recipe.
As in step 5, you will invoke the listFlows API. This time, however, you will embed the flowEdges resource.
This call returns a JSON object that lists the edges in the flow, each with its input node ID and output node ID.
Once again, store this response for use in a downstream task.
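The two listFlows calls in steps 5 and 6 differ only in the embedded resource, so a single helper can build both. This is a sketch: the `/v4/flows` path and the `embed` query parameter name are assumptions to verify against the listFlows documentation, and the resource name should be spelled exactly as that documentation gives it.

```python
import urllib.request
from urllib.parse import urlencode

API_BASE = "https://api.clouddataprep.com"


def build_list_flows_request(embed, auth_header, base_url=API_BASE):
    """Build a listFlows GET request with one embedded resource.

    `embed` is the embedded resource name, for example "flownodes".
    """
    query = urlencode({"embed": embed})
    url = "{}/v4/flows?{}".format(base_url, query)
    return urllib.request.Request(
        url, headers={"Authorization": auth_header}, method="GET"
    )
```

Calling `build_list_flows_request("flownodes", token)` covers step 5; passing the edge resource name instead covers step 6.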
7. Create the task to identify the recipe associated with the source dataset
Now that you have returned information about the flow nodes, flow edges, and imported dataset ID, you are ready to use these outputs to identify the ID for the recipe connected to your data source. You will need to chain the outputs together in the following way:
- When the parsingRecipe.id value returned from step 4 matches the recipe.id value returned from step 5, return the id for that flownode.
- When the flownode id matches the inputFlownode.id returned from step 6, return the outputFlownode id.
The output flownode ID becomes the value that you will enter in the swap datasource API call.
The following Python task shows how you would iteratively navigate through the JSON objects returned in steps 4, 5, and 6, and return the output flownode ID.
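A minimal sketch of that matching logic is given here. It is not the code from the example DAG. It assumes that the lists of inputs, flow nodes, and flow edges have already been extracted from the three API responses, and the sample records at the bottom are invented purely to show the field names mentioned above.

```python
def find_output_flownode_id(flow_inputs, flow_nodes, flow_edges):
    """Return the output flownode ID for the first data source in the flow.

    flow_inputs: data sources from step 4, each with parsingRecipe.id
    flow_nodes:  flow nodes from step 5, each with id and recipe.id
    flow_edges:  flow edges from step 6, each with inputFlownode.id and
                 outputFlownode.id
    """
    parsing_recipe_id = flow_inputs[0]["parsingRecipe"]["id"]

    # Step 4 to step 5: find the flownode whose recipe is the parsing recipe.
    node_id = next(
        (
            node["id"]
            for node in flow_nodes
            if (node.get("recipe") or {}).get("id") == parsing_recipe_id
        ),
        None,
    )
    if node_id is None:
        raise LookupError("no flownode uses recipe %s" % parsing_recipe_id)

    # Step 5 to step 6: follow the edge that starts at that flownode.
    output_id = next(
        (
            edge["outputFlownode"]["id"]
            for edge in flow_edges
            if edge["inputFlownode"]["id"] == node_id
        ),
        None,
    )
    if output_id is None:
        raise LookupError("no edge starts at flownode %s" % node_id)
    return output_id


# Invented sample records, for illustration only.
sample_inputs = [{"id": 900, "parsingRecipe": {"id": 11}}]
sample_nodes = [{"id": 101, "recipe": {"id": 11}}, {"id": 102, "recipe": {"id": 12}}]
sample_edges = [{"inputFlownode": {"id": 101}, "outputFlownode": {"id": 102}}]
print(find_output_flownode_id(sample_inputs, sample_nodes, sample_edges))  # prints 102
```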
Note: This code assumes that your flow has only a single data source. If your flow contains more data sources, loop through the array of data sources returned in step 4, and match each data source to the corresponding parsingRecipe ID, flownode ID, and output flownode ID.
8. Create the task to replace the source dataset in the imported flow
Use the recipe ID that you identified in step 7 to invoke the updateInputDataset API. This API call allows you to replace the existing dev source dataset references with the updated production source dataset references.
You can reference the API documentation here: https://api.trifacta.com/dataprep-premium/index.html#operation/updateInputDataset
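The replacement request could be sketched as follows. The `/v4/wrangledDatasets/{id}/primaryInputDataset` path and the `importedDataset` body field are assumptions of this sketch and should be confirmed against the updateInputDataset documentation. Here `target_id` is the ID identified in step 7 and `dataset_id` is the production dataset ID stored in Cloud Composer.

```python
import json
import urllib.request

API_BASE = "https://api.clouddataprep.com"


def build_swap_request(target_id, dataset_id, auth_header, base_url=API_BASE):
    """Build the PUT request that points the flow at a production dataset.

    The URL path and body layout are assumptions; check the
    updateInputDataset reference before relying on them.
    """
    url = "{}/v4/wrangledDatasets/{}/primaryInputDataset".format(base_url, target_id)
    body = json.dumps({"importedDataset": {"id": int(dataset_id)}}).encode("utf-8")
    headers = {
        "Authorization": auth_header,
        "Content-Type": "application/json",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="PUT")
```

Passing the request to `urllib.request.urlopen` sends it; the production access token must be used, because the flow now lives in the production project.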
9. Sequence the tasks
Finally, to invoke the end-to-end pipeline, you will need to sequence these tasks in your DAG definition:
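The sequencing can be illustrated with Airflow's `>>` operator, which makes the task on the left run before the task on the right. The sketch below applies it pairwise to a list. So that it runs without an Airflow installation, it uses a stand-in class in place of real operators; the task IDs are hypothetical names, not those of the example DAG.

```python
def chain_tasks(tasks):
    """Make each task run after the one before it, using the >> syntax."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream
    return tasks


class StandInTask:
    """Placeholder that mimics how Airflow operators respond to >>."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream_ids = []

    def __rshift__(self, other):
        # Record the dependency and return the right-hand task,
        # which is also what Airflow operators return.
        self.downstream_ids.append(other.task_id)
        return other


# Hypothetical task IDs for the seven tasks described in this article.
task_ids = [
    "export_flow",
    "import_flow",
    "get_flow_inputs",
    "get_flow_nodes",
    "get_flow_edges",
    "find_recipe",
    "replace_dataset",
]
pipeline = chain_tasks([StandInTask(task_id) for task_id in task_ids])
```

In a real DAG file the list would hold the operator objects defined in the earlier steps, and the same dependencies can be written directly as `export >> import_ >> ...`.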
You have now created an end-to-end CI/CD pipeline for exporting a flow from a development project, importing that flow into a production project, and replacing the data sources in the imported flow.
To further extend your CI/CD pipeline, you could trigger automated job execution through Cloud Composer and the Cloud Dataprep APIs. You can refer to this blog for details about how to trigger Cloud Dataprep jobs using Cloud Composer: https://cloud.google.com/blog/products/data-analytics/how-to-orchestrate-cloud-dataprep-jobs-using-cloud-composer