forked from GoogleCloudPlatform/python-docs-samples
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdataflowtemplateoperator_tutorial.py
More file actions
85 lines (73 loc) · 3.44 KB
/
dataflowtemplateoperator_tutorial.py
File metadata and controls
85 lines (73 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_dataflow_dag]
"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.
This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""
import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.utils.dates import days_ago
# Deployment-specific settings are looked up from Airflow variables.
bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_zone = models.Variable.get("gce_zone")

# Defaults handed to the DAG through its default_args parameter.
default_args = dict(
    # A start date one day in the past lets the DAG run as soon as it is
    # uploaded.
    start_date=days_ago(1),
    dataflow_default_options=dict(
        project=project_id,
        # Replace with your own zone.
        zone=gce_zone,
        # Subfolder used for temporary files, such as the staged pipeline job.
        tempLocation=f"{bucket_path}/tmp/",
    ),
)
# Build the DAG (directed acyclic graph) of tasks.
# Tasks created inside the context manager are added to the DAG object
# automatically.
with models.DAG(
    # The id shown for this DAG on the Airflow DAG page.
    dag_id="composer_dataflow_dag",
    default_args=default_args,
    # How often the DAG is scheduled; override to match your needs.
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Parameters expected by the template selected below. Use the template
    # documentation to pick the correct parameters for your template:
    # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
    template_parameters = {
        "javascriptTextTransformFunctionName": "transformCSVtoJSON",
        "JSONPath": f"{bucket_path}/jsonSchema.json",
        "javascriptTextTransformGcsPath": f"{bucket_path}/transformCSVtoJSON.js",
        "inputFilePattern": f"{bucket_path}/inputFile.txt",
        "outputTable": f"{project_id}:average_weather.average_weather",
        "bigQueryLoadingTemporaryDirectory": f"{bucket_path}/tmp/",
    }

    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job.
        task_id="dataflow_operator_transform_csv_to_bq",
        # The template to run. The documentation linked above lists every
        # available template. For versions in non-production environments,
        # use the subfolder 'latest'.
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        parameters=template_parameters,
    )
# [END composer_dataflow_dag]