Source code for airflow.providers.apache.kylin.operators.kylin_cube

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from datetime import datetime
from typing import TYPE_CHECKING, Sequence

from kylinpy import kylinpy

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.kylin.hooks.kylin import KylinHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


[docs]class KylinCubeOperator(BaseOperator): """ Submit request about Kylin build/refresh/merge and track job status. For more detail information in `Apache Kylin <http://kylin.apache.org/>`_ :param kylin_conn_id: The connection id as configured in Airflow administration. :param project: kylin project name, this param will overwrite the project in kylin_conn_id: :param cube: kylin cube name :param dsn: (dsn , dsn url of kylin connection ,which will overwrite kylin_conn_id. for example: kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1) :param command: (kylin command include 'build', 'merge', 'refresh', 'delete', 'build_streaming', 'merge_streaming', 'refresh_streaming', 'disable', 'enable', 'purge', 'clone', 'drop'. build - use /kylin/api/cubes/{cubeName}/build rest api,and buildType is 'BUILD', and you should give start_time and end_time refresh - use build rest api,and buildType is 'REFRESH' merge - use build rest api,and buildType is 'MERGE' build_streaming - use /kylin/api/cubes/{cubeName}/build2 rest api,and buildType is 'BUILD' and you should give offset_start and offset_end refresh_streaming - use build2 rest api,and buildType is 'REFRESH' merge_streaming - use build2 rest api,and buildType is 'MERGE' delete - delete segment, and you should give segment_name value disable - disable cube enable - enable cube purge - purge cube clone - clone cube,new cube name is {cube_name}_clone drop - drop cube) :param start_time: build segment start time :param end_time: build segment end time :param offset_start: streaming build segment start time :param offset_end: streaming build segment end time :param segment_name: segment name :param is_track_job: (whether to track job status. if value is True,will track job until job status is in("FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED") or timeout) :param interval: track job status,default value is 60s :param timeout: timeout value,default value is 1 day,60 * 60 * 24 s :param eager_error_status: (jobs error status,if job status in this list ,this task will be error. default value is tuple(["ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"])) """
[docs] template_fields: Sequence[str] = ( "project", "cube", "dsn", "command", "start_time", "end_time", "segment_name", "offset_start", "offset_end", )
[docs] ui_color = "#E79C46"
[docs] build_command = { "fullbuild", "build", "merge", "refresh", "build_streaming", "merge_streaming", "refresh_streaming", }
[docs] jobs_end_status = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}
def __init__( self, *, kylin_conn_id: str = "kylin_default", project: str | None = None, cube: str | None = None, dsn: str | None = None, command: str | None = None, start_time: str | None = None, end_time: str | None = None, offset_start: str | None = None, offset_end: str | None = None, segment_name: str | None = None, is_track_job: bool = False, interval: int = 60, timeout: int = 60 * 60 * 24, eager_error_status=("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"), **kwargs, ): super().__init__(**kwargs) self.kylin_conn_id = kylin_conn_id self.project = project self.cube = cube self.dsn = dsn self.command = command self.start_time = start_time self.end_time = end_time self.segment_name = segment_name self.offset_start = offset_start self.offset_end = offset_end self.is_track_job = is_track_job self.interval = interval self.timeout = timeout self.eager_error_status = eager_error_status self.jobs_error_status = [stat.upper() for stat in eager_error_status]
[docs] def execute(self, context: Context): _hook = KylinHook(kylin_conn_id=self.kylin_conn_id, project=self.project, dsn=self.dsn) _support_invoke_command = kylinpy.CubeSource.support_invoke_command if not self.command: raise AirflowException(f"Kylin:Command {self.command} can not be empty") if self.command.lower() not in _support_invoke_command: raise AirflowException( f"Kylin:Command {self.command} can not match kylin command list {_support_invoke_command}" ) kylinpy_params = { "start": datetime.fromtimestamp(int(self.start_time) / 1000) if self.start_time else None, "end": datetime.fromtimestamp(int(self.end_time) / 1000) if self.end_time else None, "name": self.segment_name, "offset_start": int(self.offset_start) if self.offset_start else None, "offset_end": int(self.offset_end) if self.offset_end else None, } rsp_data = _hook.cube_run(self.cube, self.command.lower(), **kylinpy_params) if self.is_track_job and self.command.lower() in self.build_command: started_at = time.monotonic() job_id = rsp_data.get("uuid") if job_id is None: raise AirflowException("kylin job id is None") self.log.info("kylin job id: %s", job_id) job_status = None while job_status not in self.jobs_end_status: if time.monotonic() - started_at > self.timeout: raise AirflowException(f"kylin job {job_id} timeout") time.sleep(self.interval) job_status = _hook.get_job_status(job_id) self.log.info("Kylin job status is %s ", job_status) if job_status in self.jobs_error_status: raise AirflowException(f"Kylin job {job_id} status {job_status} is error ") if self.do_xcom_push: return rsp_data

Was this entry helpful?