Integration
- Reverse Proxy
- Azure: Microsoft Azure
- AWS: Amazon Web Services
- Databricks
- GCP: Google Cloud Platform
- Qubole
Reverse Proxy
Airflow can be set up behind a reverse proxy, with the ability to set its endpoint with great flexibility.
For example, you can configure your reverse proxy to get:
https://lab.mycompany.com/myorg/airflow/
To do so, set the following option in your airflow.cfg:
base_url = http://my_host/myorg/airflow
Additionally, if you use the Celery Executor, you can serve Flower at /myorg/flower with:
flower_url_prefix = /myorg/flower
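As a rough illustration of how these two settings relate to the paths the reverse proxy has to serve, the sketch below parses a sample configuration with the Python standard library and derives the matching location prefixes. The section names ([webserver] and [celery]) and the helper name proxy_locations are assumptions made for this example; this is not how Airflow itself reads its configuration.

```python
import configparser
from urllib.parse import urlsplit

# Sample configuration; the section names are assumed for illustration.
SAMPLE_CFG = """
[webserver]
base_url = http://my_host/myorg/airflow

[celery]
flower_url_prefix = /myorg/flower
"""


def proxy_locations(cfg_text):
    """Return the (airflow, flower) path prefixes a proxy would need to route."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)

    # Only the path component of base_url matters for the proxy location.
    base_url = parser.get("webserver", "base_url")
    airflow_location = urlsplit(base_url).path.rstrip("/") + "/"

    # The Flower prefix is optional (it only applies with the Celery Executor).
    flower_prefix = parser.get("celery", "flower_url_prefix", fallback="").rstrip("/")
    flower_location = flower_prefix + "/" if flower_prefix else None
    return airflow_location, flower_location


airflow_location, flower_location = proxy_locations(SAMPLE_CFG)
print(airflow_location)
print(flower_location)
```

The two printed prefixes correspond to the location blocks that the proxy configuration needs to define.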
Your reverse proxy (e.g. nginx) should be configured as follows:
Pass the URL and HTTP headers as is to the Airflow webserver, without any rewrite, for example:
server {
  listen 80;
  server_name lab.mycompany.com;
  location /myorg/airflow/ {
      proxy_pass http://localhost:8080;
      proxy_set_header Host $host;
      proxy_redirect off;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection "upgrade";
  }
}
Rewrite the URL for the Flower endpoint:
server {
  listen 80;
  server_name lab.mycompany.com;
  location /myorg/flower/ {
      rewrite ^/myorg/flower/(.*)$ /$1 break;  # remove prefix from http header
      proxy_pass http://localhost:5555;
      proxy_set_header Host $host;
      proxy_redirect off;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection "upgrade";
  }
}
To ensure that Airflow generates URLs with the correct scheme when running behind a TLS-terminating proxy, you should configure the proxy to set the X-Forwarded-Proto header, and enable the ProxyFix middleware in your airflow.cfg:
enable_proxy_fix = True
Note: you should only enable the ProxyFix middleware when running Airflow behind a trusted proxy (AWS ELB, nginx, etc.).
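To make the role of the X-Forwarded-Proto header more concrete, here is a deliberately simplified WSGI middleware written with the standard library only. It is a stand-in for the idea behind a ProxyFix middleware, not the middleware Airflow actually enables; the names forwarded_proto_fix, demo_app and call are hypothetical, and taking the first listed header value is a simplifying assumption.

```python
def forwarded_proto_fix(app):
    """Wrap a WSGI app so that it trusts the X-Forwarded-Proto header."""
    def middleware(environ, start_response):
        proto = environ.get("HTTP_X_FORWARDED_PROTO")
        if proto:
            # Simplification: use the first value if several proxies added one.
            environ["wsgi.url_scheme"] = proto.split(",")[0].strip()
        return app(environ, start_response)
    return middleware


def demo_app(environ, start_response):
    """Toy application that builds an absolute URL from the WSGI environ."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    url = "{}://{}/home".format(environ["wsgi.url_scheme"], environ["HTTP_HOST"])
    return [url.encode("utf-8")]


def call(app, environ):
    """Invoke a WSGI app with a dummy start_response and return the body."""
    body = app(environ, lambda status, headers: None)
    return b"".join(body).decode("utf-8")


# The proxy terminated TLS and forwarded the request over plain HTTP.
environ = {
    "wsgi.url_scheme": "http",
    "HTTP_HOST": "lab.mycompany.com",
    "HTTP_X_FORWARDED_PROTO": "https",
}
print(call(demo_app, dict(environ)))
print(call(forwarded_proto_fix(demo_app), dict(environ)))
```

Because any client can send this header, the wrapper is only safe when every request really passes through a trusted proxy, which is the reason for the note above.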
Azure: Microsoft Azure
Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob Storage, Azure CosmosDB and Azure Data Lake. The hooks, sensors and operators for these services are in the contrib section.
Azure Blob Storage
All classes communicate via the Windows Azure Storage Blob (WASB) protocol. Make sure that an Airflow connection of type wasb exists. Authorization can be done by supplying a login (= storage account name) and password (= key), or a login and a SAS token in the extra field (see connection wasb_default for an example).
- WasbBlobSensor: Checks if a blob is present on Azure Blob storage.
- WasbPrefixSensor: Checks if blobs matching a prefix are present on Azure Blob storage.
- FileToWasbOperator: Uploads a local file to a container as a blob.
- WasbHook: Interface with Azure Blob Storage.
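The two authorization variants described above can be summarised in a small sketch. The helper below is hypothetical (it is not part of Airflow); it only assembles the values one would enter for a connection of type wasb, with the SAS token serialised as JSON for the extra field. The account name and secrets are placeholders.

```python
import json


def wasb_connection_fields(account_name, key=None, sas_token=None):
    """Build the field values for a wasb connection (hypothetical helper)."""
    if (key is None) == (sas_token is None):
        raise ValueError("provide exactly one of key or sas_token")

    fields = {"conn_type": "wasb", "login": account_name,
              "password": None, "extra": None}
    if key is not None:
        # Variant 1: storage account name as login, account key as password.
        fields["password"] = key
    else:
        # Variant 2: storage account name as login, SAS token in the extra field.
        fields["extra"] = json.dumps({"sas_token": sas_token})
    return fields


key_based = wasb_connection_fields("mystorageaccount", key="ACCOUNT_KEY")
sas_based = wasb_connection_fields("mystorageaccount", sas_token="YOUR_TOKEN")
print(key_based)
print(sas_based["extra"])
```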
WasbBlobSensor
class airflow.contrib.sensors.wasb_sensor.WasbBlobSensor(**kwargs)
Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator
Waits for a blob to arrive on Azure Blob Storage.
Parameters:
- container_name (str) – Name of the container.
- blob_name (str) – Name of the blob.
- wasb_conn_id (str) – Reference to the wasb connection.
- check_options (dict) – Optional keyword arguments that WasbHook.check_for_blob() takes.
WasbPrefixSensor
class airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor(**kwargs)
Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator
Waits for blobs matching a prefix to arrive on Azure Blob Storage.
Parameters:
- container_name (str) – Name of the container.
- prefix (str) – Prefix of the blob.
- wasb_conn_id (str) – Reference to the wasb connection.
- check_options (dict) – Optional keyword arguments that WasbHook.check_for_prefix() takes.
FileToWasbOperator
class airflow.contrib.operators.file_to_wasb.FileToWasbOperator(**kwargs)
Bases: airflow.models.BaseOperator
Uploads a file to Azure Blob Storage.
Parameters:
- file_path (str) – Path to the file to load. (templated)
- container_name (str) – Name of the container. (templated)
- blob_name (str) – Name of the blob. (templated)
- wasb_conn_id (str) – Reference to the wasb connection.
- load_options (dict) – Optional keyword arguments that WasbHook.load_file() takes.
WasbHook
class airflow.contrib.hooks.wasb_hook.WasbHook(wasb_conn_id='wasb_default')
Bases: airflow.hooks.base_hook.BaseHook
Interacts with Azure Blob Storage through the wasb:// protocol.
Additional options passed in the 'extra' field of the connection will be passed to the BlockBlobService() constructor. For example, authenticate using a SAS token by adding {"sas_token": "YOUR_TOKEN"}.
Parameters: wasb_conn_id (str) – Reference to the wasb connection.
check_for_blob(container_name, blob_name, **kwargs)
Check if a blob exists on Azure Blob Storage.
Parameters:
- container_name (str) – Name of the container.
- blob_name (str) – Name of the blob.
- kwargs (object) – Optional keyword arguments that BlockBlobService.exists() takes.
Returns: True if the blob exists, False otherwise.
Return type: bool
check_for_prefix(container_name, prefix, **kwargs)
Check if a prefix exists on Azure Blob Storage.
Parameters:
- container_name (str) – Name of the container.
- prefix (str) – Prefix of the blob.
- kwargs (object) – Optional keyword arguments that BlockBlobService.list_blobs() takes.
Returns: True if blobs matching the prefix exist, False otherwise.
Return type: bool
delete_file(container_name, blob_name, is_prefix=False, ignore_if_missing=False, **kwargs)
Delete a file from Azure Blob Storage.
Parameters:
- container_name (str) – Name of the container.
- blob_name (str) – Name of the blob.
- is_prefix (bool) – If blob_name is a prefix, delete all matching files.
- ignore_if_missing (bool) – If True, return success even if the blob does not exist.
- kwargs (object) – Optional keyword arguments that BlockBlobService.delete_blob() takes.
get_file(file_path, container_name, blob_name, **kwargs)
Download a file from Azure Blob Storage.
Parameters:
- file_path (str) – Local path to which the file is downloaded.
- container_name (str) – Name of the container.
- blob_name (str) – Name of the blob.
- kwargs (object) – Optional keyword arguments that BlockBlobService.get_blob_to_path() takes.
load_file(file_path, container_name, blob_name, **kwargs)
Upload a file to Azure Blob Storage.
Parameters:
- file_path (str) – Path to the file to load.
- container_name (str) – Name of the container.
- blob_name (str) – Name of the blob.
- kwargs (object) – Optional keyword arguments that BlockBlobService.create_blob_from_path() takes.
load_string(string_data, container_name, blob_name, **kwargs)
Upload a string to Azure Blob Storage.
Parameters:
- string_data (str) – String to load.
- container_name (str) – Name of the container.
- blob_name (str) – Name of the blob.
- kwargs (object) – Optional keyword arguments that BlockBlobService.create_blob_from_text() takes.
read_file(container_name, blob_name, **kwargs)
Read a file from Azure Blob Storage and return it as a string.
Parameters:
- container_name (str) – Name of the container.
- blob_name (str) – Name of the blob.
- kwargs (object) – Optional keyword arguments that BlockBlobService.get_blob_to_text() takes.
Logging
Airflow can be configured to read and write task logs in Azure Blob Storage. See Writing Logs to Azure Blob Storage.
Azure CosmosDB
AzureCosmosDBHook communicates via the Azure Cosmos library. Make sure that an Airflow connection of type azure_cosmos exists. Authorization can be done by supplying a login (= endpoint URI), password (= secret key) and extra fields database_name and collection_name to specify the default database and collection to use (see connection azure_cosmos_default for an example).
- AzureCosmosDBHook: Interface with Azure CosmosDB.
- AzureCosmosInsertDocumentOperator: Simple operator to insert document into CosmosDB.
- AzureCosmosDocumentSensor: Simple sensor to detect document existence in CosmosDB.
AzureCosmosDBHook
class airflow.contrib.hooks.azure_cosmos_hook.AzureCosmosDBHook(azure_cosmos_conn_id='azure_cosmos_default')
Bases: airflow.hooks.base_hook.BaseHook
Interacts with Azure CosmosDB.
The login should be the endpoint URI and the password should be the master key. Optionally, you can use the following extras to set default values: {"database_name": "<DATABASE_NAME>", "collection_name": "<COLLECTION_NAME>"}.
Parameters: azure_cosmos_conn_id (str) – Reference to the Azure CosmosDB connection.
create_collection(collection_name, database_name=None)
Creates a new collection in the CosmosDB database.
delete_collection(collection_name, database_name=None)
Deletes an existing collection in the CosmosDB database.
delete_document(document_id, database_name=None, collection_name=None)
Deletes an existing document from a collection in the CosmosDB database.
does_collection_exist(collection_name, database_name=None)
Checks if a collection exists in CosmosDB.
get_document(document_id, database_name=None, collection_name=None)
Gets a document from an existing collection in the CosmosDB database.
get_documents(sql_string, database_name=None, collection_name=None, partition_key=None)
Gets a list of documents from an existing collection in the CosmosDB database via a SQL query.
AzureCosmosInsertDocumentOperator
class airflow.contrib.operators.azure_cosmos_operator.AzureCosmosInsertDocumentOperator(**kwargs)
Bases: airflow.models.BaseOperator
Inserts a new document into the specified Cosmos database and collection. It will create both the database and the collection if they do not already exist.
Parameters:
- database_name (str) – The name of the database. (templated)
- collection_name (str) – The name of the collection. (templated)
- document (dict) – The document to insert
- azure_cosmos_conn_id (str) – reference to a CosmosDB connection.
AzureCosmosDocumentSensor
class airflow.contrib.sensors.azure_cosmos_sensor.AzureCosmosDocumentSensor(**kwargs)
Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator
Checks for the existence of a document which matches the given query in CosmosDB.
Example:
>>> azure_cosmos_sensor = AzureCosmosDocumentSensor(database_name="somedatabase_name",
...                                                  collection_name="somecollection_name",
...                                                  document_id="unique-doc-id",
...                                                  azure_cosmos_conn_id="azure_cosmos_default",
...                                                  task_id="azure_cosmos_sensor")
Azure Data Lake
AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that an Airflow connection of type azure_data_lake exists. Authorization can be done by supplying a login (= client ID), password (= client secret) and extra fields tenant (tenant) and account_name (account name) (see connection azure_data_lake_default for an example).
- AzureDataLakeHook: Interface with Azure Data Lake.
- AzureDataLakeStorageListOperator: Lists the files located in a specified Azure Data Lake path.
- AdlsToGoogleCloudStorageOperator: Copies files from an Azure Data Lake path to a Google Cloud Storage bucket.
AzureDataLakeHook
class airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook(azure_data_lake_conn_id='azure_data_lake_default')
Bases: airflow.hooks.base_hook.BaseHook
Interacts with Azure Data Lake.
The client ID and client secret should be supplied as the login and password of the connection. The tenant and account name should be set in the extra field as {"tenant": "<TENANT>", "account_name": "<ACCOUNT_NAME>"}.
Parameters: azure_data_lake_conn_id (str) – Reference to the Azure Data Lake connection.
check_for_file(file_path)
Check if a file exists on Azure Data Lake.
Parameters: file_path (str) – Path and name of the file.
Returns: True if the file exists, False otherwise.
Return type: bool
download_file(local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304)
Download a file from Azure Data Lake.
Parameters:
- local_path (str) – Local path. If downloading a single file, will write to this specific file, unless it is an existing directory, in which case a file is created within it. If downloading multiple files, this is the root directory to write within. Will create directories as required.
- remote_path (str) – Remote path or glob string used to find remote files. Recursive glob patterns using ** are not supported.
- nthreads (int) – Number of threads to use. If None, uses the number of cores.
- overwrite (bool) – Whether to forcibly overwrite existing files or directories. If False and remote path is a directory, will quit regardless of whether any files would be overwritten. If True, only matching filenames are actually overwritten.
- buffersize (int) – Number of bytes for the internal buffer (default 2**22). The buffer cannot be bigger than a chunk and cannot be smaller than a block.
- blocksize (int) – Number of bytes for a block (default 2**22). Within each chunk, a smaller block is written for each API call. A block cannot be bigger than a chunk.
upload_file(local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304)
Upload a file to Azure Data Lake.
Parameters:
- local_path (str) – Local path. Can be a single file, a directory (in which case it is uploaded recursively) or a glob pattern. Recursive glob patterns using ** are not supported.
- remote_path (str) – Remote path to upload to; if multiple files, this is the directory root to write within.
- nthreads (int) – Number of threads to use. If None, uses the number of cores.
- overwrite (bool) – Whether to forcibly overwrite existing files or directories. If False and remote path is a directory, will quit regardless of whether any files would be overwritten. If True, only matching filenames are actually overwritten.
- buffersize (int) – Number of bytes for the internal buffer (default 2**22). The buffer cannot be bigger than a chunk and cannot be smaller than a block.
- blocksize (int) – Number of bytes for a block (default 2**22). Within each chunk, a smaller block is written for each API call. A block cannot be bigger than a chunk.
AzureDataLakeStorageListOperator
class airflow.contrib.operators.adls_list_operator.AzureDataLakeStorageListOperator(**kwargs)
Bases: airflow.models.BaseOperator
Lists all files from the specified path.
This operator returns a Python list with the names of the files, which can be used by XCom in the downstream tasks.
Parameters:
- path (str) – The Azure Data Lake path to find the objects. Supports glob strings. (templated)
- azure_data_lake_conn_id (str) – The connection ID to use when connecting to Azure Data Lake Storage.
Example:
The following operator would list all the Parquet files from the folder/output/ folder in the specified ADLS account:
adls_files = AzureDataLakeStorageListOperator(
    task_id='adls_files',
    path='folder/output/*.parquet',
    azure_data_lake_conn_id='azure_data_lake_default'
)
AdlsToGoogleCloudStorageOperator
class airflow.contrib.operators.adls_to_gcs.AdlsToGoogleCloudStorageOperator(**kwargs)
Bases: airflow.contrib.operators.adls_list_operator.AzureDataLakeStorageListOperator
Synchronizes an Azure Data Lake Storage path with a GCS bucket.
Parameters:
- src_adls (str) – The Azure Data Lake path to find the objects. (templated)
- dest_gcs (str) – The Google Cloud Storage bucket and prefix to store the objects. (templated)
- replace (bool) – If true, replaces same-named files in GCS.
- azure_data_lake_conn_id (str) – The connection ID to use when connecting to Azure Data Lake Storage.
- google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google Cloud Storage.
- delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Examples:
The following operator would copy a single file named hello/world.avro from ADLS to the GCS bucket mybucket. Its full resulting GCS path will be gs://mybucket/hello/world.avro:
copy_single_file = AdlsToGoogleCloudStorageOperator(
    task_id='copy_single_file',
    src_adls='hello/world.avro',
    dest_gcs='gs://mybucket',
    replace=False,
    azure_data_lake_conn_id='azure_data_lake_default',
    google_cloud_storage_conn_id='google_cloud_default'
)
The following operator would copy all Parquet files from ADLS to the GCS bucket mybucket:
copy_all_files = AdlsToGoogleCloudStorageOperator(
    task_id='copy_all_files',
    src_adls='*.parquet',
    dest_gcs='gs://mybucket',
    replace=False,
    azure_data_lake_conn_id='azure_data_lake_default',
    google_cloud_storage_conn_id='google_cloud_default'
)
The following operator would copy all Parquet files from the ADLS path /hello/world to the GCS bucket mybucket:
copy_world_files = AdlsToGoogleCloudStorageOperator(
    task_id='copy_world_files',
    src_adls='hello/world/*.parquet',
    dest_gcs='gs://mybucket',
    replace=False,
    azure_data_lake_conn_id='azure_data_lake_default',
    google_cloud_storage_conn_id='google_cloud_default'
)
AWS: Amazon Web Services
Airflow has extensive support for Amazon Web Services. Note, however, that most of the hooks, sensors and operators are in the contrib section.
AWS EMR
- EmrAddStepsOperator : Adds steps to an existing EMR JobFlow.
- EmrCreateJobFlowOperator : Creates an EMR JobFlow, reading the config from the EMR connection.
- EmrTerminateJobFlowOperator : Terminates an EMR JobFlow.
- EmrHook : Interact with AWS EMR.
EmrAddStepsOperator
class airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator(**kwargs)
Bases: airflow.models.BaseOperator
An operator that adds steps to an existing EMR JobFlow.
Parameters:
- job_flow_id (str) – ID of the JobFlow to add steps to. (templated)
- aws_conn_id (str) – AWS connection to use
- steps (list) – boto3-style steps to be added to the JobFlow. (templated)
EmrCreateJobFlowOperator
class airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator(**kwargs)
Bases: airflow.models.BaseOperator
Creates an EMR JobFlow, reading the config from the EMR connection. A dictionary of JobFlow overrides can be passed to override the config from the connection.
Parameters:
- aws_conn_id (str) – AWS connection to use
- emr_conn_id (str) – EMR connection to use
- job_flow_overrides (dict) – boto3-style arguments to override the extra field of the EMR connection. (templated)
EmrTerminateJobFlowOperator
class airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator(**kwargs)
Bases: airflow.models.BaseOperator
Operator to terminate EMR JobFlows.
Parameters:
- job_flow_id (str) – ID of the JobFlow to terminate. (templated)
- aws_conn_id (str) – AWS connection to use
EmrHook
class airflow.contrib.hooks.emr_hook.EmrHook(emr_conn_id=None, region_name=None, *args, **kwargs)
Bases: airflow.contrib.hooks.aws_hook.AwsHook
Interact with AWS EMR. emr_conn_id is only necessary for using the create_job_flow method.
AWS S3
- S3Hook : Interact with AWS S3.
- S3FileTransformOperator : Copies data from a source S3 location to a temporary location on the local filesystem.
- S3ListOperator : Lists the files matching a key prefix from an S3 location.
- S3ToGoogleCloudStorageOperator : Syncs an S3 location with a Google Cloud Storage bucket.
- S3ToGoogleCloudStorageTransferOperator : Syncs an S3 bucket with a Google Cloud Storage bucket using the GCP Storage Transfer Service.
- S3ToHiveTransfer : Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.
S3Hook
class airflow.hooks.S3_hook.S3Hook(aws_conn_id='aws_default', verify=None)
Bases: airflow.contrib.hooks.aws_hook.AwsHook
Interact with AWS S3, using the boto3 library.
check_for_bucket(bucket_name)
Check if bucket_name exists.
Parameters: bucket_name (str) – the name of the bucket
check_for_key(key, bucket_name=None)
Checks if a key exists in a bucket.
Parameters:
- key (str) – S3 key that will point to the file
- bucket_name (str) – Name of the bucket in which the file is stored
check_for_prefix(bucket_name, prefix, delimiter)
Checks that a prefix exists in a bucket.
Parameters:
- bucket_name (str) – the name of the bucket
- prefix (str) – a key prefix
- delimiter (str) – the delimiter marks key hierarchy.
check_for_wildcard_key(wildcard_key, bucket_name=None, delimiter='')
Checks that a key matching a wildcard expression exists in a bucket.
Parameters:
- wildcard_key (str) – the path to the key
- bucket_name (str) – the name of the bucket
- delimiter (str) – the delimiter marks key hierarchy
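The following sketch shows what matching a key against a wildcard expression can look like. It assumes Unix shell-style patterns as implemented by Python's fnmatch module and works on an in-memory list of key names rather than a real bucket; the helper name matching_keys and the sample keys are made up for the example and are not taken from the hook.

```python
import fnmatch


def matching_keys(all_keys, wildcard_key):
    """Return the keys that match a shell-style wildcard expression."""
    return [key for key in all_keys if fnmatch.fnmatchcase(key, wildcard_key)]


# Hypothetical key names standing in for the contents of a bucket.
keys = [
    "logs/2018/04/01/app.log",
    "logs/2018/04/02/app.log",
    "logs/2018/05/01/app.log",
]

april = matching_keys(keys, "logs/2018/04/*/app.log")
everything = matching_keys(keys, "logs/*.log")
print(april)
print(len(everything))
```

Note that with this style of matching, * also matches the / character, so a pattern such as logs/*.log matches keys at any depth below logs/.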
copy_object(source_bucket_key, dest_bucket_key, source_bucket_name=None, dest_bucket_name=None, source_version_id=None)
Creates a copy of an object that is already stored in S3.
Note: the S3 connection used here needs to have access to both source and destination bucket/key.
Parameters:
- source_bucket_key (str) – The key of the source object. It can be either a full s3:// style URL or a relative path from root level. When it is specified as a full s3:// URL, please omit source_bucket_name.
- dest_bucket_key (str) – The key of the object to copy to. The convention to specify dest_bucket_key is the same as for source_bucket_key.
- source_bucket_name (str) – Name of the S3 bucket where the source object is located. It should be omitted when source_bucket_key is provided as a full s3:// URL.
- dest_bucket_name (str) – Name of the S3 bucket to which the object is copied. It should be omitted when dest_bucket_key is provided as a full s3:// URL.
- source_version_id (str) – Version ID of the source object (OPTIONAL)
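The key convention described for copy_object (either a full s3:// URL, or a relative key plus a separate bucket name) can be sketched with the standard library. The function resolve_bucket_and_key is a hypothetical helper written for this illustration; it is not the hook's own code, and the error handling is an assumption.

```python
from urllib.parse import urlsplit


def resolve_bucket_and_key(key, bucket_name=None):
    """Return (bucket, key) for a full s3:// URL or a relative key."""
    if key.startswith("s3://"):
        # Full URL: the bucket is part of the URL, so bucket_name must be omitted.
        if bucket_name is not None:
            raise ValueError("omit bucket_name when the key is a full s3:// URL")
        parts = urlsplit(key)
        if not parts.netloc:
            raise ValueError("missing bucket name in {!r}".format(key))
        return parts.netloc, parts.path.lstrip("/")

    # Relative key: the bucket has to be given separately.
    if bucket_name is None:
        raise ValueError("bucket_name is required for a relative key")
    return bucket_name, key


source = resolve_bucket_and_key("s3://source-bucket/data/in.csv")
dest = resolve_bucket_and_key("data/out.csv", bucket_name="dest-bucket")
print(source)
print(dest)
```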
create_bucket(bucket_name, region_name=None)
Creates an Amazon S3 bucket.
Parameters:
- bucket_name (str) – The name of the bucket
- region_name (str) – The name of the AWS region in which to create the bucket.
delete_objects(bucket, keys)
Parameters:
- bucket (str) – Name of the bucket in which you are going to delete object(s)
- keys (str or list) – The key(s) to delete from the S3 bucket. When keys is a string, it is the key name of the single object to delete. When keys is a list, it is the list of the keys to delete.
get_bucket(bucket_name)
Returns a boto3.S3.Bucket object.
Parameters: bucket_name (str) – the name of the bucket
get_key(key, bucket_name=None)
Returns a boto3.s3.Object.
Parameters:
- key (str) – the path to the key
- bucket_name (str) – the name of the bucket
get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')
Returns a boto3.s3.Object object matching the wildcard expression.
Parameters:
- wildcard_key (str) – the path to the key
- bucket_name (str) – the name of the bucket
- delimiter (str) – the delimiter marks key hierarchy
list_keys(bucket_name, prefix='', delimiter='', page_size=None, max_items=None)
Lists keys in a bucket under prefix and not containing delimiter.
Parameters:
- bucket_name (str) – the name of the bucket
- prefix (str) – a key prefix
- delimiter (str) – the delimiter marks key hierarchy.
- page_size (int) – pagination size
- max_items (int) – maximum items to return
list_prefixes(bucket_name, prefix='', delimiter='', page_size=None, max_items=None)
Lists prefixes in a bucket under prefix.
Parameters:
- bucket_name (str) – the name of the bucket
- prefix (str) – a key prefix
- delimiter (str) – the delimiter marks key hierarchy.
- page_size (int) – pagination size
- max_items (int) – maximum items to return
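The difference between listing keys and listing prefixes is easiest to see on a small example. The sketch below simulates the prefix and delimiter semantics on an in-memory list of key names, with no call to S3; the function split_listing and the sample keys are hypothetical, and pagination (page_size, max_items) is left out.

```python
def split_listing(all_keys, prefix="", delimiter=""):
    """Split key names into (keys, common prefixes) directly under a prefix."""
    keys, prefixes = [], []
    for key in sorted(all_keys):
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        if delimiter and delimiter in rest:
            # The key lives deeper in the hierarchy: report only its first level.
            common = prefix + rest.split(delimiter, 1)[0] + delimiter
            if common not in prefixes:
                prefixes.append(common)
        else:
            # The key sits directly under the prefix.
            keys.append(key)
    return keys, prefixes


# Hypothetical bucket contents.
bucket_keys = [
    "customers/2018/04/a.csv",
    "customers/2018/04/b.csv",
    "customers/2018/04/archive/c.csv",
    "customers/2018/05/d.csv",
]

keys, prefixes = split_listing(bucket_keys, prefix="customers/2018/04/", delimiter="/")
print(keys)
print(prefixes)
```

In this simulation the first list corresponds to what a key listing would contain and the second to what a prefix listing would contain; with an empty delimiter every key under the prefix is returned as a key.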
load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False)
Loads bytes to S3.
This is provided as a convenience to drop bytes in S3. It uses the boto infrastructure to ship a file to S3.
Parameters:
- bytes_data (bytes) – bytes to set as content for the key.
- key (str) – S3 key that will point to the file
- bucket_name (str) – Name of the bucket in which to store the file
- replace (bool) – A flag to decide whether or not to overwrite the key if it already exists
- encrypt (bool) – If True, the file will be encrypted on the server-side by S3 and will be stored in an encrypted form while at rest in S3.
load_file(filename, key, bucket_name=None, replace=False, encrypt=False)
Loads a local file to S3.
Parameters:
- filename (str) – name of the file to load.
- key (str) – S3 key that will point to the file
- bucket_name (str) – Name of the bucket in which to store the file
- replace (bool) – A flag to decide whether or not to overwrite the key if it already exists. If replace is False and the key exists, an error will be raised.
- encrypt (bool) – If True, the file will be encrypted on the server-side by S3 and will be stored in an encrypted form while at rest in S3.
load_file_obj(file_obj, key, bucket_name=None, replace=False, encrypt=False)
Loads a file object to S3.
Parameters:
- file_obj (file-like object) – The file-like object to set as the content for the S3 key.
- key (str) – S3 key that will point to the file
- bucket_name (str) – Name of the bucket in which to store the file
- replace (bool) – A flag that indicates whether to overwrite the key if it already exists.
- encrypt (bool) – If True, S3 encrypts the file on the server, and the file is stored in encrypted form at rest in S3.
load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding='utf-8')
Loads a string to S3.
This is provided as a convenience to drop a string in S3. It uses the boto infrastructure to ship a file to S3.
Parameters:
- string_data (str) – string to set as content for the key.
- key (str) – S3 key that will point to the file
- bucket_name (str) – Name of the bucket in which to store the file
- replace (bool) – A flag to decide whether or not to overwrite the key if it already exists
- encrypt (bool) – If True, the file will be encrypted on the server-side by S3 and will be stored in an encrypted form while at rest in S3.
read_key(key, bucket_name=None)
Reads a key from S3.
Parameters:
- key (str) – S3 key that will point to the file
- bucket_name (str) – Name of the bucket in which the file is stored
select_key(key, bucket_name=None, expression='SELECT * FROM S3Object', expression_type='SQL', input_serialization=None, output_serialization=None)
Reads a key with S3 Select.
Parameters:
- key (str) – S3 key that will point to the file
- bucket_name (str) – Name of the bucket in which the file is stored
- expression (str) – S3 Select expression
- expression_type (str) – S3 Select expression type
- input_serialization (dict) – S3 Select input data serialization format
- output_serialization (dict) – S3 Select output data serialization format
Returns: retrieved subset of original data by S3 Select
Return type: str
See also: for more details about S3 Select parameters, see http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.select_object_content
S3FileTransformOperator
class airflow.operators.s3_file_transform_operator.S3FileTransformOperator(**kwargs)
Bases: airflow.models.BaseOperator
Copies data from a source S3 location to a temporary location on the local filesystem. Runs a transformation on this file as specified by the transformation script and uploads the output to a destination S3 location.
The locations of the source and the destination files in the local filesystem are provided as the first and second arguments to the transformation script. The transformation script is expected to read the data from the source, transform it and write the output to the local destination file. The operator then takes over control and uploads the local destination file to S3.
S3 Select is also available to filter the source contents. Users can omit the transformation script if an S3 Select expression is specified.
Parameters:
- source_s3_key (str) – The key to be retrieved from S3. (templated)
- source_aws_conn_id (str) – source s3 connection
- source_verify (bool or str) – Whether or not to verify SSL certificates for the S3 connection. By default SSL certificates are verified. You can provide the following values:
  False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.
  path/to/cert/bundle.pem: A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
  This is also applicable to dest_verify.
- dest_s3_key (str) – The key to be written to S3. (templated)
- dest_aws_conn_id (str) – destination s3 connection
- replace (bool) – Replace dest S3 key if it already exists
- transform_script (str) – location of the executable transformation script
- select_expression (str) – S3 Select expression
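A transformation script following the contract described above (local source path as first argument, local destination path as second) could look roughly like the sketch below. The upper-casing transformation, the function names and the temporary files are purely illustrative assumptions; the block ends with a local dry run instead of reading sys.argv so that it can be executed on its own.

```python
import os
import tempfile


def transform(source_path, dest_path):
    """Read the local source file, transform it and write the local destination file."""
    with open(source_path, "r") as src, open(dest_path, "w") as dst:
        for line in src:
            # Illustrative transformation only: upper-case every line.
            dst.write(line.upper())


def main(argv):
    """Entry point: argv[1] is the source path, argv[2] the destination path."""
    if len(argv) != 3:
        raise SystemExit("usage: transform.py SOURCE_PATH DEST_PATH")
    transform(argv[1], argv[2])


# Local dry run with temporary files. A real script would instead
# import sys and end with: main(sys.argv)
workdir = tempfile.mkdtemp()
source_path = os.path.join(workdir, "source.txt")
dest_path = os.path.join(workdir, "dest.txt")
with open(source_path, "w") as handle:
    handle.write("id,name\n1,alice\n")

main(["transform.py", source_path, dest_path])

with open(dest_path) as handle:
    result = handle.read()
print(result)
```

The script only touches the two local paths it is given; the download from S3 and the upload of the destination file remain the operator's responsibility.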
S3ListOperator
class airflow.contrib.operators.s3_list_operator.S3ListOperator(**kwargs)
Bases: airflow.models.BaseOperator
Lists all objects from the bucket with the given string prefix in their name.
This operator returns a Python list with the names of the objects, which can be used by XCom in the downstream task.
Parameters:
- bucket (string) – The S3 bucket where to find the objects. (templated)
- prefix (string) – Prefix string to filter the objects whose names begin with this prefix. (templated)
- delimiter (string) – the delimiter marks key hierarchy. (templated)
- aws_conn_id (string) – The connection ID to use when connecting to S3 storage.
- verify (bool or str) – Whether or not to verify SSL certificates for the S3 connection. By default SSL certificates are verified. You can provide the following values:
  False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.
  path/to/cert/bundle.pem: A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
Example:
The following operator would list all the files (excluding subfolders) from the S3 customers/2018/04/ key in the data bucket:
s3_file = S3ListOperator(
    task_id='list_3s_files',
    bucket='data',
    prefix='customers/2018/04/',
    delimiter='/',
    aws_conn_id='aws_customers_conn'
)
S3ToGoogleCloudStorageOperator
class airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator(**kwargs)
Bases: airflow.contrib.operators.s3_list_operator.S3ListOperator
Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage destination path.
Parameters:
- bucket (string) – The S3 bucket where to find the objects. (templated)
- prefix (string) – Prefix string to filter the objects whose names begin with this prefix. (templated)
- delimiter (string) – the delimiter marks key hierarchy. (templated)
- aws_conn_id (string) – The source S3 connection
- dest_gcs_conn_id (string) – The destination connection ID to use when connecting to Google Cloud Storage.
- dest_gcs (string) – The destination Google Cloud Storage bucket and prefix where you want to store the files. (templated)
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- replace (bool) – Whether you want to replace existing destination files or not.
- verify (bool or str) – Whether or not to verify SSL certificates for the S3 connection. By default SSL certificates are verified. You can provide the following values:
  False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.
  path/to/cert/bundle.pem: A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
Example:
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
    task_id='s3_to_gcs_example',
    bucket='my-s3-bucket',
    prefix='data/customers-201804',
    dest_gcs_conn_id='google_cloud_default',
    dest_gcs='gs://my.gcs.bucket/some/customers/',
    replace=False,
    dag=my_dag)
Note that bucket, prefix, delimiter and dest_gcs are templated, so you can use variables in them if you wish.
S3ToGoogleCloudStorageTransferOperator¶
S3ToHiveTransfer¶
class airflow.operators.s3_to_hive_operator.S3ToHiveTransfer(**kwargs)
Bases: airflow.models.BaseOperator
Moves data from S3 to Hive. The operator downloads a file from S3 and stores it locally before loading it into a Hive table. If the create or recreate arguments are set to True, CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata.
Note that the table generated in Hive uses STORED AS textfile, which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.
Parameters: - s3_key (str) – The key to be retrieved from S3. (templated)
- field_dict (dict) – A dictionary of the field names in the file as keys and their Hive types as values
- hive_table (str) – target Hive table, use dot notation to target a specific database. (templated)
- create (bool) – whether to create the table if it doesn’t exist
- recreate (bool) – whether to drop and recreate the table at every execution
- partition (dict) – target partition as a dict of partition columns and values. (templated)
- headers (bool) – whether the file contains column names on the first line
- check_headers (bool) – whether the column names on the first line should be checked against the keys of field_dict
- wildcard_match (bool) – whether the s3_key should be interpreted as a Unix wildcard pattern
- delimiter (str) – field delimiter in the file
- aws_conn_id (str) – source s3 connection
- hive_cli_conn_id (str) – destination hive connection
- input_compressed (bool) – Boolean to determine if file decompression is required to process headers
- tblproperties (dict) – TBLPROPERTIES of the hive table being created
- select_expression (str) – S3 Select expression
- verify (bool or str) – Whether or not to verify SSL certificates for the S3 connection. By default SSL certificates are verified. You can provide the following values:
  - False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.
  - path/to/cert/bundle.pem: the filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
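As a rough illustration of how field_dict, delimiter and check_headers relate to the generated table, the sketch below builds a CREATE TABLE statement from a column mapping and compares a header line with the mapping's keys. Both helpers are hypothetical and are not the operator's actual code; the case-insensitive header comparison is an assumption. It relies on dicts preserving insertion order (Python 3.7+).

```python
def build_create_table(hive_table, field_dict, delimiter=','):
    """Build a CREATE TABLE statement from a {column: hive_type} mapping."""
    columns = ',\n'.join(
        '    `{}` {}'.format(name, hive_type)
        for name, hive_type in field_dict.items()
    )
    return (
        'CREATE TABLE IF NOT EXISTS {table} (\n{columns}\n)\n'
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY '{delimiter}'\n"
        'STORED AS textfile'
    ).format(table=hive_table, columns=columns, delimiter=delimiter)


def headers_match(header_line, field_dict, delimiter=','):
    """Compare the first line of a file with the keys of field_dict."""
    # Assumption: names are compared case-insensitively and in order.
    found = [name.strip().lower() for name in header_line.split(delimiter)]
    expected = [name.lower() for name in field_dict]
    return found == expected


fields = {'customer_id': 'BIGINT', 'name': 'STRING'}
ddl = build_create_table('staging.customers', fields)
```

The dot notation in 'staging.customers' targets the staging database, matching the hive_table description above.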
AWS EC2 Container Service¶
- ECSOperator : Execute a task on AWS EC2 Container Service.
ECSOperator¶
class airflow.contrib.operators.ecs_operator.ECSOperator(**kwargs)
Bases: airflow.models.BaseOperator
Execute a task on AWS EC2 Container Service
Parameters: - task_definition (str) – the task definition name on EC2 Container Service
- cluster (str) – the cluster name on EC2 Container Service
- overrides (dict) – the same parameter that boto3 will receive (templated): http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task
- aws_conn_id (str) – connection id of AWS credentials / region name. If None, the default boto3 credential strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
- region_name (str) – region name to use in AWS Hook. Override the region_name in connection (if provided)
- launch_type (str) – the launch type on which to run your task (‘EC2’ or ‘FARGATE’)
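The overrides dictionary follows the shape that boto3's ECS run_task call accepts. A minimal sketch of building one is shown below; the helper container_overrides, the container name and the command are hypothetical values chosen for illustration.

```python
def container_overrides(container_name, command, environment=None):
    """Build an ECS run_task 'overrides' dict for a single container."""
    env = [
        {'name': key, 'value': value}
        for key, value in sorted((environment or {}).items())
    ]
    return {
        'containerOverrides': [
            {
                'name': container_name,
                'command': list(command),
                'environment': env,
            }
        ]
    }


# The field is templated, so a Jinja expression such as {{ ds }} may appear
# inside the command; it is left unrendered here.
overrides = container_overrides(
    'etl', ['python', 'load.py', '{{ ds }}'], {'STAGE': 'prod'}
)
```

The resulting dict would be passed as the overrides argument of the operator.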
AWS Batch Service¶
- AWSBatchOperator : Execute a task on AWS Batch Service.
AWSBatchOperator¶
class airflow.contrib.operators.awsbatch_operator.AWSBatchOperator(**kwargs)
Bases: airflow.models.BaseOperator
Execute a job on AWS Batch Service
Parameters: - job_name (str) – the name for the job that will run on AWS Batch
- job_definition (str) – the job definition name on AWS Batch
- job_queue (str) – the queue name on AWS Batch
- overrides (dict) – the same parameter that boto3 will receive on containerOverrides (templated): http://boto3.readthedocs.io/en/latest/reference/services/batch.html#submit_job
- max_retries (int) – maximum number of exponential-backoff polling retries, used when the boto3 waiter is not available; 4200 retries corresponds to about 48 hours
- aws_conn_id (str) – connection id of AWS credentials / region name. If None, the default boto3 credential strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
- region_name (str) – region name to use in AWS Hook. Override the region_name in connection (if provided)
AWS RedShift¶
- AwsRedshiftClusterSensor : Waits for a Redshift cluster to reach a specific status.
- RedshiftHook : Interact with AWS Redshift, using the boto3 library.
- RedshiftToS3Transfer : Executes an UNLOAD command to S3 as CSV with or without headers.
- S3ToRedshiftTransfer : Executes a COPY command from S3 as CSV with or without headers.
AwsRedshiftClusterSensor¶
class airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor(**kwargs)
Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator
Waits for a Redshift cluster to reach a specific status.
Parameters: - cluster_identifier (str) – The identifier for the cluster being pinged.
- target_status (str) – The cluster status desired.
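The waiting behaviour can be pictured as a loop that repeatedly compares the reported cluster status with target_status. The sketch below is a simplified stand-in, not the sensor's implementation: wait_for_status, poke_interval and max_pokes are hypothetical names, and get_status replaces a real Redshift lookup.

```python
import time


def wait_for_status(get_status, target_status, poke_interval=0.0, max_pokes=10):
    """Poll get_status() until it returns target_status; return the attempt count."""
    for attempt in range(1, max_pokes + 1):
        if get_status() == target_status:
            return attempt
        time.sleep(poke_interval)
    raise TimeoutError('Status %r not reached after %d pokes'
                       % (target_status, max_pokes))


# Fake status source standing in for a cluster status lookup.
statuses = iter(['creating', 'creating', 'available'])
attempts = wait_for_status(lambda: next(statuses), 'available')
```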
RedshiftHook¶
class airflow.contrib.hooks.redshift_hook.RedshiftHook(aws_conn_id='aws_default', verify=None)
Bases: airflow.contrib.hooks.aws_hook.AwsHook
Interact with AWS Redshift, using the boto3 library
cluster_status(cluster_identifier)
Return status of a cluster
Parameters: cluster_identifier (str) – unique identifier of a cluster
create_cluster_snapshot(snapshot_identifier, cluster_identifier)
Creates a snapshot of a cluster
Parameters: - snapshot_identifier (str) – unique identifier for a snapshot of a cluster
- cluster_identifier (str) – unique identifier of a cluster
delete_cluster(cluster_identifier, skip_final_cluster_snapshot=True, final_cluster_snapshot_identifier='')
Delete a cluster and optionally create a snapshot
Parameters: - cluster_identifier (str) – unique identifier of a cluster
- skip_final_cluster_snapshot (bool) – determines cluster snapshot creation
- final_cluster_snapshot_identifier (str) – name of final cluster snapshot
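The two snapshot arguments of delete_cluster depend on each other: a final snapshot name only matters when the snapshot is not skipped. The sketch below maps the arguments onto the keyword names used by the boto3 Redshift delete_cluster call; the helper delete_cluster_kwargs and the cluster names are hypothetical, and the hook itself may pass the arguments differently.

```python
def delete_cluster_kwargs(cluster_identifier,
                          skip_final_cluster_snapshot=True,
                          final_cluster_snapshot_identifier=''):
    """Translate hook-style arguments into boto3 delete_cluster keywords."""
    if not skip_final_cluster_snapshot and not final_cluster_snapshot_identifier:
        raise ValueError(
            'A final snapshot identifier is required when a snapshot is requested'
        )
    kwargs = {
        'ClusterIdentifier': cluster_identifier,
        'SkipFinalClusterSnapshot': skip_final_cluster_snapshot,
    }
    if not skip_final_cluster_snapshot:
        kwargs['FinalClusterSnapshotIdentifier'] = final_cluster_snapshot_identifier
    return kwargs


without_snapshot = delete_cluster_kwargs('analytics')
with_snapshot = delete_cluster_kwargs(
    'analytics',
    skip_final_cluster_snapshot=False,
    final_cluster_snapshot_identifier='analytics-final',
)
```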
RedshiftToS3Transfer¶
class airflow.operators.redshift_to_s3_operator.RedshiftToS3Transfer(**kwargs)
Bases: airflow.models.BaseOperator
Executes an UNLOAD command to s3 as a CSV with headers
Parameters: - schema (string) – reference to a specific schema in redshift database
- table (string) – reference to a specific table in redshift database
- s3_bucket (string) – reference to a specific S3 bucket
- s3_key (string) – reference to a specific S3 key
- redshift_conn_id (string) – reference to a specific redshift database
- aws_conn_id (string) – reference to a specific S3 connection
- unload_options (list) – reference to a list of UNLOAD options
- verify (bool or str) – Whether or not to verify SSL certificates for the S3 connection. By default SSL certificates are verified. You can provide the following values:
  - False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.
  - path/to/cert/bundle.pem: the filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
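To show how schema, table, s3_bucket, s3_key and unload_options could combine into a single statement, here is a minimal sketch. The helper build_unload_statement is hypothetical, the IAM_ROLE authorization clause and the role ARN are assumptions made for the example, and header-row handling is left out; the operator's generated SQL may differ.

```python
def build_unload_statement(schema, table, s3_bucket, s3_key, iam_role,
                           unload_options=()):
    """Compose an UNLOAD statement for one table. Hypothetical helper."""
    options = '\n'.join(unload_options)
    return (
        "UNLOAD ('SELECT * FROM {schema}.{table}')\n"
        "TO 's3://{bucket}/{key}/{table}_'\n"
        "IAM_ROLE '{role}'\n"
        "{options};"
    ).format(schema=schema, table=table, bucket=s3_bucket, key=s3_key,
             role=iam_role, options=options)


statement = build_unload_statement(
    schema='public',
    table='customers',
    s3_bucket='my-bucket',
    s3_key='exports',
    # Placeholder ARN, for illustration only.
    iam_role='arn:aws:iam::123456789012:role/example-unload-role',
    unload_options=["DELIMITER ','", 'PARALLEL OFF'],
)
```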
S3ToRedshiftTransfer¶
class airflow.operators.s3_to_redshift_operator.S3ToRedshiftTransfer(**kwargs)
Bases: airflow.models.BaseOperator
Executes a COPY command to load files from S3 to Redshift
Parameters: - schema (string) – reference to a specific schema in redshift database
- table (string) – reference to a specific table in redshift database
- s3_bucket (string) – reference to a specific S3 bucket
- s3_key (string) – reference to a specific S3 key
- redshift_conn_id (string) – reference to a specific redshift database
- aws_conn_id (string) – reference to a specific S3 connection
- copy_options (list) – reference to a list of COPY options
- verify (bool or str) – Whether or not to verify SSL certificates for the S3 connection. By default SSL certificates are verified. You can provide the following values:
  - False: do not validate SSL certificates. SSL will still be used (unless use_ssl is False), but SSL certificates will not be verified.
  - path/to/cert/bundle.pem: the filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
Amazon SageMaker¶
For more instructions on using Amazon SageMaker in Airflow, please see the SageMaker Python SDK README.
- SageMakerHook : Interact with Amazon SageMaker.
- SageMakerTrainingOperator : Create a SageMaker training job.
- SageMakerTuningOperator : Create a SageMaker tuning job.
- SageMakerModelOperator : Create a SageMaker model.
- SageMakerTransformOperator : Create a SageMaker transform job.
- SageMakerEndpointConfigOperator : Create a SageMaker endpoint config.
- SageMakerEndpointOperator : Create a SageMaker endpoint.
SageMakerHook¶
class airflow.contrib.hooks.sagemaker_hook.SageMakerHook(*args, **kwargs)
Bases: airflow.contrib.hooks.aws_hook.AwsHook
Interact with Amazon SageMaker.
check_s3_url(s3url)
Check if an S3 URL exists
Parameters: s3url (str) – S3 url
Return type: bool
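One way to picture such a check: split the URL into bucket and key, confirm the bucket exists, then accept the key if it matches either an object or a prefix. The sketch below follows that idea under stated assumptions; the three callables are hypothetical stand-ins for real S3 lookups, and this is not the hook's actual implementation.

```python
from urllib.parse import urlparse


def check_s3_url(s3url, bucket_exists, key_exists, prefix_exists):
    """Return True if the URL points at an existing key or prefix.

    The three callables are hypothetical stand-ins for real S3 lookups.
    """
    parsed = urlparse(s3url)
    bucket, key = parsed.netloc, parsed.path.lstrip('/')
    if parsed.scheme != 's3' or not bucket:
        raise ValueError('Not an S3 URL: %r' % s3url)
    if not bucket_exists(bucket):
        raise ValueError('Bucket %r does not exist' % bucket)
    if key and not (key_exists(bucket, key) or prefix_exists(bucket, key)):
        raise ValueError('Neither key nor prefix %r exists in %r' % (key, bucket))
    return True


# In-memory stand-in for the contents of S3.
objects = {('models', 'train/data.csv')}
found = check_s3_url(
    's3://models/train/',
    bucket_exists=lambda b: b == 'models',
    key_exists=lambda b, k: (b, k) in objects,
    prefix_exists=lambda b, p: any(
        name == b and path.startswith(p) for name, path in objects
    ),
)
```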
check_status(job_name, key, describe_function, check_interval, max_ingestion_time, non_terminal_states=None)
Check status of a SageMaker job
Parameters: - job_name (str) – name of the job to check status
- key (str) – the key of the response dict that points to the state
- describe_function (python callable) – the function used to retrieve the status
- args – the arguments for the function
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
- non_terminal_states (set) – the set of nonterminal states
Returns: response of describe call after job is done
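The parameters above describe a polling loop: call describe_function, read the state under key, and stop once the state is no longer in non_terminal_states or the time budget is spent. A minimal sketch of that pattern follows; poll_until_done is a hypothetical name, the timeout accounting is simplified, and the hook's own loop may differ.

```python
import time


def poll_until_done(job_name, key, describe_function, check_interval,
                    max_ingestion_time, non_terminal_states):
    """Poll describe_function(job_name) until response[key] is terminal."""
    elapsed = 0
    while True:
        response = describe_function(job_name)
        if response[key] not in non_terminal_states:
            return response
        # Simplification: elapsed time is counted in whole check intervals.
        if max_ingestion_time is not None and elapsed >= max_ingestion_time:
            raise TimeoutError('Job %r exceeded %s seconds'
                               % (job_name, max_ingestion_time))
        time.sleep(check_interval)
        elapsed += check_interval


# Fake describe call that reports two in-progress states before finishing.
states = iter(['InProgress', 'InProgress', 'Completed'])
result = poll_until_done(
    'demo-job',
    'TrainingJobStatus',
    lambda name: {'TrainingJobName': name, 'TrainingJobStatus': next(states)},
    check_interval=0,
    max_ingestion_time=None,
    non_terminal_states={'InProgress', 'Stopping'},
)
```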
check_training_config(training_config)
Check if a training configuration is valid
Parameters: training_config (dict) – training_config
Returns: None
check_training_status_with_log(job_name, non_terminal_states, failed_states, wait_for_completion, check_interval, max_ingestion_time)
Display the logs for a given training job, optionally tailing them until the job is complete.
Parameters: - job_name (str) – name of the training job to check status and display logs for
- non_terminal_states (set) – the set of non_terminal states
- failed_states (set) – the set of failed states
- wait_for_completion (bool) – Whether to keep looking for new log entries until the job completes
- check_interval (int) – The interval in seconds between polling for new log entries and job completion
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: None
check_tuning_config(tuning_config)
Check if a tuning configuration is valid
Parameters: tuning_config (dict) – tuning_config
Returns: None
configure_s3_resources(config)
Extract the S3 operations from the configuration and execute them.
Parameters: config (dict) – config of SageMaker operation
Return type: dict
create_endpoint(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)
Create an endpoint
Parameters: - config (dict) – the config for endpoint
- wait_for_completion (bool) – if the program should keep running until job finishes
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to endpoint creation
create_endpoint_config(config)
Create an endpoint config
Parameters: config (dict) – the config for endpoint-config
Returns: A response to endpoint config creation
create_model(config)
Create a model job
Parameters: config (dict) – the config for model
Returns: A response to model creation
create_training_job(config, wait_for_completion=True, print_log=True, check_interval=30, max_ingestion_time=None)
Create a training job
Parameters: - config (dict) – the config for training
- wait_for_completion (bool) – if the program should keep running until job finishes
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to training job creation
create_transform_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)
Create a transform job
Parameters: - config (dict) – the config for transform job
- wait_for_completion (bool) – if the program should keep running until job finishes
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to transform job creation
create_tuning_job(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)
Create a tuning job
Parameters: - config (dict) – the config for tuning
- wait_for_completion (bool) – if the program should keep running until job finishes
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to tuning job creation
describe_endpoint(name)
Return the endpoint info associated with the name
Parameters: name (string) – the name of the endpoint
Returns: A dict containing all the endpoint info
describe_endpoint_config(name)
Return the endpoint config info associated with the name
Parameters: name (string) – the name of the endpoint config
Returns: A dict containing all the endpoint config info
describe_model(name)
Return the SageMaker model info associated with the name
Parameters: name (string) – the name of the SageMaker model
Returns: A dict containing all the model info
describe_training_job(name)
Return the training job info associated with the name
Parameters: name (str) – the name of the training job
Returns: A dict containing all the training job info
describe_training_job_with_log(job_name, positions, stream_names, instance_count, state, last_description, last_describe_job_call)
Return the training job info associated with job_name and print CloudWatch logs
describe_transform_job(name)
Return the transform job info associated with the name
Parameters: name (string) – the name of the transform job
Returns: A dict containing all the transform job info
describe_tuning_job(name)
Return the tuning job info associated with the name
Parameters: name (string) – the name of the tuning job
Returns: A dict containing all the tuning job info
get_conn()
Establish an AWS connection for SageMaker
Return type: SageMaker.Client
get_log_conn()
Establish an AWS connection for retrieving logs during training
Return type: CloudWatchLog.Client
log_stream(log_group, stream_name, start_time=0, skip=0)
A generator for log items in a single stream. This will yield all the items that are available at the current moment.
Parameters: - log_group (str) – The name of the log group.
- stream_name (str) – The name of the specific stream.
- start_time (int) – The time stamp value to start reading the logs from (default: 0).
- skip (int) – The number of log entries to skip at the start (default: 0). This is for when there are multiple entries at the same timestamp.
Return type: dict
Returns: A CloudWatch log event with the following key-value pairs:
- 'timestamp' (int): The time in milliseconds of the event.
- 'message' (str): The log event data.
- 'ingestionTime' (int): The time in milliseconds the event was ingested.
multi_stream_iter(log_group, streams, positions=None)
Iterate over the available events coming from a set of log streams in a single log group, interleaving the events from each stream so they’re yielded in timestamp order.
Parameters: - log_group (str) – The name of the log group.
- streams (list) – A list of the log stream names. The position of the stream in this list is the stream number.
- positions (list) – A list of pairs of (timestamp, skip) which represents the last record read from each stream.
Returns: A tuple of (stream number, cloudwatch log event).
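The interleaving described here amounts to a k-way merge of streams that are each already ordered by timestamp. The sketch below demonstrates the idea on in-memory lists using heapq.merge from the standard library; that is a swapped-in technique chosen for brevity, not necessarily how the hook does it, and interleave_streams is a hypothetical name.

```python
import heapq


def interleave_streams(streams):
    """Yield (stream number, event) pairs in timestamp order.

    Each element of streams is an iterable of event dicts that is already
    sorted by its 'timestamp' value.
    """
    def tag(index, events):
        # Attach the stream number so the caller knows where an event came from.
        for event in events:
            yield index, event

    tagged = [tag(index, events) for index, events in enumerate(streams)]
    return heapq.merge(*tagged, key=lambda pair: pair[1]['timestamp'])


streams = [
    [{'timestamp': 1, 'message': 'a'}, {'timestamp': 4, 'message': 'd'}],
    [{'timestamp': 2, 'message': 'b'}, {'timestamp': 3, 'message': 'c'}],
]
merged = [(index, event['message']) for index, event in interleave_streams(streams)]
```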
tar_and_s3_upload(path, key, bucket)
Tar the local file or directory and upload to S3
Parameters: - path (str) – local file or directory
- key (str) – s3 key
- bucket (str) – s3 bucket
Returns: None
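The packaging half of this operation can be sketched with the standard library alone. The example below writes a gzipped tar of a local file or directory to a temporary file and deliberately leaves out the upload step; tar_to_tempfile is a hypothetical helper, and the choice of gzip compression is an assumption.

```python
import os
import tarfile
import tempfile


def tar_to_tempfile(path):
    """Pack a local file or directory into a gzipped tar and return its path."""
    handle = tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False)
    handle.close()
    # normpath drops a trailing slash so basename is never empty.
    arcname = os.path.basename(os.path.normpath(path))
    with tarfile.open(handle.name, mode='w:gz') as archive:
        archive.add(path, arcname=arcname)
    return handle.name


# Build a small directory to archive.
source_dir = tempfile.mkdtemp()
with open(os.path.join(source_dir, 'train.csv'), 'w') as data_file:
    data_file.write('x,y\n1,2\n')

archive_path = tar_to_tempfile(source_dir)
with tarfile.open(archive_path) as archive:
    names = archive.getnames()
```

The resulting archive file is what would then be uploaded to the given bucket and key.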
update_endpoint(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)
Update an endpoint
Parameters: - config (dict) – the config for endpoint
- wait_for_completion (bool) – if the program should keep running until job finishes
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to endpoint update
SageMakerTrainingOperator¶
class airflow.contrib.operators.sagemaker_training_operator.SageMakerTrainingOperator(**kwargs)
Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Initiate a SageMaker training job.
This operator returns the ARN of the training job created in Amazon SageMaker.
Parameters: - config (dict) – The configuration necessary to start a training job (templated). For details of the configuration parameter see SageMaker.Client.create_training_job()
- aws_conn_id (str) – The AWS connection ID to use.
- wait_for_completion (bool) – Set to True to wait until the training job finishes.
- print_log (bool) – Whether the operator should print the CloudWatch log during training.
- check_interval (int) – If wait_for_completion is True, the time interval, in seconds, at which the operator checks the status of the training job.
- max_ingestion_time (int) – If wait_for_completion is True, the operation fails if the training job doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, the operation does not time out.
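For orientation, a training config is a plain dictionary in the request shape of SageMaker.Client.create_training_job(). The sketch below assembles a small one; the helper training_config, the image URI, role ARN, bucket names and instance settings are all hypothetical placeholders, and a real job may need further fields.

```python
def training_config(job_name, image, role_arn, s3_input, s3_output):
    """Assemble a minimal create_training_job request. Values are placeholders."""
    return {
        'TrainingJobName': job_name,
        'AlgorithmSpecification': {
            'TrainingImage': image,
            'TrainingInputMode': 'File',
        },
        'RoleArn': role_arn,
        'InputDataConfig': [{
            'ChannelName': 'train',
            'DataSource': {'S3DataSource': {
                'S3DataType': 'S3Prefix',
                'S3Uri': s3_input,
                'S3DataDistributionType': 'FullyReplicated',
            }},
        }],
        'OutputDataConfig': {'S3OutputPath': s3_output},
        'ResourceConfig': {
            'InstanceCount': 1,
            'InstanceType': 'ml.m5.xlarge',
            'VolumeSizeInGB': 10,
        },
        'StoppingCondition': {'MaxRuntimeInSeconds': 3600},
    }


# config is templated, so the job name can carry a Jinja expression.
config = training_config(
    job_name='customers-{{ ds_nodash }}',
    image='123456789012.dkr.ecr.us-east-1.amazonaws.com/example:latest',
    role_arn='arn:aws:iam::123456789012:role/example-sagemaker-role',
    s3_input='s3://my-bucket/train/',
    s3_output='s3://my-bucket/output/',
)
```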
SageMakerTuningOperator¶
class airflow.contrib.operators.sagemaker_tuning_operator.SageMakerTuningOperator(**kwargs)
Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Initiate a SageMaker hyperparameter tuning job.
This operator returns the ARN of the tuning job created in Amazon SageMaker.
Parameters: - config (dict) – The configuration necessary to start a tuning job (templated). For details of the configuration parameter see SageMaker.Client.create_hyper_parameter_tuning_job()
- aws_conn_id (str) – The AWS connection ID to use.
- wait_for_completion (bool) – Set to True to wait until the tuning job finishes.
- check_interval (int) – If wait_for_completion is True, the time interval, in seconds, at which this operation checks the status of the tuning job.
- max_ingestion_time (int) – If wait_for_completion is True, the operation fails if the tuning job doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, the operation does not time out.
SageMakerModelOperator¶
class airflow.contrib.operators.sagemaker_model_operator.SageMakerModelOperator(**kwargs)
Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Create a SageMaker model.
This operator returns the ARN of the model created in Amazon SageMaker.
Parameters: - config (dict) – The configuration necessary to create a model. For details of the configuration parameter see SageMaker.Client.create_model()
- aws_conn_id (str) – The AWS connection ID to use.
SageMakerTransformOperator¶
class airflow.contrib.operators.sagemaker_transform_operator.SageMakerTransformOperator(**kwargs)
Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Initiate a SageMaker transform job.
This operator returns the ARN of the model created in Amazon SageMaker.
Parameters: - config (dict) – The configuration necessary to start a transform job (templated).
If you need to create a SageMaker transform job based on an existing SageMaker model:
config = transform_config
If you need to create both a SageMaker model and a SageMaker transform job:
config = { 'Model': model_config, 'Transform': transform_config }
For details of the configuration parameter of transform_config see SageMaker.Client.create_transform_job()
For details of the configuration parameter of model_config see SageMaker.Client.create_model()
- aws_conn_id (string) – The AWS connection ID to use.
- wait_for_completion (bool) – Set to True to wait until the transform job finishes.
- check_interval (int) – If wait_for_completion is True, the time interval, in seconds, at which this operation checks the status of the transform job.
- max_ingestion_time (int) – If wait_for_completion is True, the operation fails if the transform job doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, the operation does not time out.
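The two accepted shapes of config can be told apart by the presence of a 'Model' key. The sketch below separates them into a model part and a transform part; split_transform_config is a hypothetical helper written for illustration, the job and model names are placeholders, and the operator's own handling may differ.

```python
def split_transform_config(config):
    """Return (model_config, transform_config) for either accepted shape."""
    if 'Model' in config:
        # Nested form: create the model first, then the transform job.
        return config['Model'], config['Transform']
    # Flat form: the model already exists, only a transform job is created.
    return None, config


flat = {'TransformJobName': 'score-customers', 'ModelName': 'customers-model'}
nested = {'Model': {'ModelName': 'customers-model'}, 'Transform': flat}

model_only_transform = split_transform_config(flat)
model_and_transform = split_transform_config(nested)
```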
SageMakerEndpointConfigOperator¶
class airflow.contrib.operators.sagemaker_endpoint_config_operator.SageMakerEndpointConfigOperator(**kwargs)
Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Create a SageMaker endpoint config.
This operator returns the ARN of the endpoint config created in Amazon SageMaker.
Parameters: - config (dict) – The configuration necessary to create an endpoint config. For details of the configuration parameter see SageMaker.Client.create_endpoint_config()
- aws_conn_id (str) – The AWS connection ID to use.
SageMakerEndpointOperator¶
class airflow.contrib.operators.sagemaker_endpoint_operator.SageMakerEndpointOperator(**kwargs)
Bases: airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Create a SageMaker endpoint.
This operator returns the ARN of the endpoint created in Amazon SageMaker.
Parameters: - config (dict) – The configuration necessary to create an endpoint.
If you need to create a SageMaker endpoint based on an existing SageMaker model and an existing SageMaker endpoint config:
config = endpoint_configuration
If you need to create all of the SageMaker model, SageMaker endpoint config and SageMaker endpoint:
config = { 'Model': model_configuration, 'EndpointConfig': endpoint_config_configuration, 'Endpoint': endpoint_configuration }
For details of the configuration parameter of model_configuration see SageMaker.Client.create_model()
For details of the configuration parameter of endpoint_config_configuration see SageMaker.Client.create_endpoint_config()
For details of the configuration parameter of endpoint_configuration see SageMaker.Client.create_endpoint()
- aws_conn_id (str) – The AWS connection ID to use.
- wait_for_completion (bool) – Whether the operator should wait until the endpoint creation finishes.
- check_interval (int) – If wait_for_completion is True, the time interval, in seconds, that this operation waits before polling the status of the endpoint creation.
- max_ingestion_time (int) – If wait_for_completion is True, this operation fails if the endpoint creation doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, it never times out.
- operation (str) – Whether to create an endpoint or update an endpoint. Must be either 'create' or 'update'.
Amazon SageMaker¶
For more instructions on using Amazon SageMaker in Airflow, please see the SageMaker Python SDK README.
- SageMakerHook : Interact with Amazon SageMaker.
- SageMakerTrainingOperator : Create a SageMaker training job.
- SageMakerTuningOperator : Create a SageMaker tuning job.
- SageMakerModelOperator : Create a SageMaker model.
- SageMakerTransformOperator : Create a SageMaker transform job.
- SageMakerEndpointConfigOperator : Create a SageMaker endpoint config.
- SageMakerEndpointOperator : Create a SageMaker endpoint.
SageMakerHook¶
-
class
airflow.contrib.hooks.sagemaker_hook.
SageMakerHook
(*args, **kwargs)[source] Bases:
airflow.contrib.hooks.aws_hook.AwsHook
Interact with Amazon SageMaker.
-
check_s3_url
(s3url)[source] Check if an S3 URL exists
Parameters: s3url (str) – S3 url Return type: bool
-
check_status
(job_name, key, describe_function, check_interval, max_ingestion_time, non_terminal_states=None)[source] Check status of a SageMaker job
Parameters: - job_name (str) – name of the job to check status
- key (str) – the key of the response dict that points to the state
- describe_function (python callable) – the function used to retrieve the status
- args – the arguments for the function
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
- non_terminal_states (set) – the set of nonterminal states
Returns: response of describe call after job is done
-
check_training_config
(training_config)[source] Check if a training configuration is valid
Parameters: training_config (dict) – training_config Returns: None
-
check_training_status_with_log
(job_name, non_terminal_states, failed_states, wait_for_completion, check_interval, max_ingestion_time)[source] Display the logs for a given training job, optionally tailing them until the job is complete.
Parameters: - job_name (str) – name of the training job to check status and display logs for
- non_terminal_states (set) – the set of non_terminal states
- failed_states (set) – the set of failed states
- wait_for_completion (bool) – Whether to keep looking for new log entries until the job completes
- check_interval (int) – The interval in seconds between polling for new log entries and job completion
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: None
-
check_tuning_config
(tuning_config)[source] Check if a tuning configuration is valid
Parameters: tuning_config (dict) – tuning_config Returns: None
-
configure_s3_resources
(config)[source] Extract the S3 operations from the configuration and execute them.
Parameters: config (dict) – config of SageMaker operation Return type: dict
-
create_endpoint
(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source] Create an endpoint
Parameters: - config (dict) – the config for endpoint
- wait_for_completion (bool) – if the program should keep running until job finishes
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to endpoint creation
-
create_endpoint_config
(config)[source] Create an endpoint config
Parameters: config (dict) – the config for endpoint-config Returns: A response to endpoint config creation
-
create_model
(config)[source] Create a model job
Parameters: config (dict) – the config for model Returns: A response to model creation
-
create_training_job
(config, wait_for_completion=True, print_log=True, check_interval=30, max_ingestion_time=None)[source] Create a training job
Parameters: - config (dict) – the config for training
- wait_for_completion (bool) – if the program should keep running until job finishes
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to training job creation
-
create_transform_job
(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source] Create a transform job
Parameters: - config (dict) – the config for transform job
- wait_for_completion (bool) – if the program should keep running until job finishes
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to transform job creation
-
create_tuning_job
(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source] Create a tuning job
Parameters: - config (dict) – the config for tuning
- wait_for_completion – if the program should keep running until job finishes
- wait_for_completion – bool
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to tuning job creation
-
describe_endpoint
(name)[source] Parameters: name (string) – the name of the endpoint Returns: A dict contains all the endpoint info
-
describe_endpoint_config
(name)[source] Return the endpoint config info associated with the name
Parameters: name (string) – the name of the endpoint config Returns: A dict contains all the endpoint config info
-
describe_model
(name)[source] Return the SageMaker model info associated with the name
Parameters: name (string) – the name of the SageMaker model Returns: A dict contains all the model info
-
describe_training_job
(name)[source] Return the training job info associated with the name
Parameters: name (str) – the name of the training job Returns: A dict contains all the training job info
-
describe_training_job_with_log
(job_name, positions, stream_names, instance_count, state, last_description, last_describe_job_call)[source] Return the training job info associated with job_name and print CloudWatch logs
-
describe_transform_job
(name)[source] Return the transform job info associated with the name
Parameters: name (string) – the name of the transform job Returns: A dict contains all the transform job info
-
describe_tuning_job
(name)[source] Return the tuning job info associated with the name
Parameters: name (string) – the name of the tuning job Returns: A dict contains all the tuning job info
-
get_conn
()[source] Establish an AWS connection for SageMaker
Return type: SageMaker.Client
-
get_log_conn
()[source] Establish an AWS connection for retrieving logs during training
Return type: CloudWatchLog.Client
-
log_stream
(log_group, stream_name, start_time=0, skip=0)[source] A generator for log items in a single stream. This will yield all the items that are available at the current moment.
Parameters: - log_group (str) – The name of the log group.
- stream_name (str) – The name of the specific stream.
- start_time (int) – The time stamp value to start reading the logs from (default: 0).
- skip (int) – The number of log entries to skip at the start (default: 0). This is for when there are multiple entries at the same timestamp.
Return type: dict
Returns: A CloudWatch log event with the following key-value pairs:
- ’timestamp’ (int): The time in milliseconds of the event.
- ’message’ (str): The log event data.
- ’ingestionTime’ (int): The time in milliseconds the event was ingested.
-
multi_stream_iter
(log_group, streams, positions=None)[source] Iterate over the available events coming from a set of log streams in a single log group interleaving the events from each stream so they’re yielded in timestamp order.
Parameters: - log_group (str) – The name of the log group.
- streams (list) – A list of the log stream names. The position of the stream in this list is the stream number.
- positions (list) – A list of pairs of (timestamp, skip) which represents the last record read from each stream.
Returns: A tuple of (stream number, cloudwatch log event).
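The interleaving described for multi_stream_iter can be pictured as a merge of already-sorted streams. The following is only a minimal sketch of that idea using the standard library, not the hook's actual implementation; the function name interleave_streams and the in-memory event lists are assumptions made for illustration.

```python
import heapq


def interleave_streams(streams):
    """Yield (stream_number, event) pairs in timestamp order.

    `streams` is a list of iterables of CloudWatch-style event dicts,
    each iterable already sorted by its 'timestamp' key (assumption).
    """
    def tagged(index, events):
        for event in events:
            # Carry the stream number alongside each event.
            yield (event["timestamp"], index, event)

    merged = heapq.merge(
        *(tagged(i, s) for i, s in enumerate(streams)),
        # Order by timestamp, then by stream number to break ties.
        key=lambda item: (item[0], item[1])
    )
    for _, index, event in merged:
        yield index, event


stream_a = [{"timestamp": 1, "message": "a1"}, {"timestamp": 4, "message": "a2"}]
stream_b = [{"timestamp": 2, "message": "b1"}, {"timestamp": 3, "message": "b2"}]
ordered = [(i, e["message"]) for i, e in interleave_streams([stream_a, stream_b])]
```

The position of each stream in the input list plays the role of the stream number, as in the parameter description above.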
-
tar_and_s3_upload
(path, key, bucket)[source] Tar the local file or directory and upload to s3
Parameters: - path (str) – local file or directory
- key (str) – s3 key
- bucket (str) – s3 bucket
Returns: None
-
update_endpoint
(config, wait_for_completion=True, check_interval=30, max_ingestion_time=None)[source] Update an endpoint
Parameters: - config (dict) – the config for endpoint
- wait_for_completion (bool) – if the program should keep running until job finishes
- check_interval (int) – the time interval in seconds which the operator will check the status of any SageMaker job
- max_ingestion_time (int) – the maximum ingestion time in seconds. Any SageMaker jobs that run longer than this will fail. Setting this to None implies no timeout for any SageMaker job.
Returns: A response to endpoint update
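The wait_for_completion, check_interval and max_ingestion_time parameters together describe a polling loop. The sketch below shows one way such a loop could behave; it is an illustration under stated assumptions, not the hook's code. The names wait_for_status, describe and is_done, and the 'Status' key used in the usage lines, are hypothetical.

```python
import time


def wait_for_status(describe, is_done, check_interval=30,
                    max_ingestion_time=None, sleep=time.sleep):
    """Poll `describe()` until `is_done(response)` is true.

    Raises RuntimeError once `max_ingestion_time` seconds have been spent
    waiting. None means no timeout, as in the parameter description.
    """
    elapsed = 0
    while True:
        response = describe()
        if is_done(response):
            return response
        if max_ingestion_time is not None and elapsed >= max_ingestion_time:
            raise RuntimeError(
                "job did not finish within %s seconds" % max_ingestion_time)
        sleep(check_interval)
        elapsed += check_interval


# Usage with a fake status source (hypothetical 'Status' key).
statuses = iter(["InProgress", "InProgress", "Completed"])
naps = []
result = wait_for_status(
    describe=lambda: {"Status": next(statuses)},
    is_done=lambda r: r["Status"] == "Completed",
    check_interval=5,
    sleep=naps.append,  # record the sleeps instead of actually waiting
)
```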
-
SageMakerTrainingOperator¶
-
class
airflow.contrib.operators.sagemaker_training_operator.
SageMakerTrainingOperator
(**kwargs)[source] Bases:
airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Initiate a SageMaker training job.
This operator returns the ARN of the training job created in Amazon SageMaker.
Parameters: - config (dict) –
The configuration necessary to start a training job (templated).
For details of the configuration parameter see
SageMaker.Client.create_training_job()
- aws_conn_id (str) – The AWS connection ID to use.
- wait_for_completion (bool) – Set to True to wait until the training job finishes.
- print_log (bool) – Whether the operator should print the CloudWatch log during training.
- check_interval (int) – If wait is set to True, the time interval, in seconds, at which the operator checks the status of the training job.
- max_ingestion_time (int) – If wait is set to True, the operation fails if the training job doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, the operation does not timeout.
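As an illustration of the parameters above, here is a hedged sketch of a training configuration and of how it might be handed to the operator. The dictionary keys follow the request shape of SageMaker.Client.create_training_job(); all values (instance type, volume size, runtime limit, task_id, connection ID) are placeholder assumptions, and the helper names are hypothetical.

```python
def minimal_training_config(job_name, image, role_arn, output_path):
    """Build a small config dict for create_training_job (placeholder values)."""
    return {
        "TrainingJobName": job_name,
        "AlgorithmSpecification": {
            "TrainingImage": image,
            "TrainingInputMode": "File",
        },
        "RoleArn": role_arn,
        "OutputDataConfig": {"S3OutputPath": output_path},
        "ResourceConfig": {
            "InstanceCount": 1,
            "InstanceType": "ml.m5.large",  # assumption, pick what fits
            "VolumeSizeInGB": 10,
        },
        "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
    }


def build_training_task(config, dag=None):
    """Create the operator; imported lazily so the helper above runs without Airflow."""
    from airflow.contrib.operators.sagemaker_training_operator import (
        SageMakerTrainingOperator,
    )
    return SageMakerTrainingOperator(
        task_id="train_model",  # hypothetical task name
        config=config,
        aws_conn_id="aws_default",
        wait_for_completion=True,
        print_log=True,
        check_interval=30,
        max_ingestion_time=None,  # no timeout
        dag=dag,
    )


config = minimal_training_config(
    "example-job", "example-image-uri", "example-role-arn", "s3://bucket/output")
```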
SageMakerTuningOperator¶
-
class
airflow.contrib.operators.sagemaker_tuning_operator.
SageMakerTuningOperator
(**kwargs)[source] Bases:
airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Initiate a SageMaker hyperparameter tuning job.
This operator returns the ARN of the tuning job created in Amazon SageMaker.
Parameters: - config (dict) –
The configuration necessary to start a tuning job (templated).
For details of the configuration parameter see
SageMaker.Client.create_hyper_parameter_tuning_job()
- aws_conn_id (str) – The AWS connection ID to use.
- wait_for_completion (bool) – Set to True to wait until the tuning job finishes.
- check_interval (int) – If wait is set to True, the time interval, in seconds, that this operation waits to check the status of the tuning job.
- max_ingestion_time (int) – If wait is set to True, the operation fails if the tuning job doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, the operation does not timeout.
SageMakerModelOperator¶
-
class
airflow.contrib.operators.sagemaker_model_operator.
SageMakerModelOperator
(**kwargs)[source] Bases:
airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Create a SageMaker model.
This operator returns the ARN of the model created in Amazon SageMaker.
Parameters: - config (dict) –
The configuration necessary to create a model.
For details of the configuration parameter see
SageMaker.Client.create_model()
- aws_conn_id (str) – The AWS connection ID to use.
SageMakerTransformOperator¶
-
class
airflow.contrib.operators.sagemaker_transform_operator.
SageMakerTransformOperator
(**kwargs)[source] Bases:
airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Initiate a SageMaker transform job.
This operator returns the ARN of the model created in Amazon SageMaker.
Parameters: - config (dict) –
The configuration necessary to start a transform job (templated).
If you need to create a SageMaker transform job based on an existing SageMaker model:
config = transform_config
If you need to create both SageMaker model and SageMaker Transform job:
config = { 'Model': model_config, 'Transform': transform_config }
For details of the configuration parameter of transform_config see
SageMaker.Client.create_transform_job()
For details of the configuration parameter of model_config see
SageMaker.Client.create_model()
- aws_conn_id (string) – The AWS connection ID to use.
- wait_for_completion (bool) – Set to True to wait until the transform job finishes.
- check_interval (int) – If wait is set to True, the time interval, in seconds, that this operation waits to check the status of the transform job.
- max_ingestion_time (int) – If wait is set to True, the operation fails if the transform job doesn’t finish within max_ingestion_time seconds. If you set this parameter to None, the operation does not timeout.
SageMakerEndpointConfigOperator¶
-
class
airflow.contrib.operators.sagemaker_endpoint_config_operator.
SageMakerEndpointConfigOperator
(**kwargs)[source] Bases:
airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Create a SageMaker endpoint config.
This operator returns the ARN of the endpoint config created in Amazon SageMaker.
Parameters: - config (dict) –
The configuration necessary to create an endpoint config.
For details of the configuration parameter see
SageMaker.Client.create_endpoint_config()
- aws_conn_id (str) – The AWS connection ID to use.
SageMakerEndpointOperator¶
-
class
airflow.contrib.operators.sagemaker_endpoint_operator.
SageMakerEndpointOperator
(**kwargs)[source] Bases:
airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator
Create a SageMaker endpoint.
This operator returns the ARN of the endpoint created in Amazon SageMaker.
Parameters: - config (dict) –
The configuration necessary to create an endpoint.
If you need to create a SageMaker endpoint based on an existing SageMaker model and an existing SageMaker endpoint config:
config = endpoint_configuration
If you need to create a SageMaker model, a SageMaker endpoint config and a SageMaker endpoint together:
config = { 'Model': model_configuration, 'EndpointConfig': endpoint_config_configuration, 'Endpoint': endpoint_configuration }
For details of the configuration parameter of model_configuration see
SageMaker.Client.create_model()
For details of the configuration parameter of endpoint_config_configuration see
SageMaker.Client.create_endpoint_config()
For details of the configuration parameter of endpoint_configuration see
SageMaker.Client.create_endpoint()
- aws_conn_id (str) – The AWS connection ID to use.
- wait_for_completion (bool) – Whether the operator should wait until the endpoint creation finishes.
- check_interval (int) – If wait is set to True, this is the time interval, in seconds, that this operation waits before polling the status of the endpoint creation.
- max_ingestion_time (int) – If wait is set to True, this operation fails if the endpoint creation doesn’t finish within max_ingestion_time seconds. If you set this parameter to None it never times out.
- operation (str) – Whether to create an endpoint or update an endpoint. Must be either ‘create’ or ‘update’.
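The combined config described above (model, endpoint config and endpoint in one dictionary) could be assembled as in the following sketch. The inner keys follow the request shapes of create_model(), create_endpoint_config() and create_endpoint(); the helper name full_endpoint_config, the variant name and the instance settings are illustrative assumptions.

```python
def full_endpoint_config(name, image, model_data_url, role_arn):
    """Assemble the three-part config accepted when creating everything at once."""
    model_configuration = {
        "ModelName": name,
        "PrimaryContainer": {"Image": image, "ModelDataUrl": model_data_url},
        "ExecutionRoleArn": role_arn,
    }
    endpoint_config_configuration = {
        "EndpointConfigName": name,
        "ProductionVariants": [{
            "VariantName": "AllTraffic",      # assumption
            "ModelName": name,
            "InitialInstanceCount": 1,        # assumption
            "InstanceType": "ml.m5.large",    # assumption
        }],
    }
    endpoint_configuration = {
        "EndpointName": name,
        "EndpointConfigName": name,
    }
    return {
        "Model": model_configuration,
        "EndpointConfig": endpoint_config_configuration,
        "Endpoint": endpoint_configuration,
    }


config = full_endpoint_config(
    "example", "example-image-uri", "s3://bucket/model.tar.gz", "example-role-arn")
```

Passing only the inner endpoint_configuration dictionary instead corresponds to the first case, where the model and endpoint config already exist.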
Databricks¶
Databricks has contributed an Airflow operator which enables
submitting runs to the Databricks platform. Internally the operator talks to the
api/2.0/jobs/runs/submit
endpoint.
DatabricksSubmitRunOperator¶
-
class
airflow.contrib.operators.databricks_operator.
DatabricksSubmitRunOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Submits a Spark job run to Databricks using the api/2.0/jobs/runs/submit API endpoint.
There are two ways to instantiate this operator.
In the first way, you can take the JSON payload that you typically use to call the
api/2.0/jobs/runs/submit
endpoint and pass it directly to our DatabricksSubmitRunOperator through the json parameter. For example:
json = {
    'new_cluster': {
        'spark_version': '2.1.0-db3-scala2.11',
        'num_workers': 2
    },
    'notebook_task': {
        'notebook_path': '/Users/airflow@example.com/PrepareData',
    },
}
notebook_run = DatabricksSubmitRunOperator(task_id='notebook_run', json=json)
Another way to accomplish the same thing is to use the named parameters of the DatabricksSubmitRunOperator directly. Note that there is exactly one named parameter for each top level parameter in the runs/submit endpoint. In this method, your code would look like this:
new_cluster = {
    'spark_version': '2.1.0-db3-scala2.11',
    'num_workers': 2
}
notebook_task = {
    'notebook_path': '/Users/airflow@example.com/PrepareData',
}
notebook_run = DatabricksSubmitRunOperator(
    task_id='notebook_run',
    new_cluster=new_cluster,
    notebook_task=notebook_task)
In the case where both the json parameter AND the named parameters are provided, they will be merged together. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys.
Currently the named parameters that DatabricksSubmitRunOperator supports are:
- spark_jar_task
- notebook_task
- new_cluster
- existing_cluster_id
- libraries
- run_name
- timeout_seconds
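The merge rule for json and the named parameters can be sketched as a shallow, top-level dictionary merge in which named parameters win. This is a minimal illustration of the rule as stated, not the operator's source; the function name merge_run_payload is hypothetical, and skipping parameters left as None is an assumption.

```python
def merge_run_payload(json=None, **named_params):
    """Merge named parameters over the top-level keys of `json`."""
    payload = dict(json or {})  # copy so the caller's dict is untouched
    for key, value in named_params.items():
        if value is not None:   # assumption: unset parameters are skipped
            payload[key] = value
    return payload


base = {"new_cluster": {"num_workers": 2}, "run_name": "from_json"}
merged = merge_run_payload(
    json=base,
    run_name="from_named_parameter",
    notebook_task={"notebook_path": "/Users/airflow@example.com/PrepareData"},
    timeout_seconds=None,
)
```

Because the merge is at the top level only, a named new_cluster would replace the whole new_cluster entry from json rather than being combined with it key by key.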
Parameters: - json (dict) –
A JSON object containing API parameters which will be passed directly to the api/2.0/jobs/runs/submit endpoint. The other named parameters (i.e. spark_jar_task, notebook_task, ...) to this operator will be merged with this json dictionary if they are provided. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys. (templated)
See also
For more information about templating see Jinja Templating. https://docs.databricks.com/api/latest/jobs.html#runs-submit
- spark_jar_task (dict) – The main class and parameters for the JAR task. Note that the actual JAR is specified in the libraries. EITHER spark_jar_task OR notebook_task should be specified. This field will be templated.
- notebook_task (dict) – The notebook path and parameters for the notebook task. EITHER spark_jar_task OR notebook_task should be specified. This field will be templated.
- new_cluster (dict) – Specs for a new cluster on which this task will be run. EITHER new_cluster OR existing_cluster_id should be specified. This field will be templated.
- existing_cluster_id (string) – ID for existing cluster on which to run this task. EITHER new_cluster OR existing_cluster_id should be specified. This field will be templated.
- libraries (list of dicts) – Libraries which this run will use. This field will be templated.
- run_name (string) – The run name used for this task. By default this will be set to the Airflow task_id. This task_id is a required parameter of the superclass BaseOperator. This field will be templated.
- timeout_seconds (int32) – The timeout for this run. By default a value of 0 is used which means to have no timeout. This field will be templated.
- databricks_conn_id (string) – The name of the Airflow connection to use. By default and in the common case this will be databricks_default. To use token based authentication, provide the key token in the extra field for the connection.
- polling_period_seconds (int) – Controls the rate at which we poll for the result of this run. By default the operator will poll every 30 seconds.
- databricks_retry_limit (int) – Number of times to retry if the Databricks backend is unreachable. Its value must be greater than or equal to 1.
- databricks_retry_delay (float) – Number of seconds to wait between retries (it might be a floating point number).
- do_xcom_push (boolean) – Whether we should push run_id and run_page_url to xcom.
GCP: Google Cloud Platform¶
Airflow has extensive support for the Google Cloud Platform. Note, however, that most Hooks and Operators are in the contrib section, which means that they have beta status and can have breaking changes between minor releases.
See the GCP connection type documentation to configure connections to GCP.
Logging¶
Airflow can be configured to read and write task logs in Google Cloud Storage. See Writing Logs to Google Cloud Storage.
GoogleCloudBaseHook¶
-
class
airflow.contrib.hooks.gcp_api_base_hook.
GoogleCloudBaseHook
(gcp_conn_id='google_cloud_default', delegate_to=None)[source]¶ Bases:
airflow.hooks.base_hook.BaseHook
,airflow.utils.log.logging_mixin.LoggingMixin
A base hook for Google cloud-related hooks. Google cloud has a shared REST API client that is built in the same way no matter which service you use. This class helps construct and authorize the credentials needed to then call googleapiclient.discovery.build() to actually discover and build a client for a Google cloud service.
The class also contains some miscellaneous helper functions.
All hooks derived from this base hook use the ‘Google Cloud Platform’ connection type. Three ways of authentication are supported:
Default credentials: Only the ‘Project Id’ is required. You’ll need to have set up default credentials, such as by the GOOGLE_APPLICATION_CREDENTIALS environment variable or from the metadata server on Google Compute Engine.
JSON key file: Specify ‘Project Id’, ‘Keyfile Path’ and ‘Scope’. Legacy P12 key files are not supported.
JSON data provided in the UI: Specify ‘Keyfile JSON’.
-
static
fallback_to_default_project_id
(func)[source]¶ Decorator that provides fallback for Google Cloud Platform project id. If the project is None it will be replaced with the project_id from the service account the Hook is authenticated with. Project id can be specified either via project_id kwarg or via first parameter in positional args.
Parameters: func – function to wrap Returns: result of the function call
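A decorator of this kind can be sketched as follows. This is an illustration of the described behaviour only, not the hook's source: the decorator name with_default_project_id, the FakeHook class and its default_project_id attribute are hypothetical stand-ins for a hook that knows the project of its service account.

```python
import functools


def with_default_project_id(func):
    """Replace a missing project_id with the hook's default project."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if args:
            # project_id given as the first positional argument
            project_id, rest = args[0], args[1:]
        else:
            project_id, rest = kwargs.pop("project_id", None), ()
        if project_id is None:
            project_id = self.default_project_id
        return func(self, project_id, *rest, **kwargs)
    return wrapper


class FakeHook(object):
    # Stand-in for the project of the authenticated service account.
    default_project_id = "project-from-credentials"

    @with_default_project_id
    def describe(self, project_id=None, suffix=""):
        return project_id + suffix


hook = FakeHook()
```

Calling hook.describe() or hook.describe(None) then uses the default project, while an explicit project_id, positional or keyword, is passed through unchanged.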
BigQuery¶
BigQuery Operators¶
- BigQueryCheckOperator : Performs checks against a SQL query that will return a single row with different values.
- BigQueryValueCheckOperator : Performs a simple value check using SQL code.
- BigQueryIntervalCheckOperator : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
- BigQueryGetDataOperator : Fetches the data from a BigQuery table and returns data in a python list
- BigQueryCreateEmptyDatasetOperator : Creates an empty BigQuery dataset.
- BigQueryCreateEmptyTableOperator : Creates a new, empty table in the specified BigQuery dataset optionally with schema.
- BigQueryCreateExternalTableOperator : Creates a new, external table in the dataset with the data in Google Cloud Storage.
- BigQueryDeleteDatasetOperator : Deletes an existing BigQuery dataset.
- BigQueryTableDeleteOperator : Deletes an existing BigQuery table.
- BigQueryOperator : Executes BigQuery SQL queries in a specific BigQuery database.
- BigQueryToBigQueryOperator : Copy a BigQuery table to another BigQuery table.
- BigQueryToCloudStorageOperator : Transfers a BigQuery table to a Google Cloud Storage bucket
BigQueryCheckOperator¶
-
class
airflow.contrib.operators.bigquery_check_operator.
BigQueryCheckOperator
(**kwargs)[source]¶ Bases:
airflow.operators.check_operator.CheckOperator
Performs checks against BigQuery. The BigQueryCheckOperator expects a sql query that will return a single row. Each value on that first row is evaluated using python bool casting. If any of the values return False the check is failed and errors out.
Note that Python bool casting evals the following as False:
- False
- 0
- Empty string ("")
- Empty list ([])
- Empty dictionary or set ({})
Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft a much more complex query that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are less than 3 standard deviations from the 7 day average.
This operator can be used as a data quality check in your pipeline. Depending on where you put it in your DAG, you can either place it on the critical path, stopping the DAG and preventing dubious data from being published, or place it on the side and receive email alerts without stopping the progress of the DAG.
Parameters: - sql (string) – the sql to be executed
- bigquery_conn_id (string) – reference to the BigQuery database
- use_legacy_sql (boolean) – Whether to use legacy SQL (true) or standard SQL (false).
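The pass/fail rule for the returned row comes down to Python truthiness. A minimal sketch, with a hypothetical helper name row_passes:

```python
def row_passes(row):
    """Return True only if every value in the row is truthy."""
    return all(bool(value) for value in row)


count_row = [0]            # e.g. SELECT COUNT(*) on an empty table
healthy_row = [42, "ok"]   # non-zero count and non-empty string
```

Note that a non-empty string such as "0" is truthy, so a query returning counts as strings would not fail on a zero count.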
BigQueryValueCheckOperator¶
-
class
airflow.contrib.operators.bigquery_check_operator.
BigQueryValueCheckOperator
(**kwargs)[source]¶ Bases:
airflow.operators.check_operator.ValueCheckOperator
Performs a simple value check using sql code.
Parameters: - sql (string) – the sql to be executed
- use_legacy_sql (boolean) – Whether to use legacy SQL (true) or standard SQL (false).
BigQueryIntervalCheckOperator¶
-
class
airflow.contrib.operators.bigquery_check_operator.
BigQueryIntervalCheckOperator
(**kwargs)[source]¶ Bases:
airflow.operators.check_operator.IntervalCheckOperator
Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
This method constructs a query like so
SELECT {metrics_threshold_dict_key} FROM {table} WHERE {date_filter_column}=<date>
Parameters: - table (str) – the table name
- days_back (int) – number of days between ds and the ds we want to check against. Defaults to 7 days
- metrics_threshold (dict) – a dictionary of ratios indexed by metrics, for example ‘COUNT(*)’: 1.5 would require a 50 percent or less difference between the current day, and the prior days_back.
- use_legacy_sql (boolean) – Whether to use legacy SQL (true) or standard SQL (false).
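To make the metrics_threshold ratio concrete, the sketch below compares the current value of a metric with its value from days_back before. The ratio formula (larger value divided by smaller value), the inclusive boundary and the handling of zeros are assumptions chosen to match the "50 percent or less" wording above; the operator's own comparison may differ in those details.

```python
def within_threshold(current, reference, threshold):
    """Check that two non-negative metric values differ by at most `threshold` as a ratio."""
    if current == 0 or reference == 0:
        # Assumption: only two zeros count as "no difference".
        return current == reference
    ratio = max(current, reference) / float(min(current, reference))
    return ratio <= threshold


def failing_metrics(current_row, reference_row, metrics_threshold):
    """Return the metrics whose ratio exceeds the configured threshold."""
    return [
        metric
        for metric, threshold in metrics_threshold.items()
        if not within_threshold(current_row[metric], reference_row[metric], threshold)
    ]


failed = failing_metrics(
    current_row={"COUNT(*)": 120, "SUM(amount)": 900},
    reference_row={"COUNT(*)": 100, "SUM(amount)": 300},
    metrics_threshold={"COUNT(*)": 1.5, "SUM(amount)": 2.0},
)
```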
BigQueryGetDataOperator¶
-
class
airflow.contrib.operators.bigquery_get_data.
BigQueryGetDataOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Fetches the data from a BigQuery table (or, alternatively, only the selected columns) and returns it in a python list. The number of elements in the returned list will be equal to the number of rows fetched. Each element in the list is itself a list whose elements are the column values for that row.
Example Result:
[['Tony', '10'], ['Mike', '20'], ['Steve', '15']]
Note
If you pass fields to selected_fields which are in different order than the order of columns already in BQ table, the data will still be in the order of BQ table. For example if the BQ table has 3 columns as [A,B,C] and you pass ‘B,A’ in the selected_fields the data would still be of the form 'A,B'.
Example:
get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='test_dataset',
    table_id='Transaction_partitions',
    max_results='100',
    selected_fields='DATE',
    bigquery_conn_id='airflow-service-account'
)
Parameters: - dataset_id (string) – The dataset ID of the requested table. (templated)
- table_id (string) – The table ID of the requested table. (templated)
- max_results (string) – The maximum number of records (rows) to be fetched from the table. (templated)
- selected_fields (string) – List of fields to return (comma-separated). If unspecified, all fields are returned.
- bigquery_conn_id (string) – reference to a specific BigQuery hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
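The ordering rule in the note above (selected columns come back in table order, not in the order requested) can be mimicked on in-memory data. This is only a sketch of the described behaviour; the function name select_in_table_order and the sample rows are hypothetical.

```python
def select_in_table_order(table_columns, rows, selected_fields=None):
    """Keep only the selected columns, preserving the table's column order."""
    if selected_fields is None:
        wanted = set(table_columns)
    else:
        wanted = {name.strip() for name in selected_fields.split(",")}
    keep = [i for i, name in enumerate(table_columns) if name in wanted]
    return [[row[i] for i in keep] for row in rows]


rows = [["a1", "b1", "c1"], ["a2", "b2", "c2"]]
result = select_in_table_order(["A", "B", "C"], rows, selected_fields="B,A")
```

Here the result lists column A before column B even though 'B,A' was requested.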
BigQueryCreateEmptyTableOperator¶
-
class
airflow.contrib.operators.bigquery_operator.
BigQueryCreateEmptyTableOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Creates a new, empty table in the specified BigQuery dataset, optionally with schema.
The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it. You can also create a table without schema.
Parameters: - project_id (string) – The project to create the table into. (templated)
- dataset_id (string) – The dataset to create the table into. (templated)
- table_id (string) – The Name of the table to be created. (templated)
- schema_fields (list) –
If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
Example:
schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
- gcs_schema_object (string) – Full path to the JSON file containing schema (templated). For example: gs://test-bucket/dir1/dir2/employee_schema.json
- time_partitioning (dict) –
configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications.
- bigquery_conn_id (string) – Reference to a specific BigQuery hook.
- google_cloud_storage_conn_id (string) – Reference to a specific Google cloud storage hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- labels (dict) –
a dictionary containing labels for the table, passed to BigQuery
Example (with schema JSON in GCS):
CreateTable = BigQueryCreateEmptyTableOperator(
    task_id='BigQueryCreateEmptyTableOperator_task',
    dataset_id='ODS',
    table_id='Employees',
    project_id='internal-gcp-project',
    gcs_schema_object='gs://schema-bucket/employee_schema.json',
    bigquery_conn_id='airflow-service-account',
    google_cloud_storage_conn_id='airflow-service-account'
)
Corresponding Schema file (employee_schema.json):
[
    {"mode": "NULLABLE", "name": "emp_name", "type": "STRING"},
    {"mode": "REQUIRED", "name": "salary", "type": "INTEGER"}
]
Example (with schema in the DAG):
CreateTable = BigQueryCreateEmptyTableOperator(
    task_id='BigQueryCreateEmptyTableOperator_task',
    dataset_id='ODS',
    table_id='Employees',
    project_id='internal-gcp-project',
    schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                   {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}],
    bigquery_conn_id='airflow-service-account',
    google_cloud_storage_conn_id='airflow-service-account'
)
BigQueryCreateExternalTableOperator¶
-
class
airflow.contrib.operators.bigquery_operator.
BigQueryCreateExternalTableOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Creates a new external table in the dataset with the data in Google Cloud Storage.
The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it.
Parameters: - bucket (string) – The bucket to point the external table to. (templated)
- source_objects (list) – List of Google cloud storage URIs to point table to. (templated) If source_format is ‘DATASTORE_BACKUP’, the list must only contain a single URI.
- destination_project_dataset_table (string) – The dotted (<project>.)<dataset>.<table> BigQuery table to load data into (templated). If <project> is not included, project will be the project defined in the connection json.
- schema_fields (list) –
If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
Example:
schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
Should not be set when source_format is ‘DATASTORE_BACKUP’.
- schema_object (string) – If set, a GCS object path pointing to a .json file that contains the schema for the table. (templated)
- source_format (string) – File format of the data.
- compression (string) – [Optional] The compression type of the data source. Possible values include GZIP and NONE. The default value is NONE. This setting is ignored for Google Cloud Bigtable, Google Cloud Datastore backups and Avro formats.
- skip_leading_rows (int) – Number of rows to skip when loading from a CSV.
- field_delimiter (string) – The delimiter to use for the CSV.
- max_bad_records (int) – The maximum number of bad records that BigQuery can ignore when running the job.
- quote_character (string) – The value that is used to quote data sections in a CSV file.
- allow_quoted_newlines (boolean) – Whether to allow quoted newlines (true) or not (false).
- allow_jagged_rows (bool) – Accept rows that are missing trailing optional columns. The missing values are treated as nulls. If false, records with missing trailing columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. Only applicable to CSV, ignored for other formats.
- bigquery_conn_id (string) – Reference to a specific BigQuery hook.
- google_cloud_storage_conn_id (string) – Reference to a specific Google cloud storage hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- src_fmt_configs (dict) – configure optional fields specific to the source format
- labels (dict) – a dictionary containing labels for the table, passed to BigQuery
BigQueryCreateEmptyDatasetOperator¶
-
class
airflow.contrib.operators.bigquery_operator.
BigQueryCreateEmptyDatasetOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
This operator is used to create a new dataset for your project in BigQuery. https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
Parameters: - project_id (str) – The name of the project where we want to create the dataset. Not required if projectId is given in dataset_reference.
- dataset_id (str) – The id of the dataset. Not required if datasetId is given in dataset_reference.
- dataset_reference – Dataset reference that could be provided with request body. More info: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
BigQueryDeleteDatasetOperator¶
-
class
airflow.contrib.operators.bigquery_operator.
BigQueryDeleteDatasetOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
This operator deletes an existing dataset from your project in BigQuery. https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete
Parameters: - project_id (string) – The project id of the dataset.
- dataset_id (string) – The dataset to be deleted.
Example:
delete_temp_data = BigQueryDeleteDatasetOperator(
    dataset_id='temp-dataset',
    project_id='temp-project',
    bigquery_conn_id='_my_gcp_conn_',
    task_id='Deletetemp',
    dag=dag)
BigQueryTableDeleteOperator¶
-
class
airflow.contrib.operators.bigquery_table_delete_operator.
BigQueryTableDeleteOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Deletes BigQuery tables
Parameters: - deletion_dataset_table (string) – A dotted (<project>.|<project>:)<dataset>.<table> that indicates which table will be deleted. (templated)
- bigquery_conn_id (string) – reference to a specific BigQuery hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- ignore_if_missing (boolean) – if True, then return success even if the requested table does not exist.
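Several operators in this section take a table reference of the form (<project>.|<project>:)<dataset>.<table>. The sketch below shows one way such a string could be split into its parts; it is a simplified illustration, not the hook's parser. The function name split_table_reference is hypothetical, and project IDs that themselves contain a colon or a dot are not handled.

```python
def split_table_reference(reference, default_project):
    """Split '<project>.<dataset>.<table>', '<project>:<dataset>.<table>'
    or '<dataset>.<table>' into (project, dataset, table)."""
    if ":" in reference:
        project, rest = reference.split(":", 1)
    else:
        project, rest = None, reference
    parts = rest.split(".")
    if project is None and len(parts) == 3:
        project, dataset, table = parts
    elif len(parts) == 2:
        dataset, table = parts
        # Fall back to the connection's project when none is given.
        project = project or default_project
    else:
        raise ValueError("unexpected table reference: %r" % reference)
    return project, dataset, table


parsed = split_table_reference("my-project:my_dataset.my_table", "default-project")
```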
BigQueryOperator¶
-
class
airflow.contrib.operators.bigquery_operator.
BigQueryOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Executes BigQuery SQL queries in a specific BigQuery database
Parameters: - bql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql'.) – (Deprecated. Use sql parameter instead) the sql code to be executed (templated)
- sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql'.) – the sql code to be executed (templated)
- destination_dataset_table (string) – A dotted (<project>.|<project>:)<dataset>.<table> that, if set, will store the results of the query. (templated)
- write_disposition (string) – Specifies the action that occurs if the destination table already exists. (default: ‘WRITE_EMPTY’)
- create_disposition (string) – Specifies whether the job is allowed to create new tables. (default: ‘CREATE_IF_NEEDED’)
- allow_large_results (boolean) – Whether to allow large results.
- flatten_results (boolean) – If true and query uses legacy SQL dialect, flattens all nested and repeated fields in the query results. allow_large_results must be true if this is set to false. For standard SQL queries, this flag is ignored and results are never flattened.
- bigquery_conn_id (string) – reference to a specific BigQuery hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- udf_config (list) – The User Defined Function configuration for the query. See https://cloud.google.com/bigquery/user-defined-functions for details.
- use_legacy_sql (boolean) – Whether to use legacy SQL (true) or standard SQL (false).
- maximum_billing_tier (integer) – Positive integer that serves as a multiplier of the basic price. Defaults to None, in which case it uses the value set in the project.
- maximum_bytes_billed (float) – Limits the bytes billed for this job. Queries that will have bytes billed beyond this limit will fail (without incurring a charge). If unspecified, this will be set to your project default.
- api_resource_configs (dict) – a dictionary that contains the ‘configuration’ params applied to the Google BigQuery Jobs API: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs for example, {‘query’: {‘useQueryCache’: False}}. You can use it to provide params that BigQueryOperator does not support as arguments.
- schema_update_options (tuple) – Allows the schema of the destination table to be updated as a side effect of the load job.
- query_params (dict) – a dictionary containing query parameter types and values, passed to BigQuery.
- labels (dict) – a dictionary containing labels for the job/query, passed to BigQuery
- priority (string) – Specifies a priority for the query. Possible values include INTERACTIVE and BATCH. The default value is INTERACTIVE.
- time_partitioning (dict) – configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications.
- cluster_fields (list of str) – Request that the result of this query be stored sorted by one or more columns. This is only available in conjunction with time_partitioning. The order of columns given determines the sort order.
- location (str) – The geographic location of the job. Required except for US and EU. See details at https://cloud.google.com/bigquery/docs/locations#specifying_your_location
BigQueryToBigQueryOperator¶
-
class
airflow.contrib.operators.bigquery_to_bigquery.
BigQueryToBigQueryOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Copies data from one BigQuery table to another.
See also
For more details about these parameters: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy
Parameters: - source_project_dataset_tables (list|string) – One or more dotted (project:|project.)<dataset>.<table> BigQuery tables to use as the source data. If <project> is not included, project will be the project defined in the connection json. Use a list if there are multiple source tables. (templated)
- destination_project_dataset_table (string) – The destination BigQuery table. Format is: (project:|project.)<dataset>.<table> (templated)
- write_disposition (string) – The write disposition if the table already exists.
- create_disposition (string) – The create disposition if the table doesn’t exist.
- bigquery_conn_id (string) – reference to a specific BigQuery hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- labels (dict) – a dictionary containing labels for the job/query, passed to BigQuery
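The (project:|project.)<dataset>.<table> notation accepted by the source and destination parameters can be illustrated with a small parser. The helper below, split_table_reference, is a hypothetical name written for this example and is not part of Airflow; the project, dataset, and table names are placeholders. It assumes project IDs contain no dots or colons.

```python
def split_table_reference(spec, default_project=None):
    """Split '(project:|project.)<dataset>.<table>' into (project, dataset, table).

    Hypothetical helper for illustration only; it is not an Airflow function.
    """
    project = None
    rest = spec
    if ":" in spec:
        # 'project:dataset.table' form
        project, rest = spec.split(":", 1)
    parts = rest.split(".")
    if project is None and len(parts) == 3:
        # 'project.dataset.table' form
        project, dataset, table = parts
    elif len(parts) == 2:
        # 'dataset.table' form (project already known or left to the default)
        dataset, table = parts
    else:
        raise ValueError(
            "Expected (project:|project.)<dataset>.<table>, got %r" % spec
        )
    return (project or default_project, dataset, table)


# A list is used when there are multiple source tables.
sources = ["my-project:sales.orders", "my-project.sales.refunds", "sales.returns"]
parsed = [
    split_table_reference(s, default_project="connection-project") for s in sources
]
```

When the project is omitted, the sketch falls back to default_project, mirroring the documented behaviour of using the project defined in the connection.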
BigQueryToCloudStorageOperator¶
-
class
airflow.contrib.operators.bigquery_to_gcs.
BigQueryToCloudStorageOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Transfers a BigQuery table to a Google Cloud Storage bucket.
See also
For more details about these parameters: https://cloud.google.com/bigquery/docs/reference/v2/jobs
Parameters: - source_project_dataset_table (string) – The dotted (<project>.|<project>:)<dataset>.<table> BigQuery table to use as the source data. If <project> is not included, project will be the project defined in the connection json. (templated)
- destination_cloud_storage_uris (list) – The destination Google Cloud Storage URI (e.g. gs://some-bucket/some-file.txt). (templated) Follows convention defined here: https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple
- compression (string) – Type of compression to use.
- export_format (string) – File format to export.
- field_delimiter (string) – The delimiter to use when extracting to a CSV.
- print_header (boolean) – Whether to print a header for a CSV file extract.
- bigquery_conn_id (string) – reference to a specific BigQuery hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- labels (dict) – a dictionary containing labels for the job/query, passed to BigQuery
BigQueryHook¶
-
class
airflow.contrib.hooks.bigquery_hook.
BigQueryHook
(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=True, location=None)[source]¶ Bases:
airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook
,airflow.hooks.dbapi_hook.DbApiHook
,airflow.utils.log.logging_mixin.LoggingMixin
Interact with BigQuery. This hook uses the Google Cloud Platform connection.
-
get_pandas_df
(sql, parameters=None, dialect=None)[source]¶ Returns a Pandas DataFrame for the results produced by a BigQuery query. The DbApiHook method must be overridden because Pandas doesn’t support PEP 249 connections, except for SQLite. See:
https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447 https://github.com/pydata/pandas/issues/6900
Parameters: - sql (string) – The BigQuery SQL to execute.
- parameters (mapping or iterable) – The parameters to render the SQL query with (not used, leave to override superclass method)
- dialect (string in {'legacy', 'standard'}) – Dialect of BigQuery SQL: legacy SQL or standard SQL. Defaults to the value of self.use_legacy_sql if not specified.
-
insert_rows
(table, rows, target_fields=None, commit_every=1000)[source]¶ Insertion is currently unsupported. Theoretically, you could use BigQuery’s streaming API to insert rows into a table, but this hasn’t been implemented.
-
table_exists
(project_id, dataset_id, table_id)[source]¶ Checks for the existence of a table in Google BigQuery.
Parameters: - project_id (string) – The Google cloud project in which to look for the table. The connection supplied to the hook must provide access to the specified project.
- dataset_id (string) – The name of the dataset in which to look for the table.
- table_id (string) – The name of the table to check the existence of.
-
Cloud Spanner¶
Cloud Spanner Operators¶
- CloudSpannerInstanceDatabaseDeleteOperator : deletes an existing database from a Google Cloud Spanner instance or returns success if the database is missing.
- CloudSpannerInstanceDatabaseDeployOperator : creates a new database in a Google Cloud Spanner instance or returns success if the database already exists.
- CloudSpannerInstanceDatabaseUpdateOperator : updates the structure of a Google Cloud Spanner database.
- CloudSpannerInstanceDatabaseQueryOperator : executes an arbitrary DML query (INSERT, UPDATE, DELETE).
- CloudSpannerInstanceDeployOperator : creates a new Google Cloud Spanner instance, or if an instance with the same name exists, updates the instance.
- CloudSpannerInstanceDeleteOperator : deletes a Google Cloud Spanner instance.
CloudSpannerInstanceDatabaseDeleteOperator¶
CloudSpannerInstanceDatabaseDeployOperator¶
CloudSpannerInstanceDatabaseUpdateOperator¶
CloudSpannerInstanceDatabaseQueryOperator¶
CloudSpannerInstanceDeployOperator¶
CloudSpannerInstanceDeleteOperator¶
CloudSpannerHook¶
Cloud SQL¶
Cloud SQL Operators¶
- CloudSqlInstanceDatabaseDeleteOperator : deletes a database from a Cloud SQL instance.
- CloudSqlInstanceDatabaseCreateOperator : creates a new database inside a Cloud SQL instance.
- CloudSqlInstanceDatabasePatchOperator : updates a database inside a Cloud SQL instance.
- CloudSqlInstanceDeleteOperator : deletes a Cloud SQL instance.
- CloudSqlInstanceExportOperator : exports data from a Cloud SQL instance.
- CloudSqlInstanceImportOperator : imports data into a Cloud SQL instance.
- CloudSqlInstanceCreateOperator : creates a new Cloud SQL instance.
- CloudSqlInstancePatchOperator : patches a Cloud SQL instance.
- CloudSqlQueryOperator : runs a query in a Cloud SQL instance.
CloudSqlInstanceDatabaseDeleteOperator¶
-
class
airflow.contrib.operators.gcp_sql_operator.
CloudSqlInstanceDatabaseDeleteOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator
Deletes a database from a Cloud SQL instance.
Parameters: - instance (str) – Database instance ID. This does not include the project ID.
- database (str) – Name of the database to be deleted in the instance.
- project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
- api_version (str) – API version used (e.g. v1beta4).
CloudSqlInstanceDatabaseCreateOperator¶
-
class
airflow.contrib.operators.gcp_sql_operator.
CloudSqlInstanceDatabaseCreateOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator
Creates a new database inside a Cloud SQL instance.
Parameters: - instance (str) – Database instance ID. This does not include the project ID.
- body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body
- project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
- api_version (str) – API version used (e.g. v1beta4).
- validate_body (bool) – Whether the body should be validated. Defaults to True.
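A minimal sketch of a request body for this operator follows. The field names (project, instance, name) are those of the Cloud SQL Admin API database resource linked above; the helper name build_database_body and all values are hypothetical.

```python
def build_database_body(project_id, instance, database_name):
    """Return a databases.insert request body (illustrative helper, not Airflow API)."""
    return {
        "project": project_id,    # project that contains the instance
        "instance": instance,     # instance ID, without the project ID
        "name": database_name,    # name of the database to create
    }


# Placeholder identifiers for the example.
db_create_body = build_database_body("example-project", "example-instance", "reporting")

# In a DAG file the body would be handed to the operator, e.g.:
#   CloudSqlInstanceDatabaseCreateOperator(
#       task_id="create_db", instance="example-instance", body=db_create_body)
```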
CloudSqlInstanceDatabasePatchOperator¶
-
class
airflow.contrib.operators.gcp_sql_operator.
CloudSqlInstanceDatabasePatchOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator
Updates a resource containing information about a database inside a Cloud SQL instance using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
Parameters: - instance (str) – Database instance ID. This does not include the project ID.
- database (str) – Name of the database to be updated in the instance.
- body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch#request-body
- project_id (str) – Optional, Google Cloud Platform Project ID.
- gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
- api_version (str) – API version used (e.g. v1beta4).
- validate_body (bool) – Whether the body should be validated. Defaults to True.
CloudSqlInstanceDeleteOperator¶
-
class
airflow.contrib.operators.gcp_sql_operator.
CloudSqlInstanceDeleteOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator
Deletes a Cloud SQL instance.
Parameters: - instance (str) – Cloud SQL instance ID. This does not include the project ID.
- project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
- api_version (str) – API version used (e.g. v1beta4).
CloudSqlInstanceExportOperator¶
-
class
airflow.contrib.operators.gcp_sql_operator.
CloudSqlInstanceExportOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator
Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.
Note: This operator is idempotent. If executed multiple times with the same export file URI, the export file in GCS will simply be overwritten.
Parameters: - instance (str) – Cloud SQL instance ID. This does not include the project ID.
- body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
- project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
- api_version (str) – API version used (e.g. v1beta4).
- validate_body (bool) – Whether the body should be validated. Defaults to True.
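The export request body can be sketched as below. The exportContext, fileType, uri, and databases fields follow the Cloud SQL Admin API reference linked above; the helper name build_sql_export_body, the bucket, and the database name are assumptions made for the example.

```python
def build_sql_export_body(uri, databases):
    """Return an instances.export body for a SQL dump (illustrative helper)."""
    if not uri.startswith("gs://"):
        raise ValueError("Export target must be a Cloud Storage URI (gs://...)")
    return {
        "exportContext": {
            "fileType": "SQL",             # "CSV" is the other documented option
            "uri": uri,                    # re-using the same URI overwrites the file
            "databases": list(databases),  # databases to include in the dump
        }
    }


# Placeholder bucket and database names.
export_body = build_sql_export_body("gs://example-bucket/exports/reporting.sql", ["reporting"])
```

Because the export file is overwritten when the same URI is reused, a fixed URI keeps repeated runs idempotent, as the note above describes.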
CloudSqlInstanceImportOperator¶
-
class
airflow.contrib.operators.gcp_sql_operator.
CloudSqlInstanceImportOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator
Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.
CSV IMPORT:
This operator is NOT idempotent for a CSV import. If the same file is imported multiple times, the imported data will be duplicated in the database. Moreover, if there are any unique constraints the duplicate import may result in an error.
SQL IMPORT:
This operator is idempotent for a SQL import if it was also exported by Cloud SQL. The exported SQL contains ‘DROP TABLE IF EXISTS’ statements for all tables to be imported.
If the import file was generated in a different way, idempotence is not guaranteed. It has to be ensured on the SQL file level.
Parameters: - instance (str) – Cloud SQL instance ID. This does not include the project ID.
- body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/import#request-body
- project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
- api_version (str) – API version used (e.g. v1beta4).
- validate_body (bool) – Whether the body should be validated. Defaults to True.
CloudSqlInstanceCreateOperator¶
-
class
airflow.contrib.operators.gcp_sql_operator.
CloudSqlInstanceCreateOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator
Creates a new Cloud SQL instance. If an instance with the same name exists, no action will be taken and the operator will succeed.
Parameters: - body (dict) – Body required by the Cloud SQL insert API, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert#request-body
- instance (str) – Cloud SQL instance ID. This does not include the project ID.
- project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
- api_version (str) – API version used (e.g. v1beta4).
- validate_body (bool) – True if body should be validated, False otherwise.
CloudSqlInstancePatchOperator¶
-
class
airflow.contrib.operators.gcp_sql_operator.
CloudSqlInstancePatchOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_sql_operator.CloudSqlBaseOperator
Updates settings of a Cloud SQL instance.
Caution: This is a partial update, so only the settings included in the request body are updated.
In the request body, supply the relevant portions of an instance resource, according to the rules of patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
Parameters: - body (dict) – Body required by the Cloud SQL patch API, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch#request-body
- instance (str) – Cloud SQL instance ID. This does not include the project ID.
- project_id (str) – Optional, Google Cloud Platform Project ID. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform.
- api_version (str) – API version used (e.g. v1beta4).
CloudSqlQueryOperator¶
-
class
airflow.contrib.operators.gcp_sql_operator.
CloudSqlQueryOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Performs DML or DDL query on an existing Cloud SQL instance. It optionally uses cloud-sql-proxy to establish a secure connection with the database.
Parameters: - sql (str or [str]) – SQL query or list of queries to run. These should be DML or DDL queries: this operator does not return any data from the database, so passing it DQL queries serves no purpose. Note that it is the responsibility of the author of the queries to make sure that they are idempotent. For example, you can use CREATE TABLE IF NOT EXISTS to create a table.
- parameters (mapping or iterable) – (optional) the parameters to render the SQL query with.
- autocommit (bool) – if True, each command is automatically committed. (default value: False)
- gcp_conn_id (str) – The connection ID used to connect to Google Cloud Platform for cloud-sql-proxy authentication.
- gcp_cloudsql_conn_id (str) – The connection ID used to connect to Google Cloud SQL. Its scheme should be gcpcloudsql://. See CloudSqlDatabaseHook for details on how to define a gcpcloudsql:// connection.
Cloud SQL Hooks¶
-
class
airflow.contrib.hooks.gcp_sql_hook.
CloudSqlHook
(api_version, gcp_conn_id='google_cloud_default', delegate_to=None)[source]¶ Bases:
airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook
Hook for Google Cloud SQL APIs.
-
create_database
(*args, **kwargs)[source]¶ Creates a new database inside a Cloud SQL instance.
Parameters: - instance (str) – Database instance ID. This does not include the project ID.
- body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body.
- project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
create_instance
(*args, **kwargs)[source]¶ Creates a new Cloud SQL instance.
Parameters: - body (dict) – Body required by the Cloud SQL insert API, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert#request-body.
- project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
delete_database
(*args, **kwargs)[source]¶ Deletes a database from a Cloud SQL instance.
Parameters: - instance (str) – Database instance ID. This does not include the project ID.
- database (str) – Name of the database to be deleted in the instance.
- project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
delete_instance
(*args, **kwargs)[source]¶ Deletes a Cloud SQL instance.
Parameters: - project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
- instance (str) – Cloud SQL instance ID. This does not include the project ID.
Returns: None
-
export_instance
(*args, **kwargs)[source]¶ Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump or CSV file.
Parameters: - instance (str) – Database instance ID of the Cloud SQL instance. This does not include the project ID.
- body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
- project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
get_conn
()[source]¶ Retrieves connection to Cloud SQL.
Returns: Google Cloud SQL services object. Return type: dict
-
get_database
(*args, **kwargs)[source]¶ Retrieves a database resource from a Cloud SQL instance.
Parameters: - instance (str) – Database instance ID. This does not include the project ID.
- database (str) – Name of the database in the instance.
- project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
Returns: A Cloud SQL database resource, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases#resource.
Return type: dict
-
get_instance
(*args, **kwargs)[source]¶ Retrieves a resource containing information about a Cloud SQL instance.
Parameters: - instance (str) – Database instance ID. This does not include the project ID.
- project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
Returns: A Cloud SQL instance resource.
Return type: dict
-
import_instance
(*args, **kwargs)[source]¶ Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.
Parameters: - instance (str) – Database instance ID. This does not include the project ID.
- body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/import#request-body
- project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
patch_database
(*args, **kwargs)[source]¶ Updates a database resource inside a Cloud SQL instance.
This method supports patch semantics. See https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch.
Parameters: - instance (str) – Database instance ID. This does not include the project ID.
- database (str) – Name of the database to be updated in the instance.
- body (dict) – The request body, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch#request-body.
- project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
patch_instance
(*args, **kwargs)[source]¶ Updates settings of a Cloud SQL instance.
Caution: This is a partial update, so only the settings included in the request body are updated.
Parameters: - body (dict) – Body required by the Cloud SQL patch API, as described in https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch#request-body.
- instance (str) – Cloud SQL instance ID. This does not include the project ID.
- project_id (str) – Project ID of the project that contains the instance. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
-
class
airflow.contrib.hooks.gcp_sql_hook.
CloudSqlDatabaseHook
(gcp_cloudsql_conn_id='google_cloud_sql_default', default_gcp_project_id=None)[source]¶ Bases:
airflow.hooks.base_hook.BaseHook
Serves DB connection configuration for Google Cloud SQL (Connections of gcpcloudsql:// type).
The hook is a “meta” hook: it does not itself perform an actual connection. It retrieves all the parameters configured in the gcpcloudsql:// connection, starts and stops Cloud SQL Proxy if needed, dynamically generates a Postgres or MySQL connection in the database, and returns an actual Postgres or MySQL hook. The returned hook uses a direct connection or the Cloud SQL Proxy socket/TCP, as configured.
Main parameters of the hook are retrieved from the standard URI components:
- user - User name to authenticate to the database (from login of the URI).
- password - Password to authenticate to the database (from password of the URI).
- public_ip - IP to connect to for public connection (from host of the URI).
- public_port - Port to connect to for public connection (from port of the URI).
- database - Database to connect to (from schema of the URI).
Remaining parameters are retrieved from the extras (URI query parameters):
- project_id - Optional, Google Cloud Platform project where the Cloud SQL instance exists. If missing, the default project ID passed to the hook is used.
- instance - Name of the Cloud SQL database instance.
- location - The location of the Cloud SQL instance (for example europe-west1).
- database_type - The type of the database instance (MySQL or Postgres).
- use_proxy - (default False) Whether SQL proxy should be used to connect to Cloud SQL DB.
- use_ssl - (default False) Whether SSL should be used to connect to Cloud SQL DB. You cannot use proxy and SSL together.
- sql_proxy_use_tcp - (default False) If set to true, TCP is used to connect via proxy, otherwise UNIX sockets are used.
- sql_proxy_binary_path - Optional path to Cloud SQL Proxy binary. If the binary is not specified or the binary is not present, it is automatically downloaded.
- sql_proxy_version - Specific version of the proxy to download (for example v1.13). If not specified, the latest version is downloaded.
- sslcert - Path to client certificate to authenticate when SSL is used.
- sslkey - Path to client private key to authenticate when SSL is used.
- sslrootcert - Path to server’s certificate to authenticate when SSL is used.
Parameters: - gcp_cloudsql_conn_id (str) – URL of the connection
- default_gcp_project_id (str) – Default project id used if project_id not specified in the connection URL
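To make the URI layout described above concrete, the following sketch assembles a gcpcloudsql:// connection URL from the standard components and the extras. The helper build_gcpcloudsql_uri is a hypothetical name, and the credentials, IP address, and project values are placeholders; only the component and extra names come from the lists above.

```python
from urllib.parse import parse_qs, quote, urlencode, urlsplit


def build_gcpcloudsql_uri(user, password, public_ip, public_port, database, **extras):
    """Assemble a gcpcloudsql:// URI (illustrative helper, not part of Airflow)."""
    if extras.get("use_proxy") and extras.get("use_ssl"):
        # The docs state that proxy and SSL cannot be used together.
        raise ValueError("use_proxy and use_ssl cannot both be enabled")
    query = urlencode(sorted(extras.items()))
    return "gcpcloudsql://{user}:{password}@{host}:{port}/{database}?{query}".format(
        user=quote(user, safe=""),          # percent-encode reserved characters
        password=quote(password, safe=""),
        host=public_ip,
        port=public_port,
        database=quote(database, safe=""),
        query=query,
    )


# Placeholder values; 203.0.113.10 is a documentation-only IP address.
uri = build_gcpcloudsql_uri(
    "app_user", "s3cr3t/pw", "203.0.113.10", 5432, "reporting",
    project_id="example-project",
    location="europe-west1",
    instance="example-instance",
    database_type="postgres",
    use_proxy=True,
    sql_proxy_use_tcp=True,
)

# Round-trip check: the extras come back as URI query parameters.
parts = urlsplit(uri)
extras_seen = parse_qs(parts.query)
```

One common way to supply such a URL is through an AIRFLOW_CONN_<CONN_ID> environment variable, although any connection-definition mechanism should work.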
-
create_connection
(**kwargs)[source]¶ Create connection in the Connection table, according to whether it uses proxy, TCP, UNIX sockets, SSL. Connection ID will be randomly generated.
Parameters: session – Session of the SQL Alchemy ORM (automatically generated with decorator).
-
delete_connection
(**kwargs)[source]¶ Delete the dynamically created connection from the Connection table.
Parameters: session – Session of the SQL Alchemy ORM (automatically generated with decorator).
-
free_reserved_port
()[source]¶ Free TCP port. Makes it immediately ready to be used by Cloud SQL Proxy.
-
get_database_hook
()[source]¶ Retrieve database hook. This is the actual Postgres or MySQL database hook that uses proxy or connects directly to the Google Cloud SQL database.
-
get_sqlproxy_runner
()[source]¶ Retrieve Cloud SQL Proxy runner. It is used to manage the proxy lifecycle per task.
Returns: The Cloud SQL Proxy runner. Return type: CloudSqlProxyRunner
-
class
airflow.contrib.hooks.gcp_sql_hook.
CloudSqlProxyRunner
(path_prefix, instance_specification, gcp_conn_id='google_cloud_default', project_id=None, sql_proxy_version=None, sql_proxy_binary_path=None)[source]¶ Bases:
airflow.utils.log.logging_mixin.LoggingMixin
Downloads and runs cloud-sql-proxy as a subprocess of the Python process.
The cloud-sql-proxy needs to be downloaded and started before we can connect to the Google Cloud SQL instance via a database connection. It establishes a secure tunnel connection to the database and authorizes using the GCP credentials that are passed by the configuration.
More details about the proxy can be found here: https://cloud.google.com/sql/docs/mysql/sql-proxy
-
get_socket_path
()[source]¶ Retrieves UNIX socket path used by Cloud SQL Proxy.
Returns: The dynamically generated path for the socket created by the proxy. Return type: str
-
Cloud Bigtable¶
Cloud Bigtable Operators¶
- BigtableInstanceCreateOperator : creates a Cloud Bigtable instance.
- BigtableInstanceDeleteOperator : deletes a Google Cloud Bigtable instance.
- BigtableClusterUpdateOperator : updates the number of nodes in a Google Cloud Bigtable cluster.
- BigtableTableCreateOperator : creates a table in a Google Cloud Bigtable instance.
- BigtableTableDeleteOperator : deletes a table in a Google Cloud Bigtable instance.
- BigtableTableWaitForReplicationSensor : (sensor) waits for a table to be fully replicated.
BigtableInstanceCreateOperator¶
BigtableInstanceDeleteOperator¶
BigtableClusterUpdateOperator¶
BigtableTableCreateOperator¶
BigtableTableDeleteOperator¶
BigtableTableWaitForReplicationSensor¶
Cloud Bigtable Hook¶
Compute Engine¶
Compute Engine Operators¶
- GceInstanceStartOperator : start an existing Google Compute Engine instance.
- GceInstanceStopOperator : stop an existing Google Compute Engine instance.
- GceSetMachineTypeOperator : change the machine type for a stopped instance.
- GceInstanceTemplateCopyOperator : copy the Instance Template, applying specified changes.
- GceInstanceGroupManagerUpdateTemplateOperator : patch the Instance Group Manager, replacing source Instance Template URL with the destination one.
The operators have the common base operator:
-
class
airflow.contrib.operators.gcp_compute_operator.
GceBaseOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Abstract base operator for Google Compute Engine operators to inherit from.
They also use Compute Engine Hook to communicate with Google Cloud Platform.
GceInstanceStartOperator¶
-
class
airflow.contrib.operators.gcp_compute_operator.
GceInstanceStartOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_compute_operator.GceBaseOperator
Starts an instance in Google Compute Engine.
Parameters: - zone (str) – Google Cloud Platform zone where the instance exists.
- resource_id (str) – Name of the Compute Engine instance resource.
- project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
- api_version (str) – Optional, API version used (for example v1 or beta). Defaults to v1.
- validate_body – Optional, If set to False, body validation is not performed. Defaults to False.
GceInstanceStopOperator¶
-
class
airflow.contrib.operators.gcp_compute_operator.
GceInstanceStopOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_compute_operator.GceBaseOperator
Stops an instance in Google Compute Engine.
Parameters: - zone (str) – Google Cloud Platform zone where the instance exists.
- resource_id (str) – Name of the Compute Engine instance resource.
- project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
- api_version (str) – Optional, API version used (for example v1 or beta). Defaults to v1.
- validate_body – Optional, If set to False, body validation is not performed. Defaults to False.
GceSetMachineTypeOperator¶
-
class
airflow.contrib.operators.gcp_compute_operator.
GceSetMachineTypeOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_compute_operator.GceBaseOperator
Changes the machine type for a stopped instance to the machine type specified in the request.
Parameters: - zone (str) – Google Cloud Platform zone where the instance exists.
- resource_id (str) – Name of the Compute Engine instance resource.
- body (dict) – Body required by the Compute Engine setMachineType API, as described in https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType#request-body
- project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
- gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
- api_version (str) – Optional, API version used (for example v1 or beta). Defaults to v1.
- validate_body (bool) – Optional, If set to False, body validation is not performed. Defaults to False.
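The setMachineType request body contains a single machineType field holding a zone-relative resource path. The sketch below builds such a body; the helper name build_set_machine_type_body and the zone and machine type values are illustrative assumptions.

```python
def build_set_machine_type_body(zone, machine_type):
    """Return a setMachineType request body (illustrative helper, not Airflow API)."""
    return {
        # Partial resource URL of the target machine type within the zone.
        "machineType": "zones/{zone}/machineTypes/{machine_type}".format(
            zone=zone, machine_type=machine_type
        )
    }


# Placeholder zone and machine type.
set_machine_type_body = build_set_machine_type_body("europe-west1-b", "n1-standard-2")

# In a DAG file the body would be passed along with the instance details, e.g.:
#   GceSetMachineTypeOperator(task_id="resize", zone="europe-west1-b",
#                             resource_id="example-instance", body=set_machine_type_body)
```

The instance has to be stopped first, so a task like this would typically run downstream of a GceInstanceStopOperator task.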
GceInstanceTemplateCopyOperator¶
-
class
airflow.contrib.operators.gcp_compute_operator.
GceInstanceTemplateCopyOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_compute_operator.GceBaseOperator
Copies the instance template, applying specified changes.
Parameters: - resource_id (str) – Name of the Instance Template
- body_patch (dict) – Patch to the body of the instanceTemplates object, following RFC 7386 PATCH semantics. The body_patch content follows https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates. The name field is required because the template must be renamed; all other fields are optional. It is important to follow PATCH semantics: arrays are replaced fully, so if you need to update an array you should provide the whole target array as the patch element.
- project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
- request_id (str) – Optional, unique request_id that you might add to achieve full idempotence (for example, when a client call times out, repeating the request with the same request ID will not create a new instance template again). It should be in UUID format as defined in RFC 4122.
- gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
- api_version (str) – Optional, API version used (for example v1 or beta). Defaults to v1.
- validate_body (bool) – Optional, If set to False, body validation is not performed. Defaults to False.
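The RFC 7386 merge semantics mentioned for body_patch can be sketched in a few lines of Python. This is a generic illustration of the RFC, not the operator's internal implementation; the function name merge_patch and the template contents are made up for the example.

```python
def merge_patch(target, patch):
    """Apply an RFC 7386 JSON merge patch and return the result.

    Dictionaries are merged key by key, None removes a key, and every other
    value (including lists) replaces the target value wholesale.
    """
    if not isinstance(patch, dict):
        return patch
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


# Placeholder template body and patch.
template = {
    "name": "web-template",
    "description": "old description",
    "properties": {
        "machineType": "n1-standard-1",
        "tags": {"items": ["web", "blue"]},
    },
}
body_patch = {
    "name": "web-template-v2",                    # the new name is required
    "description": None,                          # None drops the field
    "properties": {"tags": {"items": ["green"]}}, # the list is replaced, not appended to
}
patched = merge_patch(template, body_patch)
```

Note how the tags list ends up containing only "green": to keep "web" and "blue" as well, the patch would have to list all three items.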
GceInstanceGroupManagerUpdateTemplateOperator¶
-
class
airflow.contrib.operators.gcp_compute_operator.
GceInstanceGroupManagerUpdateTemplateOperator
(**kwargs)[source]¶ Bases:
airflow.contrib.operators.gcp_compute_operator.GceBaseOperator
Patches the Instance Group Manager, replacing source template URL with the destination one. API V1 does not have update/patch operations for Instance Group Manager, so you must use beta or newer API version. Beta is the default.
Parameters: - resource_id (str) – Name of the Instance Group Manager
- zone (str) – Google Cloud Platform zone where the Instance Group Manager exists.
- source_template (str) – URL of the template to replace.
- destination_template (str) – URL of the target template.
- project_id (str) – Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
- request_id (str) – Optional, unique request_id that you might add to achieve full idempotence (for example, when a client call times out, repeating the request with the same request ID will not create a new instance template again). It should be in UUID format as defined in RFC 4122.
- gcp_conn_id (str) – Optional, The connection ID used to connect to Google Cloud Platform. Defaults to ‘google_cloud_default’.
- api_version (str) – Optional, API version used (for example v1 - or beta). Defaults to beta.
- validate_body (bool) – Optional, If set to False, body validation is not performed. Defaults to False.
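A minimal sketch of how a request_id might be used for idempotent retries, assuming the ID is generated once and reused on every attempt. call_with_retries and send are hypothetical helpers, not part of Airflow; send stands in for whatever issues the API request.

```python
import uuid

# Generate the request ID once, outside any retry loop, so that every retry
# of the same logical operation carries the same identifier.
request_id = str(uuid.uuid4())


def call_with_retries(send, request_id, attempts=3):
    """Call ``send(request_id)`` until it succeeds or attempts run out.

    ``send`` is a hypothetical callable standing in for the API request.
    """
    last_error = None
    for _ in range(attempts):
        try:
            return send(request_id)
        except TimeoutError as error:
            last_error = error
    raise last_error
```

uuid.uuid4() produces a random UUID in the RFC 4122 format that the parameter description asks for.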
Compute Engine Hook¶
-
class
airflow.contrib.hooks.gcp_compute_hook.
GceHook
(api_version='v1', gcp_conn_id='google_cloud_default', delegate_to=None)[source]¶ Bases:
airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook
Hook for Google Compute Engine APIs.
All the methods in the hook where project_id is used must be called with keyword arguments rather than positional.
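The keyword-only calling convention and the fallback to the connection's default project can be sketched with a small decorator. This is an illustrative sketch, not the hook's actual implementation; keyword_only_with_default_project, FakeComputeHook and default_project_id are hypothetical names.

```python
import functools


def keyword_only_with_default_project(func):
    """Reject positional arguments and fill in a missing ``project_id``."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if args:
            raise TypeError(
                "{}() must be called with keyword arguments only".format(
                    func.__name__
                )
            )
        if kwargs.get("project_id") is None:
            # Fall back to the project configured on the connection.
            kwargs["project_id"] = self.default_project_id
        return func(self, **kwargs)

    return wrapper


class FakeComputeHook:
    """Hypothetical stand-in for a hook; it performs no API calls."""

    default_project_id = "project-from-connection"

    @keyword_only_with_default_project
    def start_instance(self, zone, resource_id, project_id=None):
        return (project_id, zone, resource_id)


hook = FakeComputeHook()
result = hook.start_instance(zone="europe-west1-b", resource_id="vm-1")
```

Requiring keyword arguments lets project_id be omitted without shifting the meaning of the remaining arguments.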
-
get_conn
()[source]¶ Retrieves connection to Google Compute Engine.
Returns: Google Compute Engine services object Return type: dict
-
get_instance_group_manager
(*args, **kwargs)[source]¶ Retrieves Instance Group Manager by project_id, zone and resource_id. Must be called with keyword arguments rather than positional.
Parameters: - zone (str) – Google Cloud Platform zone where the Instance Group Manager exists
- resource_id (str) – Name of the Instance Group Manager
- project_id (str) – Optional, Google Cloud Platform project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
Returns: Instance group manager representation as object according to https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers
Return type: dict
-
get_instance_template
(*args, **kwargs)[source]¶ Retrieves instance template by project_id and resource_id. Must be called with keyword arguments rather than positional.
Parameters: - resource_id (str) – Name of the instance template
- project_id (str) – Optional, Google Cloud Platform project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
Returns: Instance template representation as object according to https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
Return type: dict
-
insert_instance_template
(*args, **kwargs)[source]¶ Inserts an instance template using the specified body. Must be called with keyword arguments rather than positional.
Parameters: - body (dict) – Instance template representation as object according to https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
- request_id (str) – Optional, unique request_id that you might add to achieve full idempotence (for example, when a client call times out, repeating the request with the same request ID will not create a new instance template again). It should be in UUID format as defined in RFC 4122.
- project_id (str) – Optional, Google Cloud Platform project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
patch_instance_group_manager
(*args, **kwargs)[source]¶ Patches Instance Group Manager with the specified body. Must be called with keyword arguments rather than positional.
Parameters: - zone (str) – Google Cloud Platform zone where the Instance Group Manager exists
- resource_id (str) – Name of the Instance Group Manager
- body (dict) – Instance Group Manager representation as json-merge-patch object according to https://cloud.google.com/compute/docs/reference/rest/beta/instanceTemplates/patch
- request_id (str) – Optional, unique request_id that you might add to achieve full idempotence (for example when client call times out repeating the request with the same request id will not create a new instance template again). It should be in UUID format as defined in RFC 4122
- project_id (str) – Optional, Google Cloud Platform project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
set_machine_type
(*args, **kwargs)[source]¶ Sets machine type of an instance defined by project_id, zone and resource_id. Must be called with keyword arguments rather than positional.
Parameters: - zone (str) – Google Cloud Platform zone where the instance exists.
- resource_id (str) – Name of the Compute Engine instance resource
- body (dict) – Body required by the Compute Engine setMachineType API, as described in https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType
- project_id (str) – Optional, Google Cloud Platform project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
start_instance
(*args, **kwargs)[source]¶ Starts an existing instance defined by project_id, zone and resource_id. Must be called with keyword arguments rather than positional.
Parameters: - zone (str) – Google Cloud Platform zone where the instance exists
- resource_id (str) – Name of the Compute Engine instance resource
- project_id (str) – Optional, Google Cloud Platform project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
stop_instance
(*args, **kwargs)[source]¶ Stops an instance defined by project_id, zone and resource_id. Must be called with keyword arguments rather than positional.
Parameters: - zone (str) – Google Cloud Platform zone where the instance exists
- resource_id (str) – Name of the Compute Engine instance resource
- project_id (str) – Optional, Google Cloud Platform project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
Cloud Functions¶
Cloud Functions Operators¶
- GcfFunctionDeployOperator : deploy Google Cloud Function to Google Cloud Platform
- GcfFunctionDeleteOperator : delete Google Cloud Function in Google Cloud Platform
They also use Cloud Functions Hook to communicate with Google Cloud Platform.
GcfFunctionDeployOperator¶
-
class
airflow.contrib.operators.gcp_function_operator.
GcfFunctionDeployOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Creates a function in Google Cloud Functions. If a function with this name already exists, it will be updated.
Parameters: - location (str) – Google Cloud Platform region where the function should be created.
- body (dict or google.cloud.functions.v1.CloudFunction) – Body of the Cloud Functions definition. The body must be a Cloud Functions dictionary as described in: https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions . Different API versions require different variants of the Cloud Functions dictionary.
- project_id (str) – (Optional) Google Cloud Platform project ID where the function should be created.
- gcp_conn_id (str) – (Optional) The connection ID used to connect to Google Cloud Platform - default ‘google_cloud_default’.
- api_version (str) – (Optional) API version used (for example v1 - default - or v1beta1).
- zip_path (str) – Path to zip file containing source code of the function. If the path is set, the sourceUploadUrl should not be specified in the body or it should be empty. Then the zip file will be uploaded using the upload URL generated via generateUploadUrl from the Cloud Functions API.
- validate_body (bool) – If set to False, body validation is not performed.
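The rule that zip_path and sourceUploadUrl should not both be supplied can be sketched as a small check. This is an assumption-laden illustration rather than the operator's validation code; check_source_fields is a hypothetical helper and the body fragment is made up.

```python
def check_source_fields(body, zip_path=None):
    """Raise ValueError if ``zip_path`` and a non-empty ``sourceUploadUrl`` are both given."""
    if zip_path and body.get("sourceUploadUrl"):
        raise ValueError(
            "Provide either zip_path or a non-empty sourceUploadUrl, not both"
        )
    return True


# Hypothetical body fragment, for illustration only.
body = {"name": "hello-world", "entryPoint": "hello", "sourceUploadUrl": ""}
ok = check_source_fields(body, zip_path="/tmp/function.zip")
```

An empty sourceUploadUrl passes the check, matching the description that the field may be present as long as it is empty.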
GcfFunctionDeleteOperator¶
-
class
airflow.contrib.operators.gcp_function_operator.
GcfFunctionDeleteOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Deletes the specified function from Google Cloud Functions.
Parameters: - name (str) – A fully-qualified function name, matching the pattern: ^projects/[^/]+/locations/[^/]+/functions/[^/]+$
- gcp_conn_id (str) – The connection ID to use to connect to Google Cloud Platform.
- api_version (str) – API version used (for example v1 or v1beta1).
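The fully-qualified name pattern given for the name parameter can be checked before a task is defined. The sketch below uses only the pattern quoted in the parameter description; is_valid_function_name is a hypothetical helper, not an Airflow function.

```python
import re

# Pattern copied from the ``name`` parameter description.
FUNCTION_NAME_PATTERN = re.compile(
    r"^projects/[^/]+/locations/[^/]+/functions/[^/]+$"
)


def is_valid_function_name(name):
    """Return True if ``name`` is a fully-qualified Cloud Function name."""
    return FUNCTION_NAME_PATTERN.match(name) is not None
```

For example, "projects/my-project/locations/europe-west1/functions/hello" matches, while the bare name "hello" does not.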
Cloud Functions Hook¶
-
class
airflow.contrib.hooks.gcp_function_hook.
GcfHook
(api_version, gcp_conn_id='google_cloud_default', delegate_to=None)[source]¶ Bases:
airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook
Hook for the Google Cloud Functions APIs.
-
create_new_function
(*args, **kwargs)[source]¶ Creates a new function in Cloud Function in the location specified in the body.
Parameters: - location (str) – The location of the function.
- body (dict) – The body required by the Cloud Functions insert API.
- project_id (str) – Optional, Google Cloud Project project_id where the function belongs. If set to None or missing, the default project_id from the GCP connection is used.
Returns: None
-
delete_function
(name)[source]¶ Deletes the specified Cloud Function.
Parameters: name (str) – The name of the function. Returns: None
-
get_conn
()[source]¶ Retrieves the connection to Cloud Functions.
Returns: Google Cloud Function services object. Return type: dict
-
get_function
(name)[source]¶ Returns the Cloud Function with the given name.
Parameters: name (str) – Name of the function. Returns: A Cloud Functions object representing the function. Return type: dict
-
update_function
(name, body, update_mask)[source]¶ Updates Cloud Functions according to the specified update mask.
Parameters: - name (str) – The name of the function.
- body (dict) – The body required by the cloud function patch API.
- update_mask ([str]) – The update mask - array of fields that should be patched.
Returns: None
-
upload_function_zip
(*args, **kwargs)[source]¶ Uploads zip file with sources.
Parameters: - location (str) – The location where the function is created.
- zip_path (str) – The path of the valid .zip file to upload.
- project_id (str) – Optional, Google Cloud Project project_id where the function belongs. If set to None or missing, the default project_id from the GCP connection is used.
Returns: The upload URL that was returned by generateUploadUrl method.
-
Cloud DataFlow¶
DataFlow Operators¶
- DataFlowJavaOperator : launching Cloud Dataflow jobs written in Java.
- DataflowTemplateOperator : launching a templated Cloud DataFlow batch job.
- DataFlowPythonOperator : launching Cloud Dataflow jobs written in Python.
DataFlowJavaOperator¶
-
class
airflow.contrib.operators.dataflow_operator.
DataFlowJavaOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job.
See also
For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
Parameters: - jar (string) – The reference to a self executing DataFlow jar.
- dataflow_default_options (dict) – Map of default job options.
- options (dict) – Map of job specific options.
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
- job_class (string) – The name of the dataflow job class to be executed. It is often not the main class configured in the dataflow jar file.
Both jar and options are templated, so you can use variables in them.
Note that both dataflow_default_options and options will be merged to specify the pipeline execution parameters. dataflow_default_options is expected to hold high-level options, for instance project and zone information, which apply to all dataflow operators in the DAG.
It’s a good practice to define dataflow_* parameters, such as the project, zone and staging location, in the default_args of the DAG.
default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'stagingLocation': 'gs://my-staging-bucket/staging/'
    }
}
You need to pass the path to your dataflow jar as a file reference with the jar parameter. The jar needs to be a self-executing jar (see the documentation here: https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar). Use options to pass on options to your job.
t1 = DataFlowJavaOperator(
    task_id='dataflow_example',
    jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY',
        'labels': {'foo': 'bar'}
    },
    gcp_conn_id='gcp-airflow-service-account',
    dag=my_dag)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

# Defaults shared by every task in the DAG, including the Dataflow options.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 8, 1),
    'email': ['alex@vanboxel.be'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'us-central1-f',
        'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
    }
}

dag = DAG('test-dag', default_args=default_args)

# Job-specific options are merged with dataflow_default_options.
task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=dag)
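How dataflow_default_options and options combine can be sketched as a plain dictionary merge. This is a minimal illustration under the assumption that job-specific options take precedence over the defaults; merge_options is a hypothetical helper, not the operator's code.

```python
import copy


def merge_options(dataflow_default_options, options):
    """Return the defaults overlaid with the job-specific options."""
    merged = copy.deepcopy(dataflow_default_options)
    merged.update(options)  # job-specific values win on key collisions
    return merged


defaults = {
    'project': 'my-gcp-project',
    'zone': 'us-central1-f',
    'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
}
job_options = {'maxNumWorkers': '50', 'zone': 'europe-west1-d'}
merged = merge_options(defaults, job_options)
```

Copying the defaults first keeps the shared default_args dictionary unchanged, so other operators in the DAG still see the original values.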
DataflowTemplateOperator¶
-
class
airflow.contrib.operators.dataflow_operator.
DataflowTemplateOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Start a Templated Cloud DataFlow batch job. The parameters of the operation will be passed to the job.
Parameters: - template (string) – The reference to the DataFlow template.
- dataflow_default_options (dict) – Map of default job environment options.
- parameters (dict) – Map of job specific parameters for the template.
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
It’s a good practice to define dataflow_* parameters in the default_args of the dag like the project, zone and staging location.
See also
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'tempLocation': 'gs://my-staging-bucket/staging/'
    }
}
You need to pass the path to your dataflow template as a file reference with the template parameter. Use parameters to pass on parameters to your job. Use environment to pass on runtime environment variables to your job.
t1 = DataflowTemplateOperator(
    task_id='dataflow_example',
    template='{{var.value.gcp_dataflow_base}}',
    parameters={
        'inputFile': "gs://bucket/input/my_input.txt",
        'outputFile': "gs://bucket/output/my_output.txt"
    },
    gcp_conn_id='gcp-airflow-service-account',
    dag=my_dag)
template, dataflow_default_options and parameters are templated, so you can use variables in them.
Note that dataflow_default_options is expected to hold high-level options for project information, which apply to all dataflow operators in the DAG.
See also
For more detail on job template execution have a look at the reference: https://cloud.google.com/dataflow/docs/templates/executing-templates
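The relationship between parameters, the runtime environment and the launch request can be sketched as a plain dictionary. The field names jobName, parameters and environment follow the LaunchTemplateParameters reference; build_launch_body and the choice of ENVIRONMENT_KEYS are assumptions made for illustration and do not reproduce the operator's code.

```python
# Subset of RuntimeEnvironment fields, chosen for illustration only.
ENVIRONMENT_KEYS = ("zone", "tempLocation", "maxWorkers", "machineType")


def build_launch_body(job_name, dataflow_default_options, parameters):
    """Build a LaunchTemplateParameters-style request body."""
    environment = {
        key: dataflow_default_options[key]
        for key in ENVIRONMENT_KEYS
        if key in dataflow_default_options
    }
    return {
        "jobName": job_name,
        "parameters": dict(parameters),
        "environment": environment,
    }


launch_body = build_launch_body(
    "dataflow-example",
    {
        "project": "my-gcp-project",
        "zone": "europe-west1-d",
        "tempLocation": "gs://my-staging-bucket/staging/",
    },
    {"inputFile": "gs://bucket/input/my_input.txt"},
)
```

In this sketch the project is left out of the body because it identifies where the job is launched rather than how the job runs.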
DataFlowPythonOperator¶
-
class
airflow.contrib.operators.dataflow_operator.
DataFlowPythonOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Launches Cloud Dataflow jobs written in Python. Note that both dataflow_default_options and options will be merged to specify the pipeline execution parameters. dataflow_default_options is expected to hold high-level options, for instance project and zone information, which apply to all dataflow operators in the DAG.
See also
For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
Parameters: - py_file (string) – Reference to the Python dataflow pipeline file (file.py), e.g., /some/local/file/path/to/your/python/pipeline/file.
- py_options – Additional python options.
- dataflow_default_options (dict) – Map of default job options.
- options (dict) – Map of job specific options.
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
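The role of poll_sleep can be sketched as a simple polling loop that sleeps between status checks while the job is in the JOB_STATE_RUNNING state. This is an illustration only; wait_for_job and get_state are hypothetical names, and get_state stands in for the call that fetches the job status.

```python
import time


def wait_for_job(get_state, poll_sleep=10, sleep=time.sleep):
    """Poll ``get_state()`` until the job leaves JOB_STATE_RUNNING.

    ``sleep`` is injectable so the loop can be exercised without waiting.
    """
    state = get_state()
    while state == "JOB_STATE_RUNNING":
        sleep(poll_sleep)
        state = get_state()
    return state
```

A larger poll_sleep reduces the number of status requests at the cost of noticing job completion later.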
Cloud DataProc¶
DataProc Operators¶
- DataprocClusterCreateOperator : Create a new cluster on Google Cloud Dataproc.
- DataprocClusterDeleteOperator : Delete a cluster on Google Cloud Dataproc.
- DataprocClusterScaleOperator : Scale up or down a cluster on Google Cloud Dataproc.
- DataProcPigOperator : Start a Pig query Job on a Cloud DataProc cluster.
- DataProcHiveOperator : Start a Hive query Job on a Cloud DataProc cluster.
- DataProcSparkSqlOperator : Start a Spark SQL query Job on a Cloud DataProc cluster.
- DataProcSparkOperator : Start a Spark Job on a Cloud DataProc cluster.
- DataProcHadoopOperator : Start a Hadoop Job on a Cloud DataProc cluster.
- DataProcPySparkOperator : Start a PySpark Job on a Cloud DataProc cluster.
- DataprocWorkflowTemplateInstantiateOperator : Instantiate a WorkflowTemplate on Google Cloud Dataproc.
- DataprocWorkflowTemplateInstantiateInlineOperator : Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc.
DataprocClusterCreateOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataprocClusterCreateOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Create a new cluster on Google Cloud Dataproc. The operator will wait until the creation is successful or an error occurs in the creation process.
The parameters allow you to configure the cluster. Please refer to https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters for a detailed explanation of the different parameters. Most of the configuration parameters detailed in the link are available as parameters to this operator.
Parameters: - cluster_name (string) – The name of the DataProc cluster to create. (templated)
- project_id (str) – The ID of the google cloud project in which to create the cluster. (templated)
- num_workers (int) – The # of workers to spin up. If set to zero will spin up cluster in a single node mode
- storage_bucket (string) – The storage bucket to use, setting to None lets dataproc generate a custom one for you
- init_actions_uris (list[string]) – List of GCS URIs containing Dataproc initialization scripts
- init_action_timeout (string) – Amount of time the executable scripts in init_actions_uris have to complete
- metadata (dict) – dict of key-value google compute engine metadata entries to add to all instances
- image_version (string) – the version of software inside the Dataproc cluster
- custom_image – Custom Dataproc image. For more information, see https://cloud.google.com/dataproc/docs/guides/dataproc-images
- properties (dict) – dict of properties to set on config files (e.g. spark-defaults.conf), see https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#SoftwareConfig
- master_machine_type (string) – Compute engine machine type to use for the master node
- master_disk_type (string) – Type of the boot disk for the master node (default is pd-standard). Valid values: pd-ssd (Persistent Disk Solid State Drive) or pd-standard (Persistent Disk Hard Disk Drive).
- master_disk_size (int) – Disk size for the master node
- worker_machine_type (string) – Compute engine machine type to use for the worker nodes
- worker_disk_type (string) – Type of the boot disk for the worker node (default is pd-standard). Valid values: pd-ssd (Persistent Disk Solid State Drive) or pd-standard (Persistent Disk Hard Disk Drive).
- worker_disk_size (int) – Disk size for the worker nodes
- num_preemptible_workers (int) – The # of preemptible worker nodes to spin up
- labels (dict) – dict of labels to add to the cluster
- zone (string) – The zone where the cluster will be located. (templated)
- network_uri (string) – The network uri to be used for machine communication, cannot be specified with subnetwork_uri
- subnetwork_uri (string) – The subnetwork uri to be used for machine communication, cannot be specified with network_uri
- internal_ip_only (bool) – If true, all instances in the cluster will only have internal IP addresses. This can only be enabled for subnetwork enabled networks
- tags (list[string]) – The GCE tags to add to all instances
- region – leave as ‘global’, might become relevant in the future. (templated)
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- service_account (string) – The service account of the dataproc instances.
- service_account_scopes (list[string]) – The URIs of service account scopes to be included.
- idle_delete_ttl (int) – The longest duration that the cluster is kept alive while idle. Exceeding this threshold causes the cluster to be auto-deleted. A duration in seconds.
- auto_delete_time (datetime.datetime) – The time when cluster will be auto-deleted.
- auto_delete_ttl (int) – The life duration of cluster, the cluster will be auto-deleted at the end of this duration. A duration in seconds. (If auto_delete_time is set this parameter will be ignored)
Type: custom_image: string
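The interplay of idle_delete_ttl, auto_delete_time and auto_delete_ttl can be sketched as follows. build_lifecycle_config is a hypothetical helper; the output keys and the "<seconds>s" duration format are assumptions based on the Dataproc LifecycleConfig resource, and auto_delete_time is assumed to be a naive UTC datetime. The sketch only demonstrates the documented precedence: auto_delete_ttl is ignored when auto_delete_time is set.

```python
from datetime import datetime


def build_lifecycle_config(idle_delete_ttl=None, auto_delete_time=None,
                           auto_delete_ttl=None):
    """Build a lifecycle configuration dictionary from the three settings."""
    config = {}
    if idle_delete_ttl is not None:
        config["idleDeleteTtl"] = "{}s".format(idle_delete_ttl)
    if auto_delete_time is not None:
        # auto_delete_time takes precedence over auto_delete_ttl.
        config["autoDeleteTime"] = auto_delete_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    elif auto_delete_ttl is not None:
        config["autoDeleteTtl"] = "{}s".format(auto_delete_ttl)
    return config


lifecycle = build_lifecycle_config(
    idle_delete_ttl=600,
    auto_delete_time=datetime(2019, 1, 1, 12, 0, 0),
    auto_delete_ttl=3600,
)
```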
DataprocClusterScaleOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataprocClusterScaleOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Scale, up or down, a cluster on Google Cloud Dataproc. The operator will wait until the cluster is re-scaled.
Example:
t1 = DataprocClusterScaleOperator(
    task_id='dataproc_scale',
    project_id='my-project',
    cluster_name='cluster-1',
    num_workers=10,
    num_preemptible_workers=10,
    graceful_decommission_timeout='1h',
    dag=dag)
See also
For more detail about scaling clusters have a look at the reference: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters
Parameters: - cluster_name (string) – The name of the cluster to scale. (templated)
- project_id (string) – The ID of the google cloud project in which the cluster runs. (templated)
- region (string) – The region for the dataproc cluster. (templated)
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- num_workers (int) – The new number of workers
- num_preemptible_workers (int) – The new number of preemptible workers
- graceful_decommission_timeout (string) – Timeout for graceful YARN decommissioning. Maximum value is 1d
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
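A value such as '1h' for graceful_decommission_timeout can be converted to seconds and checked against the one-day maximum. The sketch below assumes the value is a whole number followed by one of the unit suffixes s, m, h or d; that accepted format and the timeout_to_seconds helper are assumptions for illustration, not the operator's parser.

```python
import re

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
_MAX_SECONDS = 86400  # the documented maximum of 1d


def timeout_to_seconds(timeout):
    """Convert a string like '1h' to seconds, enforcing the 1d maximum."""
    match = re.fullmatch(r"(\d+)([smhd])", timeout)
    if match is None:
        raise ValueError("Unsupported timeout format: {!r}".format(timeout))
    seconds = int(match.group(1)) * _UNIT_SECONDS[match.group(2)]
    if seconds > _MAX_SECONDS:
        raise ValueError("Timeout exceeds the maximum of 1d")
    return seconds
```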
DataprocClusterDeleteOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataprocClusterDeleteOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Delete a cluster on Google Cloud Dataproc. The operator will wait until the cluster is destroyed.
Parameters: - cluster_name (string) – The name of the cluster to delete. (templated)
- project_id (string) – The ID of the google cloud project in which the cluster runs. (templated)
- region (string) – leave as ‘global’, might become relevant in the future. (templated)
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
DataProcPigOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcPigOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation will be passed to the cluster.
It’s a good practice to define dataproc_* parameters in the default_args of the dag like the cluster name and UDFs.
default_args = {
    'cluster_name': 'cluster-1',
    'dataproc_pig_jars': [
        'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
        'gs://example/udf/jar/gpig/1.2/gpig.jar'
    ]
}
You can pass a pig script as string or file reference. Use variables to pass on variables for the pig script to be resolved on the cluster or use the parameters to be resolved in the script as template parameters.
Example:
t1 = DataProcPigOperator(
    task_id='dataproc_pig',
    query='a_pig_script.pig',
    variables={'out': 'gs://example/output/{{ds}}'},
    dag=dag)
See also
For more detail about job submission have a look at the reference: https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs
Parameters: - query (string) – The query or reference to the query file (pg or pig extension). (templated)
- query_uri (string) – The uri of a pig script on Cloud Storage.
- variables (dict) – Map of named parameters for the query. (templated)
- job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution date, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
- cluster_name (string) – The name of the DataProc cluster. (templated)
- dataproc_pig_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
- dataproc_pig_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- region (str) – The specified region where the dataproc cluster is created.
- job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. E.g., if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables: dataproc_job_id (string) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.
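The job naming and job_error_states behaviour can be sketched in a few lines. Both helpers are hypothetical: build_job_id shows one way to append an 8 character random string to a job name, and raise_on_error shows how a list of error states might turn a final job state into a task failure. Neither reproduces the operator's code.

```python
import uuid


def build_job_id(job_name):
    """Append an 8 character random suffix to avoid name clashes."""
    return "{}_{}".format(job_name, uuid.uuid4().hex[:8])


def raise_on_error(state, job_error_states=None):
    """Raise if ``state`` is listed as an error state; otherwise return it."""
    error_states = ["ERROR"] if job_error_states is None else job_error_states
    if state in error_states:
        raise RuntimeError("Job finished in error state {}".format(state))
    return state


job_id = build_job_id("dataproc_pig_20190101")
```

With the default list, a CANCELLED job does not fail the task; passing ['ERROR', 'CANCELLED'] makes it do so.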
DataProcHiveOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcHiveOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Start a Hive query Job on a Cloud DataProc cluster.
Parameters: - query (string) – The query or reference to the query file (q extension).
- query_uri (string) – The uri of a hive script on Cloud Storage.
- variables (dict) – Map of named parameters for the query.
- job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution date, but can be templated. The name will always be appended with a random number to avoid name clashes.
- cluster_name (string) – The name of the DataProc cluster.
- dataproc_hive_properties (dict) – Map for the Hive properties. Ideal to put in default arguments
- dataproc_hive_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- region (str) – The specified region where the dataproc cluster is created.
- job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. E.g., if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables: dataproc_job_id (string) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.
DataProcSparkSqlOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcSparkSqlOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Start a Spark SQL query Job on a Cloud DataProc cluster.
Parameters: - query (string) – The query or reference to the query file (q extension). (templated)
- query_uri (string) – The uri of a spark sql script on Cloud Storage.
- variables (dict) – Map of named parameters for the query. (templated)
- job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution date, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
- cluster_name (string) – The name of the DataProc cluster. (templated)
- dataproc_spark_properties (dict) – Map for the Spark properties. Ideal to put in default arguments
- dataproc_spark_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- region (str) – The specified region where the dataproc cluster is created.
- job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. E.g., if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables: dataproc_job_id (string) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.
DataProcSparkOperator¶
-
class
airflow.contrib.operators.dataproc_operator.
DataProcSparkOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Start a Spark Job on a Cloud DataProc cluster.
Parameters: - main_jar (string) – URI of the job jar provisioned on Cloud Storage. (use this or the main_class, not both together).
- main_class (string) – Name of the job class. (use this or the main_jar, not both together).
- arguments (list) – Arguments for the job. (templated)
- archives (list) – List of archived files that will be unpacked in the work directory. Should be stored in Cloud Storage.
- files (list) – List of files to be copied to the working directory
- job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution date, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
- cluster_name (string) – The name of the DataProc cluster. (templated)
- dataproc_spark_properties (dict) – Map for the Spark properties. Ideal to put in default arguments.
- dataproc_spark_jars (list) – URIs to jars provisioned in Cloud Storage (for example, for UDFs and libraries). Ideal to put in default arguments.
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- region (str) – The specified region where the dataproc cluster is created.
- job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. E.g., if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables: dataproc_job_id (string) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.
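To make the relationship between job_name and dataproc_job_id concrete, here is a minimal sketch of appending an 8 character random string to a job name. The underscore separator, the use of uuid4 and the helper name build_job_id are assumptions for illustration; the operator may build the suffix differently.

```python
import uuid


def build_job_id(job_name):
    """Append an 8 character random suffix to job_name (illustrative only)."""
    suffix = str(uuid.uuid4())[:8]  # first group of a UUID is 8 hex characters
    return "{}_{}".format(job_name, suffix)


job_id = build_job_id("spark_task_20180101")
print(job_id)
```

Because the submitted id differs from job_name, dataproc_job_id is the value to use when looking the job up in the Cloud Console.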
DataProcHadoopOperator¶
class airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator(**kwargs)
Bases: airflow.models.BaseOperator
Start a Hadoop Job on a Cloud DataProc cluster.
Parameters: - main_jar (string) – URI of the job jar provisioned on Cloud Storage. (use this or the main_class, not both together).
- main_class (string) – Name of the job class. (use this or the main_jar, not both together).
- arguments (list) – Arguments for the job. (templated)
- archives (list) – List of archived files that will be unpacked in the work directory. Should be stored in Cloud Storage.
- files (list) – List of files to be copied to the working directory
- job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution date, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
- cluster_name (string) – The name of the DataProc cluster. (templated)
- dataproc_hadoop_properties (dict) – Map for the Hadoop properties. Ideal to put in default arguments.
- dataproc_hadoop_jars (list) – URIs to jars provisioned in Cloud Storage (for example, for UDFs and libraries). Ideal to put in default arguments.
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- region (str) – The specified region where the dataproc cluster is created.
- job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. E.g., if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables: dataproc_job_id (string) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.
DataProcPySparkOperator¶
class airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator(**kwargs)
Bases: airflow.models.BaseOperator
Start a PySpark Job on a Cloud DataProc cluster.
Parameters: - main (string) – [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main Python file to use as the driver. Must be a .py file.
- arguments (list) – Arguments for the job. (templated)
- archives (list) – List of archived files that will be unpacked in the work directory. Should be stored in Cloud Storage.
- files (list) – List of files to be copied to the working directory
- pyfiles (list) – List of Python files to pass to the PySpark framework. Supported file types: .py, .egg, and .zip
- job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution date, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
- cluster_name (string) – The name of the DataProc cluster.
- dataproc_pyspark_properties (dict) – Map for the PySpark properties. Ideal to put in default arguments.
- dataproc_pyspark_jars (list) – URIs to jars provisioned in Cloud Storage (for example, for UDFs and libraries). Ideal to put in default arguments.
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- region (str) – The specified region where the dataproc cluster is created.
- job_error_states (list) – Job states that should be considered error states. Any states in this list will result in an error being raised and failure of the task. E.g., if the CANCELLED state should also be considered a task failure, pass in ['ERROR', 'CANCELLED']. Possible values are currently only 'ERROR' and 'CANCELLED', but could change in the future. Defaults to ['ERROR'].
Variables: dataproc_job_id (string) – The actual “jobId” as submitted to the Dataproc API. This is useful for identifying or linking to the job in the Google Cloud Console Dataproc UI, as the actual “jobId” submitted to the Dataproc API is appended with an 8 character random string.
DataprocWorkflowTemplateInstantiateOperator¶
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(**kwargs)
Bases: airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator
Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait until the WorkflowTemplate is finished executing.
See also
Please refer to: https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate
Parameters: - template_id (string) – The id of the template. (templated)
- project_id (string) – The ID of the google cloud project in which the template runs
- region (string) – leave as ‘global’, might become relevant in the future
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
DataprocWorkflowTemplateInstantiateInlineOperator¶
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateInlineOperator(**kwargs)
Bases: airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator
Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc. The operator will wait until the WorkflowTemplate is finished executing.
See also
Please refer to: https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline
Parameters: - template (map) – The template contents. (templated)
- project_id (string) – The ID of the google cloud project in which the template runs
- region (string) – leave as ‘global’, might become relevant in the future
- gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Cloud Datastore¶
Datastore Operators¶
- DatastoreExportOperator : Export entities from Google Cloud Datastore to Cloud Storage.
- DatastoreImportOperator : Import entities from Cloud Storage to Google Cloud Datastore.
DatastoreExportOperator¶
class airflow.contrib.operators.datastore_export_operator.DatastoreExportOperator(**kwargs)
Bases: airflow.models.BaseOperator
Export entities from Google Cloud Datastore to Cloud Storage
Parameters: - bucket (string) – name of the cloud storage bucket to backup data
- namespace (str) – optional namespace path in the specified Cloud Storage bucket to backup data. If this namespace does not exist in GCS, it will be created.
- datastore_conn_id (string) – the name of the Datastore connection id to use
- cloud_storage_conn_id (string) – the name of the cloud storage connection id to force-write backup
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- entity_filter (dict) – description of what data from the project is included in the export, refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
- labels (dict) – client-assigned labels for cloud storage
- polling_interval_in_seconds (int) – number of seconds to wait before polling for execution status again
- overwrite_existing (bool) – if the storage bucket + namespace is not empty, it will be emptied prior to exports. This enables overwriting existing backups.
- xcom_push (bool) – push operation name to xcom for reference
DatastoreImportOperator¶
class airflow.contrib.operators.datastore_import_operator.DatastoreImportOperator(**kwargs)
Bases: airflow.models.BaseOperator
Import entities from Cloud Storage to Google Cloud Datastore
Parameters: - bucket (string) – container in Cloud Storage to store data
- file (string) – path of the backup metadata file in the specified Cloud Storage bucket. It should have the extension .overall_export_metadata
- namespace (str) – optional namespace of the backup metadata file in the specified Cloud Storage bucket.
- entity_filter (dict) – description of what data from the project is included in the export, refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
- labels (dict) – client-assigned labels for cloud storage
- datastore_conn_id (string) – the name of the connection id to use
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- polling_interval_in_seconds (int) – number of seconds to wait before polling for execution status again
- xcom_push (bool) – push operation name to xcom for reference
DatastoreHook¶
class airflow.contrib.hooks.datastore_hook.DatastoreHook(datastore_conn_id='google_cloud_datastore_default', delegate_to=None)
Bases: airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook
Interact with Google Cloud Datastore. This hook uses the Google Cloud Platform connection.
This object is not thread safe. If you want to make multiple requests simultaneously, you will need to create a hook per thread.
allocate_ids(partialKeys)
Allocate IDs for incomplete keys. See https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds
Parameters: partialKeys – a list of partial keys
Returns: a list of full keys.
commit(body)
Commit a transaction, optionally creating, deleting or modifying some entities.
Parameters: body – the body of the commit request
Returns: the response body of the commit request
delete_operation(name)
Deletes the long-running operation.
Parameters: name – the name of the operation resource
export_to_storage_bucket(bucket, namespace=None, entity_filter=None, labels=None)
Export entities from Cloud Datastore to Cloud Storage for backup.
get_operation(name)
Gets the latest state of a long-running operation.
Parameters: name – the name of the operation resource
import_from_storage_bucket(bucket, file, namespace=None, entity_filter=None, labels=None)
Import a backup from Cloud Storage to Cloud Datastore.
lookup(keys, read_consistency=None, transaction=None)
Look up some entities by key.
Parameters: - keys – the keys to lookup
- read_consistency – the read consistency to use. default, strong or eventual. Cannot be used with a transaction.
- transaction – the transaction to use, if any.
Returns: the response body of the lookup request.
poll_operation_until_done(name, polling_interval_in_seconds)
Poll backup operation state until it’s completed.
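The polling pattern behind get_operation and poll_operation_until_done can be sketched as a simple loop. This is a stand-alone illustration, not the hook's implementation: the function poll_until_done is hypothetical, get_operation is passed in as any callable returning the operation as a dict, and completion is assumed to be signalled by a true 'done' field, as is common for long-running operation resources.

```python
import time


def poll_until_done(get_operation, name, polling_interval_in_seconds,
                    sleep=time.sleep):
    """Call get_operation(name) repeatedly until the operation reports done."""
    while True:
        operation = get_operation(name)
        # Assumption: an unfinished operation omits 'done' or sets it to False.
        if operation.get('done'):
            return operation
        sleep(polling_interval_in_seconds)


# Usage with a fake operation source that finishes on the third call.
responses = iter([{'name': 'op'}, {'name': 'op'}, {'name': 'op', 'done': True}])
result = poll_until_done(lambda _name: next(responses), 'op', 0)
print(result)
```

Injecting the sleep function keeps the loop easy to test without real waiting.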
Cloud ML Engine¶
Cloud ML Engine Operators¶
- MLEngineBatchPredictionOperator : Start a Cloud ML Engine batch prediction job.
- MLEngineModelOperator : Manages a Cloud ML Engine model.
- MLEngineTrainingOperator : Start a Cloud ML Engine training job.
- MLEngineVersionOperator : Manages a Cloud ML Engine model version.
MLEngineBatchPredictionOperator¶
class airflow.contrib.operators.mlengine_operator.MLEngineBatchPredictionOperator(**kwargs)
Bases: airflow.models.BaseOperator
Start a Google Cloud ML Engine prediction job.
NOTE: For model origin, users should consider exactly one from the three options below:
1. Populate ‘uri’ field only, which should be a GCS location that points to a tensorflow savedModel directory.
2. Populate ‘model_name’ field only, which refers to an existing model, and the default version of the model will be used.
3. Populate both ‘model_name’ and ‘version_name’ fields, which refers to a specific version of a specific model.
In options 2 and 3, both model and version name should contain the minimal identifier. For instance, call MLEngineBatchPredictionOperator(..., model_name='my_model', version_name='my_version', ...) if the desired model version is “projects/my_project/models/my_model/versions/my_version”.
See https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs for further documentation on the parameters.
Parameters: - project_id (string) – The Google Cloud project name where the prediction job is submitted. (templated)
- job_id (string) – A unique id for the prediction job on Google Cloud ML Engine. (templated)
- data_format (string) – The format of the input data. It will default to ‘DATA_FORMAT_UNSPECIFIED’ if it is not provided or is not one of [“TEXT”, “TF_RECORD”, “TF_RECORD_GZIP”].
- input_paths (list of string) – A list of GCS paths of input data for batch prediction. Accepting wildcard operator *, but only at the end. (templated)
- output_path (string) – The GCS path where the prediction results are written to. (templated)
- region (string) – The Google Compute Engine region to run the prediction job in. (templated)
- model_name (string) – The Google Cloud ML Engine model to use for prediction. If version_name is not provided, the default version of this model will be used. Should not be None if version_name is provided. Should be None if uri is provided. (templated)
- version_name (string) – The Google Cloud ML Engine model version to use for prediction. Should be None if uri is provided. (templated)
- uri (string) – The GCS path of the saved model to use for prediction. Should be None if model_name is provided. It should be a GCS path pointing to a tensorflow SavedModel. (templated)
- max_worker_count (int) – The maximum number of workers to be used for parallel processing. Defaults to 10 if not specified.
- runtime_version (string) – The Google Cloud ML Engine runtime version to use for batch prediction.
- gcp_conn_id (string) – The connection ID used for connection to Google Cloud Platform.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Raises: ValueError: if a unique model/version origin cannot be determined.
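The three model-origin options and the ValueError case can be sketched as a small resolver. This is an illustration of the rules stated above, not the operator's code: the function resolve_model_origin is hypothetical, and the output keys uri, modelName and versionName are assumed to match the prediction input fields of the ML Engine job resource.

```python
def resolve_model_origin(project_id, model_name=None, version_name=None, uri=None):
    """Return the prediction-input fragment for exactly one model origin."""
    if uri:
        # Option 1: uri only.
        if model_name or version_name:
            raise ValueError("Ambiguous model origin: uri cannot be combined "
                             "with model_name or version_name.")
        return {'uri': uri}
    if not model_name:
        # Covers both "nothing given" and "version_name without model_name".
        raise ValueError("Missing model origin: provide uri or model_name.")
    model = 'projects/{}/models/{}'.format(project_id, model_name)
    if version_name:
        # Option 3: a specific version of a specific model.
        return {'versionName': '{}/versions/{}'.format(model, version_name)}
    # Option 2: the default version of the model.
    return {'modelName': model}


print(resolve_model_origin('my_project', model_name='my_model',
                           version_name='my_version'))
```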
MLEngineModelOperator¶
class airflow.contrib.operators.mlengine_operator.MLEngineModelOperator(**kwargs)
Bases: airflow.models.BaseOperator
Operator for managing a Google Cloud ML Engine model.
Parameters: - project_id (string) – The Google Cloud project name to which MLEngine model belongs. (templated)
- model (dict) – A dictionary containing the information about the model. If the operation is create, then the model parameter should contain all the information about this model such as name. If the operation is get, the model parameter should contain the name of the model.
- operation (string) – The operation to perform. Available operations are:
    create: Creates a new model as provided by the model parameter.
    get: Gets a particular model where the name is specified in model.
- gcp_conn_id (string) – The connection ID to use when fetching connection info.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
MLEngineTrainingOperator¶
class airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperator(**kwargs)
Bases: airflow.models.BaseOperator
Operator for launching a MLEngine training job.
Parameters: - project_id (string) – The Google Cloud project name within which MLEngine training job should run (templated).
- job_id (string) – A unique templated id for the submitted Google MLEngine training job. (templated)
- package_uris (string) – A list of package locations for MLEngine training job, which should include the main training program + any additional dependencies. (templated)
- training_python_module (string) – The Python module name to run within MLEngine training job after installing ‘package_uris’ packages. (templated)
- training_args (string) – A list of templated command line arguments to pass to the MLEngine training program. (templated)
- region (string) – The Google Compute Engine region to run the MLEngine training job in (templated).
- scale_tier (string) – Resource tier for MLEngine training job. (templated)
- runtime_version (string) – The Google Cloud ML runtime version to use for training. (templated)
- python_version (string) – The version of Python used in training. (templated)
- job_dir (string) – A Google Cloud Storage path in which to store training outputs and other data needed for training. (templated)
- gcp_conn_id (string) – The connection ID to use when fetching connection info.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- mode (string) – Can be one of ‘DRY_RUN’/’CLOUD’. In ‘DRY_RUN’ mode, no real training job will be launched, but the MLEngine training job request will be printed out. In ‘CLOUD’ mode, a real MLEngine training job creation request will be issued.
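The difference between the ‘DRY_RUN’ and ‘CLOUD’ modes can be sketched as follows. This is an assumption-laden illustration rather than the operator's implementation: build_training_request and launch are hypothetical helpers, submit stands in for whatever actually sends the request, and the camelCase field names are assumed to follow the ML Engine job resource.

```python
import json


def build_training_request(job_id, package_uris, training_python_module,
                           training_args, region, scale_tier):
    """Assemble a training job request body from operator-style arguments."""
    return {
        'jobId': job_id,
        'trainingInput': {
            'scaleTier': scale_tier,
            'packageUris': package_uris,
            'pythonModule': training_python_module,
            'region': region,
            'args': training_args,
        },
    }


def launch(request, mode, submit):
    """Print the request in DRY_RUN mode; hand it to submit() in CLOUD mode."""
    if mode == 'DRY_RUN':
        print(json.dumps(request, indent=2, sort_keys=True))
        return None
    if mode == 'CLOUD':
        return submit(request)
    raise ValueError("mode must be 'DRY_RUN' or 'CLOUD', got {!r}".format(mode))


request = build_training_request(
    'my_job_id', ['gs://my-bucket/trainer.tar.gz'], 'trainer.task',
    ['--epochs', '10'], 'us-central1', 'STANDARD_1')
launch(request, 'DRY_RUN', submit=None)  # nothing is submitted
```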
MLEngineVersionOperator¶
class airflow.contrib.operators.mlengine_operator.MLEngineVersionOperator(**kwargs)
Bases: airflow.models.BaseOperator
Operator for managing a Google Cloud ML Engine version.
Parameters: - project_id (string) – The Google Cloud project name to which MLEngine model belongs.
- model_name (string) – The name of the Google Cloud ML Engine model that the version belongs to. (templated)
- version_name (string) – A name to use for the version being operated upon. If not None and the version argument is None or does not have a value for the name key, then this will be populated in the payload for the name key. (templated)
- version (dict) – A dictionary containing the information about the version. If the operation is create, version should contain all the information about this version such as name, and deploymentUrl. If the operation is get or delete, the version parameter should contain the name of the version. If it is None, the only operation possible would be list. (templated)
- operation (string) – The operation to perform. Available operations are:
    create: Creates a new version in the model specified by model_name, in which case the version parameter should contain all the information to create that version (e.g. name, deploymentUrl).
    get: Gets full information of a particular version in the model specified by model_name. The name of the version should be specified in the version parameter.
    list: Lists all available versions of the model specified by model_name.
    delete: Deletes the version specified in version parameter from the model specified by model_name. The name of the version should be specified in the version parameter.
- gcp_conn_id (string) – The connection ID to use when fetching connection info.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
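The interplay between version_name and version described above can be sketched as a small merge step. The helper build_version_payload is hypothetical and only illustrates the documented rule that version_name fills in a missing name key.

```python
def build_version_payload(version_name=None, version=None):
    """Return the version payload, filling 'name' from version_name if absent."""
    payload = dict(version or {})  # copy so the caller's dict is not mutated
    if version_name is not None and not payload.get('name'):
        payload['name'] = version_name
    return payload


# version has no name, so version_name is used.
print(build_version_payload('v1', {'deploymentUrl': 'gs://my-bucket/model'}))
# An explicit name in version takes precedence.
print(build_version_payload('v1', {'name': 'v2'}))
```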
Cloud ML Engine Hook¶
MLEngineHook¶
class airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook(gcp_conn_id='google_cloud_default', delegate_to=None)
Bases: airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook
create_job(project_id, job, use_existing_job_fn=None)
Launches an MLEngine job and waits for it to reach a terminal state.
Parameters: - project_id (string) – The Google Cloud project id within which MLEngine job will be launched.
- job (dict) – MLEngine Job object that should be provided to the MLEngine API, such as: { 'jobId': 'my_job_id', 'trainingInput': { 'scaleTier': 'STANDARD_1', ... } }
- use_existing_job_fn (function) – If an MLEngine job with the same job_id already exists, this function (if provided) decides whether to reuse the existing job, that is, to keep waiting for it to finish and return its job object. It should accept an MLEngine job object and return a boolean value indicating whether it is OK to reuse the existing job. If ‘use_existing_job_fn’ is not provided, the existing MLEngine job is reused by default.
Returns: The MLEngine job object if the job successfully reaches a terminal state (which might be FAILED or CANCELLED state).
Return type: dict
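A use_existing_job_fn callback might look like the sketch below, which reuses an existing job only when its training input matches what was about to be submitted. The factory make_reuse_check and the comparison rule are illustrative choices, not part of the hook.

```python
def make_reuse_check(expected_training_input):
    """Build a callback suitable for use_existing_job_fn (illustrative)."""
    def use_existing_job(existing_job):
        # Reuse only if the existing job was created with the same input.
        return existing_job.get('trainingInput') == expected_training_input
    return use_existing_job


check = make_reuse_check({'scaleTier': 'STANDARD_1'})
print(check({'jobId': 'my_job_id', 'trainingInput': {'scaleTier': 'STANDARD_1'}}))
print(check({'jobId': 'my_job_id', 'trainingInput': {'scaleTier': 'BASIC'}}))
```

The resulting function would then be passed as the use_existing_job_fn argument of create_job.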
create_version(project_id, model_name, version_spec)
Creates the Version on Google Cloud ML Engine.
Returns the operation if the version was created successfully and raises an error otherwise.
delete_version(project_id, model_name, version_name)
Deletes the given version of a model. Blocks until finished.
Cloud Storage¶
Storage Operators¶
- FileToGoogleCloudStorageOperator : Uploads a file to Google Cloud Storage.
- GoogleCloudStorageCreateBucketOperator : Creates a new cloud storage bucket.
- GoogleCloudStorageBucketCreateAclEntryOperator : Creates a new ACL entry on the specified bucket.
- GoogleCloudStorageDownloadOperator : Downloads a file from Google Cloud Storage.
- GoogleCloudStorageListOperator : List all objects from the bucket with the given string prefix and delimiter in name.
- GoogleCloudStorageToBigQueryOperator : Loads files from Google cloud storage into BigQuery.
- GoogleCloudStorageObjectCreateAclEntryOperator : Creates a new ACL entry on the specified object.
- GoogleCloudStorageToGoogleCloudStorageOperator : Copies objects from a bucket to another, with renaming if requested.
- GoogleCloudStorageToGoogleCloudStorageTransferOperator : Copies objects from a bucket to another using Google Transfer service.
- MySqlToGoogleCloudStorageOperator: Copy data from any MySQL Database to Google cloud storage in JSON format.
FileToGoogleCloudStorageOperator¶
class airflow.contrib.operators.file_to_gcs.FileToGoogleCloudStorageOperator(**kwargs)
Bases: airflow.models.BaseOperator
Uploads a file to Google Cloud Storage. Optionally can compress the file for upload.
Parameters: - src (string) – Path to the local file. (templated)
- dst (string) – Destination path within the specified bucket. (templated)
- bucket (string) – The bucket to upload to. (templated)
- google_cloud_storage_conn_id (string) – The Airflow connection ID to upload with
- mime_type (string) – The mime-type string
- delegate_to (str) – The account to impersonate, if any
- gzip (bool) – Allows for file to be compressed and uploaded as gzip
GoogleCloudStorageBucketCreateAclEntryOperator¶
class airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator(**kwargs)
Bases: airflow.models.BaseOperator
Creates a new ACL entry on the specified bucket.
Parameters: - bucket (str) – Name of a bucket.
- entity (str) – The entity holding the permission, in one of the following forms: user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers
- role (str) – The access permission for the entity. Acceptable values are: “OWNER”, “READER”, “WRITER”.
- user_project (str) – (Optional) The project to be billed for this request. Required for Requester Pays buckets.
- google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google Cloud Storage.
GoogleCloudStorageCreateBucketOperator¶
class airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator(**kwargs)
Bases: airflow.models.BaseOperator
Creates a new bucket. Google Cloud Storage uses a flat namespace, so you can’t create a bucket with a name that is already in use.
See also
For more information, see Bucket Naming Guidelines: https://cloud.google.com/storage/docs/bucketnaming.html#requirements
Parameters: - bucket_name (string) – The name of the bucket. (templated)
- storage_class (string) – This defines how objects in the bucket are stored and determines the SLA and the cost of storage (templated). Values include MULTI_REGIONAL, REGIONAL, STANDARD, NEARLINE and COLDLINE. If this value is not specified when the bucket is created, it will default to STANDARD.
- location (string) – The location of the bucket. (templated) Object data for objects in the bucket resides in physical storage within this region. Defaults to US.
- project_id (string) – The ID of the GCP Project. (templated)
- labels (dict) – User-provided labels, in key/value pairs.
- google_cloud_storage_conn_id (string) – The connection ID to use when connecting to Google cloud storage.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Example: The following Operator would create a new bucket test-bucket with MULTI_REGIONAL storage class in EU region:
CreateBucket = GoogleCloudStorageCreateBucketOperator(
    task_id='CreateNewBucket',
    bucket_name='test-bucket',
    storage_class='MULTI_REGIONAL',
    location='EU',
    labels={'env': 'dev', 'team': 'airflow'},
    google_cloud_storage_conn_id='airflow-service-account'
)
GoogleCloudStorageDownloadOperator¶
class airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator(**kwargs)
Bases: airflow.models.BaseOperator
Downloads a file from Google Cloud Storage.
Parameters: - bucket (string) – The Google cloud storage bucket where the object is. (templated)
- object (string) – The name of the object to download in the Google cloud storage bucket. (templated)
- filename (string) – The file path on the local file system (where the operator is being executed) that the file should be downloaded to. (templated) If no filename passed, the downloaded data will not be stored on the local file system.
- store_to_xcom_key (string) – If this param is set, the operator will push the contents of the downloaded file to XCom with the key set in this parameter. If not set, the downloaded data will not be pushed to XCom. (templated)
- google_cloud_storage_conn_id (string) – The connection ID to use when connecting to Google cloud storage.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
GoogleCloudStorageListOperator¶
class airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageListOperator(**kwargs)
Bases: airflow.models.BaseOperator
List all objects from the bucket with the given string prefix and delimiter in name.
This operator returns a Python list with the names of the objects, which can be used by XCom in the downstream task.
Parameters: - bucket (string) – The Google cloud storage bucket to find the objects. (templated)
- prefix (string) – Prefix string which filters objects whose name begin with this prefix. (templated)
- delimiter (string) – The delimiter by which you want to filter the objects. (templated) For example, to list the CSV files in a directory in GCS you would use delimiter=’.csv’.
- google_cloud_storage_conn_id (string) – The connection ID to use when connecting to Google cloud storage.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Example: The following Operator would list all the Avro files from sales/sales-2017 folder in data bucket:
GCS_Files = GoogleCloudStorageListOperator(
    task_id='GCS_Files',
    bucket='data',
    prefix='sales/sales-2017/',
    delimiter='.avro',
    google_cloud_storage_conn_id=google_cloud_conn_id
)
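The combined effect of prefix and delimiter in the example above can be approximated locally with plain string checks. This is only a rough model for the case where the delimiter is a file extension; the real listing is done by the Cloud Storage API, whose delimiter semantics are more general, and the helper filter_object_names is hypothetical.

```python
def filter_object_names(names, prefix=None, delimiter=None):
    """Keep names that start with prefix and end with delimiter (approximation)."""
    selected = []
    for name in names:
        if prefix and not name.startswith(prefix):
            continue
        if delimiter and not name.endswith(delimiter):
            continue
        selected.append(name)
    return selected


objects = [
    'sales/sales-2017/january.avro',
    'sales/sales-2017/january.csv',
    'sales/sales-2016/december.avro',
]
print(filter_object_names(objects, prefix='sales/sales-2017/', delimiter='.avro'))
```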
GoogleCloudStorageObjectCreateAclEntryOperator¶
class airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageObjectCreateAclEntryOperator(**kwargs)
Bases: airflow.models.BaseOperator
Creates a new ACL entry on the specified object.
Parameters: - bucket (str) – Name of a bucket.
- object_name (str) – Name of the object. For information about how to URL encode object names to be path safe, see: https://cloud.google.com/storage/docs/json_api/#encoding
- entity (str) – The entity holding the permission, in one of the following forms: user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers
- role (str) – The access permission for the entity. Acceptable values are: “OWNER”, “READER”.
- generation (str) – (Optional) If present, selects a specific revision of this object (as opposed to the latest version, the default).
- user_project (str) – (Optional) The project to be billed for this request. Required for Requester Pays buckets.
- google_cloud_storage_conn_id (str) – The connection ID to use when connecting to Google Cloud Storage.
GoogleCloudStorageToBigQueryOperator¶
class airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator(**kwargs)
Bases: airflow.models.BaseOperator
Loads files from Google cloud storage into BigQuery.
The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it.
Parameters: - bucket (string) – The bucket to load from. (templated)
- source_objects (list of str) – List of Google cloud storage URIs to load from. (templated) If source_format is ‘DATASTORE_BACKUP’, the list must only contain a single URI.
- destination_project_dataset_table (string) – The dotted (<project>.)<dataset>.<table> BigQuery table to load data into. If <project> is not included, project will be the project defined in the connection json. (templated)
- schema_fields (list) – If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load Should not be set when source_format is ‘DATASTORE_BACKUP’.
- schema_object (string) – If set, a GCS object path pointing to a .json file that contains the schema for the table. (templated)
- source_format (string) – File format of the source data to load.
- compression (string) – [Optional] The compression type of the data source. Possible values include GZIP and NONE. The default value is NONE. This setting is ignored for Google Cloud Bigtable, Google Cloud Datastore backups and Avro formats.
- create_disposition (string) – The create disposition if the table doesn’t exist.
- skip_leading_rows (int) – Number of rows to skip when loading from a CSV.
- write_disposition (string) – The write disposition if the table already exists.
- field_delimiter (string) – The delimiter to use when loading from a CSV.
- max_bad_records (int) – The maximum number of bad records that BigQuery can ignore when running the job.
- quote_character (string) – The value that is used to quote data sections in a CSV file.
- ignore_unknown_values (bool) – [Optional] Indicates if BigQuery should allow extra values that are not represented in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result.
- allow_quoted_newlines (bool) – Whether to allow quoted newlines (true) or not (false).
- allow_jagged_rows (bool) – Accept rows that are missing trailing optional columns. The missing values are treated as nulls. If false, records with missing trailing columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. Only applicable to CSV, ignored for other formats.
- max_id_key (string) – If set, the name of a column in the BigQuery table that’s to be loaded. This will be used to select the MAX value from BigQuery after the load occurs. The results will be returned by the execute() command, which in turn gets stored in XCom for future operators to use. This can be helpful with incremental loads–during future executions, you can pick up from the max ID.
- bigquery_conn_id (string) – Reference to a specific BigQuery hook.
- google_cloud_storage_conn_id (string) – Reference to a specific Google cloud storage hook.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- schema_update_options (list) – Allows the schema of the destination table to be updated as a side effect of the load job.
- src_fmt_configs (dict) – configure optional fields specific to the source format
- external_table (bool) – Flag to specify if the destination table should be a BigQuery external table. Default Value is False.
- time_partitioning (dict) – configure optional time partitioning fields, i.e. partition by field, type and expiration as per API specifications. Note that ‘field’ is not available in conjunction with dataset.table$partition.
- cluster_fields (list of str) – Request that the result of this load be stored sorted by one or more columns. This is only available in conjunction with time_partitioning. The order of columns given determines the sort order. Not applicable for external tables.
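The dotted (<project>.)<dataset>.<table> form accepted by destination_project_dataset_table can be illustrated with a small sketch. The helper name split_table_reference and the default_project argument are hypothetical and are not part of Airflow; the sketch only shows how a missing project could fall back to the one defined in the connection, and it does not handle project IDs that themselves contain dots.

```python
def split_table_reference(table_ref, default_project):
    """Split '(<project>.)<dataset>.<table>' into (project, dataset, table).

    Hypothetical helper for illustration only; not Airflow's own parser.
    """
    parts = table_ref.split('.')
    if len(parts) == 3:
        project, dataset, table = parts
    elif len(parts) == 2:
        # No project given: fall back to the connection's project.
        project = default_project
        dataset, table = parts
    else:
        raise ValueError(
            'Expected (<project>.)<dataset>.<table>, got: {!r}'.format(table_ref))
    return project, dataset, table


print(split_table_reference('my_dataset.my_table', 'connection-project'))
print(split_table_reference('other-project.my_dataset.my_table', 'connection-project'))
```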
GoogleCloudStorageToGoogleCloudStorageOperator¶
-
class
airflow.contrib.operators.gcs_to_gcs.
GoogleCloudStorageToGoogleCloudStorageOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Copies objects from a bucket to another, with renaming if requested.
Parameters: - source_bucket (string) – The source Google cloud storage bucket where the object is. (templated)
- source_object (string) – The source name of the object to copy in the Google cloud storage bucket. (templated) If wildcards are used in this argument: you can use only one wildcard for objects (filenames) within your bucket. The wildcard can appear inside the object name or at the end of the object name. Appending a wildcard to the bucket name is unsupported.
- destination_bucket (string) – The destination Google cloud storage bucket where the object should be. (templated)
- destination_object (string) – The destination name of the object in the destination Google cloud storage bucket. (templated) If a wildcard is supplied in the source_object argument, this is the prefix that will be prepended to the final destination objects’ paths. Note that the source path’s part before the wildcard will be removed; if it needs to be retained it should be appended to destination_object. For example, with prefix foo/* and destination_object blah/, the file foo/baz will be copied to blah/baz; to retain the prefix write the destination_object as e.g. blah/foo, in which case the copied file will be named blah/foo/baz.
- move_object (bool) – When move object is True, the object is moved instead of copied to the new location. This is the equivalent of a mv command as opposed to a cp command.
- google_cloud_storage_conn_id (string) – The connection ID to use when connecting to Google cloud storage.
- delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
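The wildcard renaming rule described for destination_object can be sketched in plain Python. This is a minimal illustration, not the operator's implementation: the function name destination_name is hypothetical, the destination prefixes are written with a trailing slash, and returning the unchanged name when destination_object is None is an assumption.

```python
def destination_name(source_object, destination_object, matched_name):
    """Compute the destination path for one object matched by a wildcard.

    Hypothetical helper: the part of the source path before the wildcard
    is removed and destination_object is prepended to what remains.
    """
    wildcard_position = source_object.index('*')  # ValueError if no wildcard
    prefix = source_object[:wildcard_position]
    if not matched_name.startswith(prefix):
        raise ValueError('{!r} does not start with {!r}'.format(matched_name, prefix))
    if destination_object is None:
        # Assumption: without a destination_object the name is kept as-is.
        return matched_name
    return destination_object + matched_name[len(prefix):]


print(destination_name('foo/*', 'blah/', 'foo/baz'))
print(destination_name('foo/*', 'blah/foo/', 'foo/baz'))
```

With these inputs the first call drops the foo/ prefix and the second retains it, because the prefix was appended to the destination by hand.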
Examples:
The following Operator would copy a single file named sales/sales-2017/january.avro in the data bucket to the file named copied_sales/2017/january-backup.avro in the data_backup bucket:
    copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='copy_single_file',
        source_bucket='data',
        source_object='sales/sales-2017/january.avro',
        destination_bucket='data_backup',
        destination_object='copied_sales/2017/january-backup.avro',
        google_cloud_storage_conn_id=google_cloud_conn_id
    )
The following Operator would copy all the Avro files from the sales/sales-2017 folder (i.e. with names starting with that prefix) in the data bucket to the copied_sales/2017 folder in the data_backup bucket:
    copy_files = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='copy_files',
        source_bucket='data',
        source_object='sales/sales-2017/*.avro',
        destination_bucket='data_backup',
        destination_object='copied_sales/2017/',
        google_cloud_storage_conn_id=google_cloud_conn_id
    )
The following Operator would move all the Avro files from the sales/sales-2017 folder (i.e. with names starting with that prefix) in the data bucket to the same folder in the data_backup bucket, deleting the original files in the process:
    move_files = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='move_files',
        source_bucket='data',
        source_object='sales/sales-2017/*.avro',
        destination_bucket='data_backup',
        move_object=True,
        google_cloud_storage_conn_id=google_cloud_conn_id
    )
GoogleCloudStorageToGoogleCloudStorageTransferOperator¶
-
class
airflow.contrib.operators.gcs_to_gcs_transfer_operator.
GoogleCloudStorageToGoogleCloudStorageTransferOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Copies objects from a bucket to another using the GCP Storage Transfer Service.
Parameters: - source_bucket (str) – The source Google cloud storage bucket where the object is. (templated)
- destination_bucket (str) – The destination Google cloud storage bucket where the object should be. (templated)
- project_id (str) – The ID of the Google Cloud Platform Console project that owns the job
- gcp_conn_id (str) – Optional connection ID to use when connecting to Google Cloud Storage.
- delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
- description (str) – Optional transfer service job description
- schedule (dict) – Optional transfer service schedule; see https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs. If not set, run transfer job once as soon as the operator runs
- object_conditions (dict) – Optional transfer service object conditions; see https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
- transfer_options (dict) – Optional transfer service transfer options; see https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#TransferOptions
- wait (bool) – Wait for transfer to finish; defaults to True
Example:
gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
    task_id='gcs_to_gcs_transfer_example',
    source_bucket='my-source-bucket',
    destination_bucket='my-destination-bucket',
    project_id='my-gcp-project',
    dag=my_dag)
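The schedule parameter takes a dict shaped like the Schedule object of the Storage Transfer Service REST API linked above. The sketch below builds such a dict from Python date and time values. The helper name build_schedule is hypothetical, and it assumes the operator forwards the dict to the API unchanged; the key names (scheduleStartDate, scheduleEndDate, startTimeOfDay) come from the REST reference, where startTimeOfDay is expressed in UTC.

```python
from datetime import date, time


def build_schedule(start_date, start_time=None, end_date=None):
    """Build a transfer job schedule dict (hypothetical helper)."""

    def as_date(d):
        return {'year': d.year, 'month': d.month, 'day': d.day}

    schedule = {'scheduleStartDate': as_date(start_date)}
    if end_date is not None:
        schedule['scheduleEndDate'] = as_date(end_date)
    if start_time is not None:
        # Time of day is interpreted as UTC by the transfer service.
        schedule['startTimeOfDay'] = {
            'hours': start_time.hour,
            'minutes': start_time.minute,
            'seconds': start_time.second,
        }
    return schedule


nightly = build_schedule(date(2019, 1, 15), start_time=time(2, 30))
print(nightly)
```

The resulting dict could then be passed as schedule=nightly when constructing the operator.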
MySqlToGoogleCloudStorageOperator¶
-
class
airflow.contrib.operators.mysql_to_gcs.
MySqlToGoogleCloudStorageOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Copy data from MySQL to Google cloud storage in JSON format.
GoogleCloudStorageHook¶
-
class
airflow.contrib.hooks.gcs_hook.
GoogleCloudStorageHook
(google_cloud_storage_conn_id='google_cloud_default', delegate_to=None)[source]¶ Bases:
airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook
Interact with Google Cloud Storage. This hook uses the Google Cloud Platform connection.
-
copy
(source_bucket, source_object, destination_bucket=None, destination_object=None)[source]¶ Copies an object from a bucket to another, with renaming if requested.
Either destination_bucket or destination_object can be omitted, in which case the corresponding source value is used, but they cannot both be omitted.
Parameters: - source_bucket (string) – The bucket of the object to copy from.
- source_object (string) – The object to copy.
- destination_bucket (string) – The bucket to copy the object to. Can be omitted; then the same bucket is used.
- destination_object (string) – The (renamed) path of the object if given. Can be omitted; then the same name is used.
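The defaulting rule for copy can be sketched as follows. This is an illustration under stated assumptions, not the hook's code: resolve_copy_target is a hypothetical name, and raising ValueError when source and destination end up identical is an assumption about how "but not both" is enforced.

```python
def resolve_copy_target(source_bucket, source_object,
                        destination_bucket=None, destination_object=None):
    """Fill in omitted destination values from the source (hypothetical helper)."""
    destination_bucket = destination_bucket or source_bucket
    destination_object = destination_object or source_object
    if (destination_bucket, destination_object) == (source_bucket, source_object):
        # Copying an object onto itself is rejected.
        raise ValueError(
            'Either the destination bucket or the destination object '
            'must differ from the source.')
    return destination_bucket, destination_object


# Same object name, different bucket.
print(resolve_copy_target('data', 'a.csv', destination_bucket='data_backup'))
# Same bucket, renamed object.
print(resolve_copy_target('data', 'a.csv', destination_object='b.csv'))
```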
-
create_bucket
(bucket_name, storage_class='MULTI_REGIONAL', location='US', project_id=None, labels=None)[source]¶ Creates a new bucket. Google Cloud Storage uses a flat namespace, so you can’t create a bucket with a name that is already in use.
See also
For more information, see Bucket Naming Guidelines: https://cloud.google.com/storage/docs/bucketnaming.html#requirements
Parameters: - bucket_name (string) – The name of the bucket.
- storage_class (string) – This defines how objects in the bucket are stored and determines the SLA and the cost of storage. Values include MULTI_REGIONAL, REGIONAL, STANDARD, NEARLINE and COLDLINE.
If this value is not specified, it defaults to MULTI_REGIONAL, as shown in the method signature.
- location (string) – The location of the bucket. Object data for objects in the bucket resides in physical storage within this region. Defaults to US.
- project_id (string) – The ID of the GCP Project.
- labels (dict) – User-provided labels, in key/value pairs.
Returns: If successful, it returns the id of the bucket.
-
delete
(bucket, object, generation=None)[source]¶ Delete an object if versioning is not enabled for the bucket, or if generation parameter is used.
Parameters: - bucket (string) – name of the bucket, where the object resides
- object (string) – name of the object to delete
- generation (string) – if present, permanently delete the object of this generation
Returns: True if succeeded
-
download
(bucket, object, filename=None)[source]¶ Get a file from Google Cloud Storage.
Parameters: - bucket (string) – The bucket to fetch from.
- object (string) – The object to fetch.
- filename (string) – If set, a local file path where the file should be written to.
-
exists
(bucket, object)[source]¶ Checks for the existence of a file in Google Cloud Storage.
Parameters: - bucket (string) – The Google cloud storage bucket where the object is.
- object (string) – The name of the object to check in the Google cloud storage bucket.
-
get_crc32c
(bucket, object)[source]¶ Gets the CRC32c checksum of an object in Google Cloud Storage.
Parameters: - bucket (string) – The Google cloud storage bucket where the object is.
- object (string) – The name of the object to check in the Google cloud storage bucket.
-
get_md5hash
(bucket, object)[source]¶ Gets the MD5 hash of an object in Google Cloud Storage.
Parameters: - bucket (string) – The Google cloud storage bucket where the object is.
- object (string) – The name of the object to check in the Google cloud storage bucket.
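Google Cloud Storage reports an object's MD5 as the base64-encoded binary digest rather than as a hex string. Assuming get_md5hash returns that value unchanged, a local file can be compared against it with the sketch below; local_md5_base64 is a hypothetical helper name.

```python
import base64
import hashlib


def local_md5_base64(path, chunk_size=1024 * 1024):
    """Return the base64-encoded MD5 digest of a local file (hypothetical helper)."""
    digest = hashlib.md5()
    with open(path, 'rb') as handle:
        # Read in chunks so large files do not need to fit in memory.
        for chunk in iter(lambda: handle.read(chunk_size), b''):
            digest.update(chunk)
    return base64.b64encode(digest.digest()).decode('ascii')


# Usage sketch (requires a configured hook, shown as a comment only):
# if local_md5_base64('/tmp/january.avro') != hook.get_md5hash('data', 'january.avro'):
#     raise ValueError('Local file and remote object differ')
```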
-
get_size
(bucket, object)[source]¶ Gets the size of a file in Google Cloud Storage.
Parameters: - bucket (string) – The Google cloud storage bucket where the object is.
- object (string) – The name of the object to check in the Google cloud storage bucket.
-
insert_bucket_acl
(bucket, entity, role, user_project)[source]¶ Creates a new ACL entry on the specified bucket. See: https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert
Parameters: - bucket (str) – Name of a bucket.
- entity (str) – The entity holding the permission, in one of the following forms: user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers. See: https://cloud.google.com/storage/docs/access-control/lists#scopes
- role (str) – The access permission for the entity. Acceptable values are: “OWNER”, “READER”, “WRITER”.
- user_project (str) – (Optional) The project to be billed for this request. Required for Requester Pays buckets.
-
insert_object_acl
(bucket, object_name, entity, role, generation, user_project)[source]¶ Creates a new ACL entry on the specified object. See: https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert
Parameters: - bucket (str) – Name of a bucket.
- object_name (str) – Name of the object. For information about how to URL encode object names to be path safe, see: https://cloud.google.com/storage/docs/json_api/#encoding
- entity (str) – The entity holding the permission, in one of the following forms: user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers See: https://cloud.google.com/storage/docs/access-control/lists#scopes
- role (str) – The access permission for the entity. Acceptable values are: “OWNER”, “READER”.
- generation (str) – (Optional) If present, selects a specific revision of this object (as opposed to the latest version, the default).
- user_project (str) – (Optional) The project to be billed for this request. Required for Requester Pays buckets.
-
is_updated_after
(bucket, object, ts)[source]¶ Checks if an object is updated in Google Cloud Storage.
Parameters: - bucket (string) – The Google cloud storage bucket where the object is.
- object (string) – The name of the object to check in the Google cloud storage bucket.
- ts (datetime) – The timestamp to check against.
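The comparison behind is_updated_after can be sketched with the standard library. The sketch assumes the object's update time arrives as an RFC 3339 string with fractional seconds and a Z suffix, and that a naive ts should be treated as UTC; both are assumptions, and is_newer is a hypothetical name.

```python
from datetime import datetime, timezone


def is_newer(updated_rfc3339, ts):
    """Return True if the object's update time is later than ts (hypothetical helper)."""
    updated = datetime.strptime(
        updated_rfc3339, '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
    if ts.tzinfo is None:
        # Assumption: naive timestamps are interpreted as UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    return updated > ts


print(is_newer('2019-01-15T10:30:00.123Z', datetime(2019, 1, 15, 10, 0)))
```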
-
list
(bucket, versions=None, maxResults=None, prefix=None, delimiter=None)[source]¶ List all objects from the bucket with the given string prefix in name
Parameters: - bucket (string) – bucket name
- versions (boolean) – if true, list all versions of the objects
- maxResults (integer) – max count of items to return in a single page of responses
- prefix (string) – prefix string which filters objects whose name begin with this prefix
- delimiter (string) – filters objects based on the delimiter (e.g. ‘.csv’)
Returns: a stream of object names matching the filtering criteria
-
rewrite
(source_bucket, source_object, destination_bucket, destination_object=None)[source]¶ Has the same functionality as copy, except that it will work on files over 5 TB, as well as when copying between locations and/or storage classes.
destination_object can be omitted, in which case source_object is used.
Parameters: - source_bucket (string) – The bucket of the object to copy from.
- source_object (string) – The object to copy.
- destination_bucket (string) – The bucket to copy the object to.
- destination_object – The (renamed) path of the object if given. Can be omitted; then the same name is used.
-
upload
(bucket, object, filename, mime_type='application/octet-stream', gzip=False, multipart=False, num_retries=0)[source]¶ Uploads a local file to Google Cloud Storage.
Parameters: - bucket (string) – The bucket to upload to.
- object (string) – The object name to set when uploading the local file.
- filename (string) – The local file path to the file to be uploaded.
- mime_type (str) – The MIME type to set when uploading the file.
- gzip (bool) – Option to compress file for upload
- multipart (bool or int) – If True, the upload will be split into multiple HTTP requests. The default size is 256MiB per request. Pass a number instead of True to specify the request size, which must be a multiple of 262144 (256KiB).
- num_retries (int) – The number of times to attempt to re-upload the file (or individual chunks, in the case of multipart uploads). Retries are attempted with exponential backoff.
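The multipart rule above (True selects the 256MiB default, an integer must be a multiple of 262144 bytes) can be checked before calling upload. The sketch below is an illustration only; resolve_chunk_size and the constant names are hypothetical and do not exist in the hook.

```python
CHUNK_MULTIPLE = 256 * 1024               # 262144 bytes (256KiB)
DEFAULT_CHUNK_SIZE = 256 * 1024 * 1024    # 256MiB per request


def resolve_chunk_size(multipart):
    """Translate the multipart argument into a request size in bytes.

    Returns None when multipart upload is disabled (hypothetical helper).
    """
    # Booleans are checked first because bool is a subclass of int.
    if multipart is False:
        return None
    if multipart is True:
        return DEFAULT_CHUNK_SIZE
    if isinstance(multipart, int) and multipart > 0 and multipart % CHUNK_MULTIPLE == 0:
        return multipart
    raise ValueError(
        'multipart must be a bool or a positive multiple of {}'.format(CHUNK_MULTIPLE))


print(resolve_chunk_size(True))
print(resolve_chunk_size(4 * CHUNK_MULTIPLE))
```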
GCPTransferServiceHook¶
-
class
airflow.contrib.hooks.gcp_transfer_hook.
GCPTransferServiceHook
(api_version='v1', gcp_conn_id='google_cloud_default', delegate_to=None)[source]¶ Bases:
airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook
Hook for GCP Storage Transfer Service.
Google Kubernetes Engine¶
Google Kubernetes Engine Cluster Operators¶
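- GKEClusterCreateOperator : Creates a Kubernetes Cluster in Google Cloud Platform
- GKEClusterDeleteOperator : Deletes a Kubernetes Cluster in Google Cloud Platform
- GKEPodOperator : Executes a task in a Kubernetes pod in the specified Google Kubernetes Engine cluster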
GKEClusterCreateOperator¶
GKEClusterDeleteOperator¶
GKEPodOperator¶
Google Kubernetes Engine Hook¶
Qubole¶
Apache Airflow has a native operator and hooks to talk to Qubole, which lets you submit your big data jobs directly to Qubole from Apache Airflow.
QuboleOperator¶
-
class
airflow.contrib.operators.qubole_operator.
QuboleOperator
(**kwargs)[source]¶ Bases:
airflow.models.BaseOperator
Execute tasks (commands) on QDS (https://qubole.com).
Parameters: qubole_conn_id (str) – Connection id which consists of qds auth_token
kwargs:
- command_type: type of command to be executed, e.g. hivecmd, shellcmd, hadoopcmd
- tags: array of tags to be assigned with the command
- cluster_label: cluster label on which the command will be executed
- name: name to be given to command
- notify: whether to send email on command completion or not (default is False)
Arguments specific to command types:
- hivecmd:
  - query: inline query statement
  - script_location: s3 location containing query statement
  - sample_size: size of sample in bytes on which to run query
  - macros: macro values which were used in query
- prestocmd:
  - query: inline query statement
  - script_location: s3 location containing query statement
  - macros: macro values which were used in query
- hadoopcmd:
  - sub_command: must be one of these [“jar”, “s3distcp”, “streaming”] followed by 1 or more args
- shellcmd:
  - script: inline command with args
  - script_location: s3 location containing query statement
  - files: list of files in s3 bucket as file1,file2 format. These files will be copied into the working directory where the qubole command is being executed.
  - archives: list of archives in s3 bucket as archive1,archive2 format. These will be unarchived into the working directory where the qubole command is being executed.
  - parameters: any extra args which need to be passed to script (only when script_location is supplied)
- pigcmd:
  - script: inline query statement (latin_statements)
  - script_location: s3 location containing pig query
  - parameters: any extra args which need to be passed to script (only when script_location is supplied)
- sparkcmd:
  - program: the complete Spark Program in Scala, SQL, Command, R, or Python
  - cmdline: spark-submit command line; all required information must be specified in cmdline itself.
  - sql: inline sql query
  - script_location: s3 location containing query statement
  - language: language of the program: Scala, SQL, Command, R, or Python
  - app_id: ID of a Spark job server app
  - arguments: spark-submit command line arguments
  - user_program_arguments: arguments that the user program takes in
  - macros: macro values which were used in query
- dbtapquerycmd:
  - db_tap_id: data store ID of the target database, in Qubole.
  - query: inline query statement
  - macros: macro values which were used in query
- dbexportcmd:
  - mode: 1 (simple), 2 (advanced)
  - hive_table: name of the hive table
  - partition_spec: partition specification for Hive table.
  - dbtap_id: data store ID of the target database, in Qubole.
  - db_table: name of the db table
  - db_update_mode: allowinsert or updateonly
  - db_update_keys: columns used to determine the uniqueness of rows
  - export_dir: HDFS/S3 location from which data will be exported.
  - fields_terminated_by: hex of the char used as column separator in the dataset
- dbimportcmd:
  - mode: 1 (simple), 2 (advanced)
  - hive_table: name of the hive table
  - dbtap_id: data store ID of the target database, in Qubole.
  - db_table: name of the db table
  - where_clause: where clause, if any
  - parallelism: number of parallel db connections to use for extracting data
  - extract_query: SQL query to extract data from db. $CONDITIONS must be part of the where clause.
  - boundary_query: query to be used to get the range of row IDs to be extracted
  - split_column: column used as row ID to split data into ranges (mode 2)
Note
The following fields are template-supported: query, script_location, sub_command, script, files, archives, program, cmdline, sql, where_clause, extract_query, boundary_query, macros, tags, name, parameters, dbtap_id, hive_table, db_table, split_column, note_id, db_update_keys, export_dir, partition_spec, qubole_conn_id, arguments, user_program_arguments. You can also use .txt files for template-driven use cases.
Note
In QuboleOperator there is a default handler for task failures and retries, which generally kills the command running at QDS for the corresponding task instance. You can override this behavior by providing your own failure and retry handler in task definition.
QubolePartitionSensor¶
-
class
airflow.contrib.sensors.qubole_sensor.
QubolePartitionSensor
(**kwargs)[source]¶ Bases:
airflow.contrib.sensors.qubole_sensor.QuboleSensor
Wait for a Hive partition to show up in QHS (Qubole Hive Service) and check for its presence via QDS APIs
Parameters: - qubole_conn_id (str) – Connection id which consists of qds auth_token
- data (a JSON object) – a JSON object containing payload, whose presence needs to be checked. Check this example for sample payload structure.
Note
Both data and qubole_conn_id fields support templating. You can also use .txt files for template-driven use cases.
QuboleFileSensor¶
-
class
airflow.contrib.sensors.qubole_sensor.
QuboleFileSensor
(**kwargs)[source]¶ Bases:
airflow.contrib.sensors.qubole_sensor.QuboleSensor
Wait for a file or folder to be present in cloud storage and check for its presence via QDS APIs
Parameters: - qubole_conn_id (str) – Connection id which consists of qds auth_token
- data (a JSON object) – a JSON object containing payload, whose presence needs to be checked. Check this example for sample payload structure.
Note
Both data and qubole_conn_id fields support templating. You can also use .txt files for template-driven use cases.
QuboleCheckOperator¶
-
class
airflow.contrib.operators.qubole_check_operator.
QuboleCheckOperator
(**kwargs)[source]¶ Bases:
airflow.operators.check_operator.CheckOperator, airflow.contrib.operators.qubole_operator.QuboleOperator
Performs checks against Qubole Commands.
QuboleCheckOperator expects a command that will be executed on QDS. By default, each value on the first row of the result of this Qubole command is evaluated using Python bool casting. If any of the values evaluates to False, the check fails and errors out.
Note that Python bool casting evaluates the following as False:
- False
- 0
- Empty string ("")
- Empty list ([])
- Empty dictionary or set ({})
Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft a much more complex query that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are within 3 standard deviations of the 7-day average.
This operator can be used as a data quality check in your pipeline. Depending on where you put it in your DAG, you can either place it on the critical path, stopping the DAG and preventing dubious data from being published, or place it on the side and receive email alerts without stopping the progress of the DAG.
Parameters: qubole_conn_id (str) – Connection id which consists of qds auth_token
kwargs:
- Arguments specific to Qubole commands are described in the QuboleOperator docs.
- results_parser_callable: An optional parameter that lets users customize how the results of the Qubole command are parsed. It is a Python callable that holds the logic to parse the list of rows returned by the Qubole command. By default, only the values on the first row are used for performing checks. This callable should return a list of records on which the checks have to be performed.
Note
All fields in common with template fields of QuboleOperator and CheckOperator are template-supported.
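The default pass/fail rule of QuboleCheckOperator (bool casting of every value on the first row) can be sketched in a few lines. This is an illustration, not the operator's code: check_passes is a hypothetical name, the rows are assumed to be already parsed into Python values, and treating an empty result as a failure is an assumption.

```python
def check_passes(rows):
    """Return True if every value on the first row is truthy (hypothetical helper)."""
    if not rows:
        # Assumption: a command that returns nothing fails the check.
        return False
    first_row = rows[0]
    return all(bool(value) for value in first_row)


# SELECT COUNT(*) FROM foo returning 0 fails; any non-zero count passes.
print(check_passes([[0]]))
print(check_passes([[42]]))
```

Only the first row matters in this sketch, so later rows containing falsy values do not affect the outcome.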
QuboleValueCheckOperator¶
-
class
airflow.contrib.operators.qubole_check_operator.
QuboleValueCheckOperator
(**kwargs)[source]¶ Bases:
airflow.operators.check_operator.ValueCheckOperator, airflow.contrib.operators.qubole_operator.QuboleOperator
Performs a simple value check using Qubole command. By default, each value on the first row of this Qubole command is compared with a pre-defined value. The check fails and errors out if the output of the command is not within the permissible limit of expected value.
Parameters: - qubole_conn_id (str) – Connection id which consists of qds auth_token
- pass_value (str/int/float) – Expected value of the query results.
- tolerance (int/float) – Defines the permissible pass_value range, for example if tolerance is 2, the Qubole command output can be anything between -2*pass_value and 2*pass_value, without the operator erring out.
kwargs:
- Arguments specific to Qubole commands are described in the QuboleOperator docs.
- results_parser_callable: An optional parameter that lets users customize how the results of the Qubole command are parsed. It is a Python callable that holds the logic to parse the list of rows returned by the Qubole command. By default, only the values on the first row are used for performing checks. This callable should return a list of records on which the checks have to be performed.
Note
All fields in common with template fields of QuboleOperator and ValueCheckOperator are template-supported.
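A results_parser_callable might look like the sketch below, which checks the values of every row instead of only the first. It assumes the callable receives the rows as a list of tab-separated strings; that input format, and the names parse_all_rows and to_number, are assumptions made for illustration.

```python
def to_number(text):
    """Convert a field to float when possible, otherwise keep the string."""
    try:
        return float(text)
    except ValueError:
        return text


def parse_all_rows(raw_rows):
    """Flatten every row into one list of records (hypothetical parser).

    Assumes each row is a single tab-separated string.
    """
    records = []
    for line in raw_rows:
        for field in line.split('\t'):
            records.append(to_number(field))
    return records


print(parse_all_rows(['10\t0.5', 'abc\t3']))
```

Such a function could be passed as results_parser_callable=parse_all_rows when defining the task.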