Object Storage
This tutorial shows how to use the Object Storage API to manage objects that reside on object storage, such as S3, GCS, and Azure Blob Storage. The API was introduced in Airflow 2.8.
The tutorial covers a simple pattern that is often used in data engineering and data science workflows: accessing a web API, saving the result, and analyzing it. For the tutorial to work you will need DuckDB, an in-process analytical database, which you can install by running pip install duckdb. The tutorial uses S3 object storage, which requires the Amazon provider with s3fs support; install it by running pip install apache-airflow-providers-amazon[s3fs].
If you would like to use a different storage provider, change the URL passed to ObjectStoragePath (assigned to base in this tutorial) to the appropriate URL for your provider, for example by replacing s3:// with gs:// for Google Cloud Storage. You will also need to install the matching provider. Finally, you will need pandas, which can be installed by running pip install pandas.
Creating an ObjectStoragePath
The ObjectStoragePath is a path-like object that represents a path on object storage. It is the fundamental building block of the Object Storage API.
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
The username part of the URL given to ObjectStoragePath should be a connection ID. The specified connection will be used to obtain the right credentials to access the backend. If it is omitted, the default connection for the backend will be used.
The connection ID can alternatively be passed in with a keyword argument:
ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
This is useful when reusing a URL defined for another purpose (e.g. Dataset), which generally does not contain a username part. The explicit keyword argument takes precedence over the URL’s username value if both are specified.
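To make the precedence rule concrete, the following sketch mimics it with the standard library only. It is not Airflow's implementation: resolve_conn_id is a hypothetical helper introduced here for illustration, and it only shows which connection ID would win for a given URL and keyword argument.

```python
from typing import Optional
from urllib.parse import urlsplit


def resolve_conn_id(url: str, conn_id: Optional[str] = None) -> Optional[str]:
    """Hypothetical helper: pick the connection ID the way the text describes."""
    # An explicit keyword argument takes precedence over the URL.
    if conn_id is not None:
        return conn_id
    # Otherwise fall back to the username part of the URL. None stands for
    # "use the default connection of the backend".
    return urlsplit(url).username


from_url = resolve_conn_id("s3://aws_default@airflow-tutorial-data/")
from_keyword = resolve_conn_id("s3://airflow-tutorial-data/", conn_id="aws_default")
keyword_wins = resolve_conn_id("s3://other_conn@airflow-tutorial-data/", conn_id="aws_default")
no_conn_id = resolve_conn_id("s3://airflow-tutorial-data/")
```

Here other_conn is a made-up connection ID; it is used only to show that the keyword argument overrides the username in the URL.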
It is safe to instantiate an ObjectStoragePath at the root of your DAG. Connections will not be created until the path is used. This means that you can create the path in the global scope of your DAG and use it in multiple tasks.
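The deferred-connection behaviour can be pictured with a small stand-in class. This is only a sketch of the general pattern, assuming a lazily created backend; LazyPath and its counter are hypothetical names and do not reflect how ObjectStoragePath is implemented internally.

```python
from functools import cached_property


class LazyPath:
    """Hypothetical stand-in for a path that connects only on first use."""

    connections_opened = 0  # class-wide counter, for demonstration only

    def __init__(self, url: str) -> None:
        # Only the URL is stored; nothing is contacted here.
        self.url = url

    @cached_property
    def fs(self) -> dict:
        # Runs once, the first time the path is actually used.
        LazyPath.connections_opened += 1
        return {"connected_to": self.url}

    def exists(self) -> bool:
        # Any real operation goes through self.fs and triggers the connection.
        return bool(self.fs)


# Module-level instantiation, as at the root of a DAG file: no connection yet.
base = LazyPath("s3://aws_default@airflow-tutorial-data/")
opened_at_import = LazyPath.connections_opened

# Using the path twice opens the (fake) connection exactly once.
base.exists()
base.exists()
opened_after_use = LazyPath.connections_opened
```

Because nothing happens at construction time, parsing the DAG file stays cheap, and every task that uses the shared path triggers the connection only when it runs.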
Saving data to Object Storage
An ObjectStoragePath behaves mostly like a pathlib.Path object. You can use it to save and load data directly to and from object storage. So, a typical flow could look like this:
@task
def get_air_quality_data(**kwargs) -> ObjectStoragePath:
    """
    #### Get Air Quality Data
    This task gets air quality data from the Finnish Meteorological Institute's
    open data API. The data is saved as parquet.
    """
    import pandas as pd

    execution_date = kwargs["logical_date"]
    start_time = kwargs["data_interval_start"]

    params = {
        "format": "json",
        "precision": "double",
        "groupareas": "0",
        "producer": "airquality_urban",
        "area": "Uusimaa",
        "param": ",".join(aq_fields.keys()),
        "starttime": start_time.isoformat(timespec="seconds"),
        "endtime": execution_date.isoformat(timespec="seconds"),
        "tz": "UTC",
    }

    response = requests.get(API, params=params)
    response.raise_for_status()

    # ensure the bucket exists
    base.mkdir(exist_ok=True)

    formatted_date = execution_date.format("YYYYMMDD")
    path = base / f"air_quality_{formatted_date}.parquet"

    df = pd.DataFrame(response.json()).astype(aq_fields)
    with path.open("wb") as file:
        df.to_parquet(file)

    return path
The get_air_quality_data task calls the API of the Finnish Meteorological Institute to obtain air quality data for the Helsinki region (the Uusimaa area in the request). It creates a pandas DataFrame from the resulting JSON, converts it to parquet on the fly, and saves it to object storage.
The key of the object is automatically generated from the logical date of the task, so we could run this every day and it would create a new object for each day. We concatenate this key with the base path to create the full path to the object. Finally, after writing the object to storage, we return the path to the object. This allows us to use the path in the next task.
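The key generation and the path-like write can be tried locally without any object storage. The sketch below uses pathlib.Path in a temporary directory as a stand-in for ObjectStoragePath, and standard strftime formatting instead of the pendulum format call used in the task; daily_object_path is a hypothetical helper and the dates are arbitrary.

```python
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def daily_object_path(base: Path, logical_date: datetime) -> Path:
    """Hypothetical helper: one object key per logical date, joined to the base."""
    return base / f"air_quality_{logical_date:%Y%m%d}.parquet"


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)  # local stand-in for the bucket root
    base.mkdir(exist_ok=True)  # same call the task uses to ensure the bucket exists

    first = daily_object_path(base, datetime(2024, 1, 1, tzinfo=timezone.utc))
    second = daily_object_path(base, datetime(2024, 1, 2, tzinfo=timezone.utc))

    # Writing goes through open("wb"), exactly as in the task.
    with first.open("wb") as file:
        file.write(b"placeholder bytes instead of real parquet data")

    names = (first.name, second.name)
    first_exists = first.exists()
    second_exists = second.exists()
```

Each logical date maps to its own key, so a daily run adds a new object instead of overwriting the previous one.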
Analyzing the data
To understand the data, you typically want to analyze it. DuckDB is a great tool for this. It is an in-process analytical database that allows you to run SQL queries on data in memory.
Because the data is already in parquet format, we can read it with DuckDB's read_parquet function. And because both DuckDB and the ObjectStoragePath use fsspec, we can register the backend of the ObjectStoragePath with DuckDB: ObjectStoragePath exposes the fs property for this, and DuckDB provides the register_filesystem function to register that backend.
In DuckDB we can then create a table from the data and run a query on it. The query result is returned as a DataFrame, which could be used for further analysis or saved to object storage.
@task
def analyze(path: ObjectStoragePath, **kwargs):
    """
    #### Analyze
    This task analyzes the air quality data, prints the results
    """
    import duckdb

    conn = duckdb.connect(database=":memory:")
    conn.register_filesystem(path.fs)
    conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

    df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

    print(df2.head())
Note that the analyze function does not know the original path to the object; the path is passed in as a parameter and obtained through XCom. You do not need to re-instantiate the path object, and the connection details are handled transparently.
Putting it all together
The final DAG wraps both tasks so that they can be run together. It looks like this:
import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

    obj_path = get_air_quality_data()
    analyze(obj_path)


tutorial_objectstorage()