Google Dataplex Operators¶
Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.
For more information about the task visit the Dataplex product documentation.
Create a Task¶
Before you create a Dataplex task you need to define its body. For more information about the available fields to pass when creating a task, visit the Dataplex create task API.
A simple task configuration can look as follows:
EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
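The snippet above relies on a few placeholder variables and operator imports that are defined elsewhere in the example DAG. A minimal sketch of what they might look like (the concrete values below are assumptions for illustration only):

from airflow.providers.google.cloud.operators.dataplex import (
    DataplexCreateTaskOperator,
    DataplexDeleteTaskOperator,
)

# Hypothetical values; adjust to your own project, bucket and service account.
PROJECT_ID = "my-gcp-project"
REGION = "us-central1"
LAKE_ID = "my-lake"
DATAPLEX_TASK_ID = "my-dataplex-task"
TRIGGER_SPEC_TYPE = "ON_DEMAND"
SERVICE_ACC = f"my-sa@{PROJECT_ID}.iam.gserviceaccount.com"
SPARK_FILE_FULL_PATH = "gs://my-bucket/spark/example.py"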
With this configuration we can create the task both synchronously & asynchronously:
DataplexCreateTaskOperator
create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)
create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)
Delete a task¶
To delete a task you can use:
delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)
List tasks¶
To list tasks you can use:
list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)
Get a task¶
To get a task you can use:
get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)
Wait for a task¶
To wait for a task created asynchronously you can use:
dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
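In an example DAG the sensor is usually placed downstream of the task that starts the asynchronous creation, so it only begins polling after the creation request has been submitted. One possible wiring (note that the dataplex_task_id passed to the sensor must match the task being waited for):

create_dataplex_task_async >> dataplex_task_state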
Create a Lake¶
Before you create a Dataplex lake you need to define its body.
For more information about the available fields to pass when creating a lake, visit the Dataplex create lake API.
A simple lake configuration can look as follows:
EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    "labels": [],
    "description": "test_description",
    "metastore": {"service": ""},
}
With this configuration we can create the lake:
create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)
Delete a lake¶
To delete a lake you can use:
delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)
Create or update a Data Quality scan¶
Before you create a Dataplex Data Quality scan you need to define its body. For more information about the available fields to pass when creating a Data Quality scan, visit Dataplex create data quality API.
A simple Data Quality scan configuration can look as follows:
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
    {
        "rules": [
            {
                "range_expectation": {
                    "min_value": "0",
                    "max_value": "10000",
                },
                "column": "value",
                "dimension": "VALIDITY",
            }
        ],
    }
)
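The configuration above uses types from the google-cloud-dataplex client library, which the example assumes are already imported, for instance:

from google.cloud import dataplex_v1
from google.cloud.dataplex_v1 import DataQualitySpec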
With this configuration we can create or update the Data Quality scan:
DataplexCreateOrUpdateDataQualityScanOperator
create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)
Get a Data Quality scan¶
To get a Data Quality scan you can use:
DataplexGetDataQualityScanOperator
get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
Delete a Data Quality scan¶
To delete a Data Quality scan you can use:
DataplexDeleteDataQualityScanOperator
delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
Run a Data Quality scan¶
You can run a Dataplex Data Quality scan in asynchronous mode and later check its status using a sensor:
DataplexRunDataQualityScanOperator
run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)
To check that a Dataplex Data Quality scan run succeeded you can use:
DataplexDataQualityJobStatusSensor
get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
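Because the sensor pulls the job id from the XCom pushed by run_data_scan_async, the two tasks should be chained so that the sensor only starts polling after the scan run has been triggered:

run_data_scan_async >> get_data_scan_job_status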
Also for this action you can use the operator in deferrable mode:
run_data_scan_def = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)
Get a Data Quality scan job¶
To get a Data Quality scan job you can use:
DataplexGetDataQualityScanResultOperator
get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
Also for this action you can use the operator in deferrable mode:
get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)
Create a zone¶
Before you create a Dataplex zone you need to define its body.
For more information about the available fields to pass when creating a zone, visit Dataplex create zone API.
A simple zone configuration can look as follows:
EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}
With this configuration we can create a zone:
create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)
Delete a zone¶
To delete a zone you can use:
delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
)
Create an asset¶
Before you create a Dataplex asset you need to define its body.
For more information about the available fields to pass when creating an asset, visit the Dataplex create asset API.
A simple asset configuration can look as follows:
EXAMPLE_ASSET = {
    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET_NAME}", "type_": "BIGQUERY_DATASET"},
    "discovery_spec": {"enabled": True},
}
With this configuration we can create the asset:
create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)
Delete an asset¶
To delete an asset you can use:
delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)
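Assets live inside zones, and zones live inside lakes, so in an example DAG the setup and teardown tasks are usually ordered accordingly. One possible ordering, using the tasks defined above:

# Create the lake first, then the zone, then the asset; tear down in reverse order.
create_lake >> create_zone >> create_asset
delete_asset >> delete_zone >> delete_lake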
Create or update a Data Profile scan¶
Before you create a Dataplex Data Profile scan you need to define its body. For more information about the available fields to pass when creating a Data Profile scan, visit Dataplex create data profile API.
A simple Data Profile scan configuration can look as follows:
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})
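As with the Data Quality example, this configuration assumes the client library types are imported, for instance:

from google.cloud import dataplex_v1
from google.cloud.dataplex_v1 import DataProfileSpec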
With this configuration we can create or update the Data Profile scan:
DataplexCreateOrUpdateDataProfileScanOperator
create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)
Get a Data Profile scan¶
To get a Data Profile scan you can use:
DataplexGetDataProfileScanOperator
get_data_scan = DataplexGetDataProfileScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
Delete a Data Profile scan¶
To delete a Data Profile scan you can use:
DataplexDeleteDataProfileScanOperator
delete_data_scan = DataplexDeleteDataProfileScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
Run a Data Profile scan¶
You can run a Dataplex Data Profile scan in asynchronous mode and later check its status using a sensor:
DataplexRunDataProfileScanOperator
run_data_scan_async = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)
To check that a Dataplex Data Profile scan run succeeded you can use:
DataplexDataProfileJobStatusSensor
get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
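As in the Data Quality example, the sensor pulls the job id from the XCom pushed by run_data_scan_async, so the two tasks should be chained:

run_data_scan_async >> get_data_scan_job_status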
Also for this action you can use the operator in deferrable mode:
run_data_scan_def = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)
Get a Data Profile scan job¶
To get a Data Profile scan job you can use:
DataplexGetDataProfileScanResultOperator
get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
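Like its Data Quality counterpart, this operator can also be used in deferrable mode. A minimal sketch, assuming the same deferrable parameter is supported:

get_data_scan_job_result_def = DataplexGetDataProfileScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)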