Google Cloud Vision Operators¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
CloudVisionAddProductToProductSetOperator¶
Adds a Product to the specified ProductSet.
For parameter definition, take a look at
CloudVisionAddProductToProductSetOperator
Using the operator¶
We are using the Product, ProductSet and Retry objects from Google libraries:
from google.api_core.retry import Retry # isort:skip
from google.cloud.vision_v1.types import ProductSet # isort:skip
from google.cloud.vision_v1.types import Product # isort:skip
If product_set_id and product_id were generated by the API, they can be extracted from XCom:
add_product_to_product_set = CloudVisionAddProductToProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
product_id="{{ task_instance.xcom_pull('product_create') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="add_product_to_product_set",
)
Otherwise it can be specified explicitly:
add_product_to_product_set_2 = CloudVisionAddProductToProductSetOperator(
location=LOCATION,
product_set_id=GCP_VISION_PRODUCT_SET_ID,
product_id=GCP_VISION_PRODUCT_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="add_product_to_product_set_2",
)
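The retry=Retry(maximum=10.0) argument used in these examples caps the delay between retry attempts at 10 seconds. As a rough illustration of what such a cap does, the sketch below computes a plain exponential backoff schedule in pure Python. The initial and multiplier values are assumptions chosen for illustration, and the real google.api_core.retry.Retry also randomizes its delays, which this sketch leaves out.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(initial: float, multiplier: float, maximum: float) -> Iterator[float]:
    """Yield exponentially growing delays, never exceeding ``maximum``."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay *= multiplier


# Hypothetical settings: start at 1 second, double each time, cap at 10 seconds.
schedule = list(islice(backoff_delays(initial=1.0, multiplier=2.0, maximum=10.0), 6))
print(schedule)
```

Once the computed delay exceeds the cap, every later delay stays at the maximum value.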
Templating¶
template_fields: Sequence[str] = (
"location",
"product_set_id",
"product_id",
"project_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
See Google Cloud Vision Add Product To Product Set documentation.
CloudVisionImageAnnotateOperator¶
Run image detection and annotation for an image.
For parameter definition, take a look at
CloudVisionImageAnnotateOperator
Using the operator¶
We are using the Feature enum and the Retry object from Google libraries:
from google.api_core.retry import Retry # isort:skip
from google.cloud.vision_v1 import Feature # isort:skip
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
annotate_image = CloudVisionImageAnnotateOperator(
request=annotate_image_request,
retry=Retry(maximum=10.0),
timeout=5,
task_id="annotate_image",
)
The result can be extracted from XCOM:
annotate_image_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('annotate_image')"
"['logoAnnotations'][0]['description'] }}",
task_id="annotate_image_result",
)
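The Jinja expression above indexes into the dictionary that the annotate_image task pushed to XCom. The sketch below performs the same lookup in plain Python, with a guard for responses that contain no logo annotations. The sample payload and the helper name first_logo_description are hypothetical; only the key path logoAnnotations, first element, description comes from the example above.

```python
from typing import Any, Optional


def first_logo_description(response: dict[str, Any]) -> Optional[str]:
    """Return the description of the first logo annotation, or None if absent."""
    annotations = response.get("logoAnnotations") or []
    if not annotations:
        return None
    return annotations[0].get("description")


# Hypothetical payload, shaped like the keys used in the template above.
sample_response = {"logoAnnotations": [{"description": "Example Logo", "score": 0.98}]}

print(first_logo_description(sample_response))
print(first_logo_description({}))
```

Unlike the template expression, the helper returns None instead of raising when the annotation list is missing or empty.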
Templating¶
template_fields: Sequence[str] = (
"request",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
CloudVisionCreateProductOperator¶
Creates and returns a new product resource.
Possible errors regarding the Product object provided:
Returns INVALID_ARGUMENT if display_name is missing or longer than 4096 characters.
Returns INVALID_ARGUMENT if description is longer than 4096 characters.
Returns INVALID_ARGUMENT if product_category is missing or invalid.
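The conditions listed above can be checked locally before a task is scheduled. The following is a minimal sketch in plain Python, not part of the operator or the API client. The helper name validate_product is hypothetical, and the set of accepted categories is an assumption for illustration; the API documentation is the authority on which values are valid.

```python
from typing import Optional

MAX_TEXT_LENGTH = 4096
# Assumption for illustration only: consult the API documentation for accepted categories.
ASSUMED_CATEGORIES = {"homegoods", "apparel", "toys"}


def validate_product(
    display_name: Optional[str],
    description: Optional[str],
    product_category: Optional[str],
) -> list[str]:
    """Return the problems that would be expected to cause INVALID_ARGUMENT."""
    problems = []
    if not display_name or len(display_name) > MAX_TEXT_LENGTH:
        problems.append("display_name is missing or longer than 4096 characters")
    if description is not None and len(description) > MAX_TEXT_LENGTH:
        problems.append("description is longer than 4096 characters")
    if product_category not in ASSUMED_CATEGORIES:
        problems.append("product_category is missing or invalid")
    return problems


print(validate_product("My Product 1", None, "toys"))
print(validate_product("", "x" * 5000, None))
```

An empty list means none of the listed conditions were triggered; the API may still reject a request for other reasons.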
For parameter definition, take a look at
CloudVisionCreateProductOperator
Using the operator¶
We are using the Product and Retry objects from Google libraries:
from google.cloud.vision_v1.types import Product # isort:skip
from google.api_core.retry import Retry # isort:skip
product = Product(display_name="My Product 1", product_category="toys")
The product_id argument can be omitted (it will be generated by the API):
product_create = CloudVisionCreateProductOperator(
location=LOCATION,
product=product,
retry=Retry(maximum=10.0),
timeout=5,
task_id="product_create",
)
Or it can be specified explicitly:
product_create_2 = CloudVisionCreateProductOperator(
product_id=GCP_VISION_PRODUCT_ID,
location=LOCATION,
product=product,
retry=Retry(maximum=10.0),
timeout=5,
task_id="product_create_2",
)
Templating¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
CloudVisionDeleteProductOperator¶
Permanently deletes a product and its reference images.
Metadata of the product and all its images will be deleted right away, but search queries
against ProductSets containing the product may still work until all related
caches are refreshed.
Possible errors:
Returns NOT_FOUND if the product does not exist.
For parameter definition, take a look at
CloudVisionDeleteProductOperator
Using the operator¶
If product_id was generated by the API it can be extracted from XCOM:
product_delete = CloudVisionDeleteProductOperator(
location=LOCATION,
product_id="{{ task_instance.xcom_pull('product_create') }}",
task_id="product_delete",
)
Otherwise it can be specified explicitly:
product_delete_2 = CloudVisionDeleteProductOperator(
location=LOCATION, product_id=GCP_VISION_PRODUCT_ID, task_id="product_delete_2"
)
Templating¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
CloudVisionGetProductOperator¶
Gets information associated with a Product.
Possible errors:
Returns NOT_FOUND if the Product does not exist.
For parameter definition, take a look at
CloudVisionGetProductOperator
Using the operator¶
If product_id was generated by the API it can be extracted from XCOM:
product_get = CloudVisionGetProductOperator(
location=LOCATION,
product_id="{{ task_instance.xcom_pull('product_create') }}",
task_id="product_get",
)
Otherwise it can be specified explicitly:
product_get_2 = CloudVisionGetProductOperator(
location=LOCATION, product_id=GCP_VISION_PRODUCT_ID, task_id="product_get_2"
)
Templating¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
CloudVisionCreateProductSetOperator¶
Creates a new ProductSet resource.
For parameter definition, take a look at
CloudVisionCreateProductSetOperator
Using the operator¶
We are using the ProductSet and Retry objects from Google libraries:
from google.cloud.vision_v1.types import ProductSet # isort:skip
from google.api_core.retry import Retry # isort:skip
product_set = ProductSet(display_name="My Product Set")
The product_set_id argument can be omitted (it will be generated by the API):
product_set_create = CloudVisionCreateProductSetOperator(
location=LOCATION,
product_set=product_set,
retry=Retry(maximum=10.0),
timeout=5,
task_id="product_set_create",
)
Or it can be specified explicitly:
product_set_create_2 = CloudVisionCreateProductSetOperator(
product_set_id=GCP_VISION_PRODUCT_SET_ID,
location=LOCATION,
product_set=product_set,
retry=Retry(maximum=10.0),
timeout=5,
task_id="product_set_create_2",
)
Templating¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_set_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
CloudVisionDeleteProductSetOperator¶
Permanently deletes a ProductSet. Products and ReferenceImages in
the ProductSet are not deleted. The actual image files are not deleted from
Google Cloud Storage.
For parameter definition, take a look at
CloudVisionDeleteProductSetOperator
Using the operator¶
If product_set_id was generated by the API it can be extracted from XCOM:
product_set_delete = CloudVisionDeleteProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
task_id="product_set_delete",
)
Otherwise it can be specified explicitly:
product_set_delete_2 = CloudVisionDeleteProductSetOperator(
location=LOCATION, product_set_id=GCP_VISION_PRODUCT_SET_ID, task_id="product_set_delete_2"
)
Templating¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_set_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
CloudVisionGetProductSetOperator¶
Gets information associated with a ProductSet.
For parameter definition, take a look at
CloudVisionGetProductSetOperator
Using the operator¶
If product_set_id was generated by the API it can be extracted from XCOM:
product_set_get = CloudVisionGetProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
task_id="product_set_get",
)
Otherwise it can be specified explicitly:
product_set_get_2 = CloudVisionGetProductSetOperator(
location=LOCATION, product_set_id=GCP_VISION_PRODUCT_SET_ID, task_id="product_set_get_2"
)
Templating¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_set_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
CloudVisionUpdateProductSetOperator¶
Makes changes to a ProductSet resource. Only display_name can be updated
currently.
Note
To locate the ProductSet resource, its name in the form
projects/PROJECT_ID/locations/LOC_ID/productSets/PRODUCT_SET_ID is necessary.
You can provide the name directly as an attribute of the product_set object.
However, you can leave it blank and provide location and product_set_id instead (and
optionally project_id - if not present, the connection default will be used) and the
name will be created by the operator itself.
This mechanism exists for your convenience, to allow leaving the project_id empty and
having Airflow use the connection default project_id.
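As an illustration of the naming scheme described in this note, the sketch below assembles a ProductSet resource name from its parts and falls back to a default project when project_id is omitted. It is a standalone approximation, not the operator's actual code. The helper name build_product_set_name and the CONNECTION_DEFAULT_PROJECT_ID constant are hypothetical stand-ins for the project configured on the Airflow connection.

```python
from typing import Optional

# Hypothetical stand-in for the project configured on the Airflow connection.
CONNECTION_DEFAULT_PROJECT_ID = "example-default-project"


def build_product_set_name(
    location: str,
    product_set_id: str,
    project_id: Optional[str] = None,
) -> str:
    """Build projects/PROJECT_ID/locations/LOC_ID/productSets/PRODUCT_SET_ID."""
    project = project_id or CONNECTION_DEFAULT_PROJECT_ID
    return f"projects/{project}/locations/{location}/productSets/{product_set_id}"


print(build_product_set_name("europe-west1", "my-set"))
print(build_product_set_name("europe-west1", "my-set", project_id="explicit-project"))
```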
For parameter definition, take a look at
CloudVisionUpdateProductSetOperator
Using the operator¶
We are using the ProductSet object from the Google Cloud Vision library:
from google.cloud.vision_v1.types import ProductSet # isort:skip
product_set = ProductSet(display_name="My Product Set")
Initialization of the task:
If product_set_id was generated by the API it can be extracted from XCOM:
product_set_update = CloudVisionUpdateProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
product_set=ProductSet(display_name="My Product Set 2"),
task_id="product_set_update",
)
Otherwise it can be specified explicitly:
product_set_update_2 = CloudVisionUpdateProductSetOperator(
location=LOCATION,
product_set_id=GCP_VISION_PRODUCT_SET_ID,
product_set=ProductSet(display_name="My Product Set 2"),
task_id="product_set_update_2",
)
Templating¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_set_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
CloudVisionUpdateProductOperator¶
Makes changes to a Product resource. Only the display_name,
description, and labels fields can be updated right now.
If labels are updated, the change will not be reflected in queries until the next index
time.
Note
To locate the Product resource, its name in the form
projects/PROJECT_ID/locations/LOC_ID/products/PRODUCT_ID is necessary.
You can provide the name directly as an attribute of the product object. However, you
can leave it blank and provide location and product_id instead (and optionally
project_id - if not present, the connection default will be used) and the name will
be created by the operator itself.
This mechanism exists for your convenience, to allow leaving the project_id empty and
having Airflow use the connection default project_id.
Possible errors:
Returns NOT_FOUND if the Product does not exist.
Returns INVALID_ARGUMENT if display_name is present in update_mask but is missing from the request or longer than 4096 characters.
Returns INVALID_ARGUMENT if description is present in update_mask but is longer than 4096 characters.
Returns INVALID_ARGUMENT if product_category is present in update_mask.
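The update_mask rules above depend on which fields the mask names, not only on the values supplied. The sketch below expresses the three INVALID_ARGUMENT conditions in plain Python. The helper name validate_product_update is hypothetical, and the mask is modelled as a simple set of field names rather than the protobuf FieldMask type the API uses.

```python
from typing import Optional

MAX_TEXT_LENGTH = 4096


def validate_product_update(
    update_mask: set[str],
    display_name: Optional[str] = None,
    description: Optional[str] = None,
) -> list[str]:
    """Return the problems that would be expected to cause INVALID_ARGUMENT."""
    problems = []
    if "display_name" in update_mask and (
        not display_name or len(display_name) > MAX_TEXT_LENGTH
    ):
        problems.append("display_name is missing or longer than 4096 characters")
    if (
        "description" in update_mask
        and description is not None
        and len(description) > MAX_TEXT_LENGTH
    ):
        problems.append("description is longer than 4096 characters")
    if "product_category" in update_mask:
        problems.append("product_category cannot be updated")
    return problems


print(validate_product_update({"display_name"}, display_name="My Product 2"))
print(validate_product_update({"display_name", "product_category"}))
```

A field that is absent from the mask is not checked here, which mirrors the wording of the conditions above.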
For parameter definition, take a look at
CloudVisionUpdateProductOperator
Using the operator¶
We are using the Product object from the Google Cloud Vision library:
from google.cloud.vision_v1.types import Product # isort:skip
product = Product(display_name="My Product 1", product_category="toys")
If product_id was generated by the API it can be extracted from XCOM:
product_update = CloudVisionUpdateProductOperator(
location=LOCATION,
product_id="{{ task_instance.xcom_pull('product_create') }}",
product=Product(display_name="My Product 2", description="My updated description"),
task_id="product_update",
)
Otherwise it can be specified explicitly:
product_update_2 = CloudVisionUpdateProductOperator(
location=LOCATION,
product_id=GCP_VISION_PRODUCT_ID,
product=Product(display_name="My Product 2", description="My updated description"),
task_id="product_update_2",
)
Templating¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
CloudVisionCreateReferenceImageOperator¶
Creates a new ReferenceImage resource.
For parameter definition, take a look at
CloudVisionCreateReferenceImageOperator
Using the operator¶
We are using the ReferenceImage and Retry objects from Google libraries:
from google.cloud.vision_v1.types import ReferenceImage # isort:skip
from google.api_core.retry import Retry # isort:skip
reference_image = ReferenceImage(uri=VISION_IMAGE_URL)
If product_id was generated by the API it can be extracted from XCom:
reference_image_create = CloudVisionCreateReferenceImageOperator(
location=LOCATION,
reference_image=reference_image,
product_id="{{ task_instance.xcom_pull('product_create') }}",
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="reference_image_create",
)
Or it can be specified explicitly:
reference_image_create_2 = CloudVisionCreateReferenceImageOperator(
location=LOCATION,
reference_image=reference_image,
product_id=GCP_VISION_PRODUCT_ID,
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="reference_image_create_2",
)
Templating¶
template_fields: Sequence[str] = (
"location",
"reference_image",
"product_id",
"reference_image_id",
"project_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
See Google Cloud Vision ReferenceImage create documentation.
CloudVisionDeleteReferenceImageOperator¶
Deletes a ReferenceImage resource.
For parameter definition, take a look at
CloudVisionDeleteReferenceImageOperator
Using the operator¶
We are using the ReferenceImage and Retry objects from Google libraries:
from google.cloud.vision_v1.types import ReferenceImage # isort:skip
from google.api_core.retry import Retry # isort:skip
reference_image = ReferenceImage(uri=VISION_IMAGE_URL)
If product_id was generated by the API it can be extracted from XCom:
reference_image_delete = CloudVisionDeleteReferenceImageOperator(
location=LOCATION,
product_id="{{ task_instance.xcom_pull('product_create') }}",
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="reference_image_delete",
)
Or it can be specified explicitly:
reference_image_delete_2 = CloudVisionDeleteReferenceImageOperator(
location=LOCATION,
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
product_id=GCP_VISION_PRODUCT_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="reference_image_delete_2",
)
Templating¶
template_fields: Sequence[str] = (
"location",
"reference_image",
"product_id",
"reference_image_id",
"project_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
See Google Cloud Vision ReferenceImage delete documentation.
CloudVisionRemoveProductFromProductSetOperator¶
Removes a Product from the specified ProductSet.
For parameter definition, take a look at
CloudVisionRemoveProductFromProductSetOperator
Using the operator¶
We are using the Product, ProductSet and Retry objects from Google libraries:
from google.api_core.retry import Retry # isort:skip
from google.cloud.vision_v1.types import ProductSet # isort:skip
from google.cloud.vision_v1.types import Product # isort:skip
If product_set_id and product_id were generated by the API, they can be extracted from XCom:
remove_product_from_product_set = CloudVisionRemoveProductFromProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
product_id="{{ task_instance.xcom_pull('product_create') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="remove_product_from_product_set",
)
Otherwise it can be specified explicitly:
remove_product_from_product_set_2 = CloudVisionRemoveProductFromProductSetOperator(
location=LOCATION,
product_set_id=GCP_VISION_PRODUCT_SET_ID,
product_id=GCP_VISION_PRODUCT_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="remove_product_from_product_set_2",
)
Templating¶
template_fields: Sequence[str] = (
"location",
"product_set_id",
"product_id",
"project_id",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
See Google Cloud Vision Remove Product From Product Set documentation.
CloudVisionDetectTextOperator¶
Run text detection for an image.
For parameter definition, take a look at
CloudVisionDetectTextOperator
Using the operator¶
We are using the Retry object from Google libraries:
from google.api_core.retry import Retry # isort:skip
detect_text = CloudVisionDetectTextOperator(
image=DETECT_IMAGE,
retry=Retry(maximum=10.0),
timeout=5,
task_id="detect_text",
language_hints="en",
web_detection_params={"include_geo_results": True},
)
The result can be extracted from XCOM:
detect_text_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('detect_text')['textAnnotations'][0] }}",
task_id="detect_text_result",
)
Templating¶
template_fields: Sequence[str] = (
"image",
"max_results",
"timeout",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
See Google Cloud Vision Text Detection documentation.
CloudVisionTextDetectOperator¶
Run document text detection for an image.
For parameter definition, take a look at
CloudVisionTextDetectOperator
Using the operator¶
We are using the Retry object from Google libraries:
from google.api_core.retry import Retry # isort:skip
document_detect_text = CloudVisionTextDetectOperator(
image=DETECT_IMAGE, retry=Retry(maximum=10.0), timeout=5, task_id="document_detect_text"
)
The result can be extracted from XCOM:
document_detect_text_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('document_detect_text')['textAnnotations'][0] }}",
task_id="document_detect_text_result",
)
Templating¶
template_fields: Sequence[str] = (
"image",
"max_results",
"timeout",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
See Google Cloud Vision Document Text Detection documentation.
CloudVisionDetectImageLabelsOperator¶
Run image label detection for an image.
For parameter definition, take a look at
CloudVisionDetectImageLabelsOperator
Using the operator¶
We are using the Retry object from Google libraries:
from google.api_core.retry import Retry # isort:skip
detect_labels = CloudVisionDetectImageLabelsOperator(
image=DETECT_IMAGE, retry=Retry(maximum=10.0), timeout=5, task_id="detect_labels"
)
The result can be extracted from XCOM:
detect_labels_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('detect_labels')['labelAnnotations'][0] }}",
task_id="detect_labels_result",
)
Templating¶
template_fields: Sequence[str] = (
"image",
"max_results",
"timeout",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
See Google Cloud Vision Label Detection documentation.
CloudVisionDetectImageSafeSearchOperator¶
Run safe search detection for an image.
For parameter definition, take a look at
CloudVisionDetectImageSafeSearchOperator
Using the operator¶
We are using the Retry object from Google libraries:
from google.api_core.retry import Retry # isort:skip
detect_safe_search = CloudVisionDetectImageSafeSearchOperator(
image=DETECT_IMAGE, retry=Retry(maximum=10.0), timeout=5, task_id="detect_safe_search"
)
The result can be extracted from XCOM:
detect_safe_search_result = BashOperator(
bash_command=f"echo {detect_safe_search.output}",
task_id="detect_safe_search_result",
)
Templating¶
template_fields: Sequence[str] = (
"image",
"max_results",
"timeout",
"gcp_conn_id",
"impersonation_chain",
)
More information¶
See Google Cloud Vision Safe Search Detection documentation.