Microsoft Graph API Operators

Prerequisite Tasks

To use these operators, you must do a few things:

MSGraphAsyncOperator

Use the MSGraphAsyncOperator to call the Microsoft Graph API.

Below is an example of using this operator to get a SharePoint site.

tests/system/providers/microsoft/azure/example_msgraph.py

    from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

    site_task = MSGraphAsyncOperator(
        task_id="news_site",
        conn_id="msgraph_api",
        url="sites/850v1v.sharepoint.com:/sites/news",
        result_processor=lambda context, response: response["id"].split(",")[1],  # only keep site_id
    )
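The `result_processor` in this example relies on the site `id` being a comma-separated composite (hostname, site collection ID, web ID). The following is a minimal sketch of what the lambda does to such a value; the response body and its GUIDs are made up for illustration, and `None` stands in for the Airflow context, which this processor does not use.

```python
# Hypothetical response body; the GUIDs are invented for illustration only.
sample_response = {
    "id": (
        "850v1v.sharepoint.com,"
        "11111111-2222-3333-4444-555555555555,"
        "66666666-7777-8888-9999-000000000000"
    ),
    "displayName": "News",
}

# Same callable as in the operator example above.
result_processor = lambda context, response: response["id"].split(",")[1]

# The context argument is unused by this processor, so None stands in for it.
site_id = result_processor(None, sample_response)
print(site_id)
```

Only the middle segment of the composite identifier is kept, so that is the value a downstream task would receive.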

Below is an example of using this operator to get the pages of a SharePoint site, with the site ID pulled from the XCom of the `news_site` task.

tests/system/providers/microsoft/azure/example_msgraph.py

    site_pages_task = MSGraphAsyncOperator(
        task_id="news_pages",
        conn_id="msgraph_api",
        api_version="beta",
        url=("sites/%s/pages" % "{{ ti.xcom_pull(task_ids='news_site') }}"),  # noqa: UP031
    )
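The `%` formatting in this example only embeds the Jinja expression in the URL string when the DAG file is parsed; the expression itself is assumed to be rendered later by Airflow's templating, when the task runs. The sketch below shows just the parse-time string and two equivalent spellings. It does not render the template.

```python
# The Jinja expression is an ordinary string at DAG-parse time.
jinja_expr = "{{ ti.xcom_pull(task_ids='news_site') }}"

# Spelling used in the example above.
url = "sites/%s/pages" % jinja_expr

# Equivalent plain literal.
literal_url = "sites/{{ ti.xcom_pull(task_ids='news_site') }}/pages"

# Equivalent f-string: literal braces must be doubled, hence the four braces.
fstring_url = f"sites/{{{{ ti.xcom_pull(task_ids='news_site') }}}}/pages"

print(url)
```

All three forms produce the same string, so the choice between them is a matter of readability.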

Below is an example of using this operator to get the IDs of modified Power BI workspaces.

tests/system/providers/microsoft/azure/example_powerbi.py

    workspaces_task = MSGraphAsyncOperator(
        task_id="workspaces",
        conn_id="powerbi",
        url="myorg/admin/workspaces/modified",
        result_processor=lambda context, response: list(map(lambda workspace: workspace["id"], response)),
    )
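The `result_processor` above reduces the response to a list of workspace IDs. As a sketch, assuming the response is a list of objects that each carry an `id` key (the sample values are hypothetical), the `map`/`lambda` form and a list comprehension give the same result:

```python
# Hypothetical response: a list of workspace objects, each with an "id".
sample_response = [
    {"id": "aaaaaaaa-0000-0000-0000-000000000001"},
    {"id": "aaaaaaaa-0000-0000-0000-000000000002"},
]

# Form used in the example above.
with_map = lambda context, response: list(map(lambda workspace: workspace["id"], response))

# Equivalent list comprehension.
with_comprehension = lambda context, response: [workspace["id"] for workspace in response]

# None stands in for the Airflow context, which neither processor uses.
ids_from_map = with_map(None, sample_response)
ids_from_comprehension = with_comprehension(None, sample_response)
print(ids_from_map)
```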

Below is an example of using this operator to request detailed information about Power BI workspaces.

tests/system/providers/microsoft/azure/example_powerbi.py

    workspaces_info_task = MSGraphAsyncOperator(
        task_id="get_workspace_info",
        conn_id="powerbi",
        url="myorg/admin/workspaces/getInfo",
        method="POST",
        query_parameters={
            "lineage": True,
            "datasourceDetails": True,
            "datasetSchema": True,
            "datasetExpressions": True,
            "getArtifactUsers": True,
        },
        data={"workspaces": workspaces_task.output},
        result_processor=lambda context, response: {"scanId": response["id"]},
    )
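Here the `result_processor` wraps the returned ID in a dictionary keyed by `scanId`. A possible reason, sketched below under assumptions, is that a dictionary of this shape can fill a `{scanId}` placeholder in a follow-up URL. The response body is hypothetical, and the follow-up URL and the use of `str.format` are illustrative stand-ins rather than something this page shows.

```python
# Hypothetical response body for the getInfo request.
sample_response = {"id": "00000000-0000-0000-0000-000000000000", "status": "NotStarted"}

# Same callable as in the operator example above.
result_processor = lambda context, response: {"scanId": response["id"]}

# None stands in for the Airflow context, which this processor does not use.
scan = result_processor(None, sample_response)

# Illustrative follow-up URL with a {scanId} placeholder (an assumption).
status_url = "myorg/admin/workspaces/scanStatus/{scanId}".format(**scan)
print(status_url)
```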

Below is an example of using this operator to refresh a Power BI dataset, followed by an MSGraphSensor that waits for the refresh to complete.

tests/system/providers/microsoft/azure/example_powerbi.py

    refresh_dataset_task = MSGraphAsyncOperator(
        task_id="refresh_dataset",
        conn_id="powerbi_api",
        url="myorg/groups/{workspaceId}/datasets/{datasetId}/refreshes",
        method="POST",
        path_parameters={
            "workspaceId": "9a7e14c6-9a7d-4b4c-b0f2-799a85e60a51",
            "datasetId": "ffb6096e-d409-4826-aaeb-b5d4b165dc4d",
        },
        data={"type": "full"},  # Needed for enhanced refresh
        result_processor=lambda context, response: response["requestid"],
    )

    refresh_dataset_history_task = MSGraphSensor(
        task_id="refresh_dataset_history",
        conn_id="powerbi_api",
        url="myorg/groups/{workspaceId}/datasets/{datasetId}/refreshes/{refreshId}",
        path_parameters={
            "workspaceId": "9a7e14c6-9a7d-4b4c-b0f2-799a85e60a51",
            "datasetId": "ffb6096e-d409-4826-aaeb-b5d4b165dc4d",
            "refreshId": refresh_dataset_task.output,
        },
        timeout=350.0,
        event_processor=lambda context, event: event["status"] == "Completed",
    )
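The `event_processor` passed to the sensor is a predicate over each polled response. The sketch below applies the same lambda to a hypothetical sequence of poll results, assuming the sensor keeps polling until the predicate returns `True`; the intermediate status values are made up.

```python
# Same predicate as in the sensor example above.
event_processor = lambda context, event: event["status"] == "Completed"

# Hypothetical sequence of polled responses; intermediate statuses are invented.
events = [
    {"status": "Unknown"},
    {"status": "Unknown"},
    {"status": "Completed"},
]

# None stands in for the Airflow context, which this predicate does not use.
results = [event_processor(None, event) for event in events]

# Index of the first poll that would satisfy the sensor.
first_done = results.index(True)
print(results, first_done)
```

Any status other than `"Completed"`, including a failure status, evaluates to `False` with this predicate, so with this processor alone the sensor would presumably keep waiting until its `timeout` expires.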

Below is an example of using this operator to create an item schedule in Fabric.

tests/system/providers/microsoft/azure/example_msfabric.py

    # https://learn.microsoft.com/en-us/rest/api/fabric/core/job-scheduler/create-item-schedule?tabs=HTTP
    workspaces_task = MSGraphAsyncOperator(
        task_id="schedule_datapipeline",
        conn_id="powerbi",
        method="POST",
        url="workspaces/{workspaceId}/items/{itemId}/jobs/instances",
        path_parameters={
            "workspaceId": "e90b2873-4812-4dfb-9246-593638165644",
            "itemId": "65448530-e5ec-4aeb-a97e-7cebf5d67c18",
        },
        query_parameters={"jobType": "Pipeline"},
        dag=dag,
        outlets=[
            Dataset(
                "workspaces/e90b2873-4812-4dfb-9246-593638165644/items/65448530-e5ec-4aeb-a97e-7cebf5d67c18/jobs/instances?jobType=Pipeline"
            )
        ],
    )
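The URI given to `Dataset` in this example matches the request URL once `path_parameters` and `query_parameters` are applied. The sketch below reproduces that expansion with `str.format` and `urllib.parse.urlencode`; both are stand-ins chosen for illustration, and the operator's own expansion mechanism is not shown on this page.

```python
from urllib.parse import urlencode

# Values copied from the operator example above.
url_template = "workspaces/{workspaceId}/items/{itemId}/jobs/instances"
path_parameters = {
    "workspaceId": "e90b2873-4812-4dfb-9246-593638165644",
    "itemId": "65448530-e5ec-4aeb-a97e-7cebf5d67c18",
}
query_parameters = {"jobType": "Pipeline"}

# Fill the placeholders, then append the encoded query string.
expanded = url_template.format(**path_parameters) + "?" + urlencode(query_parameters)
print(expanded)
```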
