Source code for airflow.providers.apache.kylin.operators.kylin_cube

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import time
from datetime import datetime
from typing import TYPE_CHECKING, Optional, Sequence

from kylinpy import kylinpy

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.kylin.hooks.kylin import KylinHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class KylinCubeOperator(BaseOperator):
    """
    Submit a build/refresh/merge request to Apache Kylin and optionally track the
    resulting job status, making it easier to run Kylin jobs from Airflow.

    For more information, see `Apache Kylin <http://kylin.apache.org/>`_.

    :param kylin_conn_id: The connection id as configured in Airflow administration.
    :param project: Kylin project name; overrides the project in ``kylin_conn_id``.
    :param cube: Kylin cube name.
    :param dsn: DSN URL of the Kylin connection, which overrides ``kylin_conn_id``,
        for example: ``kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1``.
    :param command: Kylin command, one of 'build', 'merge', 'refresh', 'delete',
        'build_streaming', 'merge_streaming', 'refresh_streaming', 'disable',
        'enable', 'purge', 'clone', 'drop':

        build - use the /kylin/api/cubes/{cubeName}/build REST API with
        buildType 'BUILD'; you must provide start_time and end_time
        refresh - use the build REST API with buildType 'REFRESH'
        merge - use the build REST API with buildType 'MERGE'
        build_streaming - use the /kylin/api/cubes/{cubeName}/build2 REST API with
        buildType 'BUILD'; you must provide offset_start and offset_end
        refresh_streaming - use the build2 REST API with buildType 'REFRESH'
        merge_streaming - use the build2 REST API with buildType 'MERGE'
        delete - delete a segment; you must provide segment_name
        disable - disable the cube
        enable - enable the cube
        purge - purge the cube
        clone - clone the cube; the new cube is named {cube_name}_clone
        drop - drop the cube
    :param start_time: build segment start time, in epoch milliseconds.
    :param end_time: build segment end time, in epoch milliseconds.
    :param offset_start: streaming build segment start time.
    :param offset_end: streaming build segment end time.
    :param segment_name: segment name.
    :param is_track_job: whether to track the job status. If True, the job is
        tracked until its status is in ("FINISHED", "ERROR", "DISCARDED",
        "KILLED", "SUICIDAL", "STOPPED") or the timeout is reached.
    :param interval: interval between job status checks; default is 60 seconds.
    :param timeout: tracking timeout; default is 1 day (60 * 60 * 24 seconds).
    :param eager_error_status: job error statuses; if the job status is in this
        list, the task fails. Default is ("ERROR", "DISCARDED", "KILLED",
        "SUICIDAL", "STOPPED").
    """
    template_fields: Sequence[str] = (
        'project',
        'cube',
        'dsn',
        'command',
        'start_time',
        'end_time',
        'segment_name',
        'offset_start',
        'offset_end',
    )
    ui_color = '#E79C46'
    build_command = {
        'fullbuild',
        'build',
        'merge',
        'refresh',
        'build_streaming',
        'merge_streaming',
        'refresh_streaming',
    }
    jobs_end_status = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}
    def __init__(
        self,
        *,
        kylin_conn_id: str = 'kylin_default',
        project: Optional[str] = None,
        cube: Optional[str] = None,
        dsn: Optional[str] = None,
        command: Optional[str] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        offset_start: Optional[str] = None,
        offset_end: Optional[str] = None,
        segment_name: Optional[str] = None,
        is_track_job: bool = False,
        interval: int = 60,
        timeout: int = 60 * 60 * 24,
        eager_error_status=("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.kylin_conn_id = kylin_conn_id
        self.project = project
        self.cube = cube
        self.dsn = dsn
        self.command = command
        self.start_time = start_time
        self.end_time = end_time
        self.segment_name = segment_name
        self.offset_start = offset_start
        self.offset_end = offset_end
        self.is_track_job = is_track_job
        self.interval = interval
        self.timeout = timeout
        self.eager_error_status = eager_error_status
        self.jobs_error_status = [stat.upper() for stat in eager_error_status]
    def execute(self, context: 'Context'):
        _hook = KylinHook(kylin_conn_id=self.kylin_conn_id, project=self.project, dsn=self.dsn)

        # Validate the command against the commands kylinpy knows how to invoke.
        _support_invoke_command = kylinpy.CubeSource.support_invoke_command
        if not self.command:
            raise AirflowException(f'Kylin:Command {self.command} can not be empty')
        if self.command.lower() not in _support_invoke_command:
            raise AirflowException(
                f'Kylin:Command {self.command} can not match kylin command list {_support_invoke_command}'
            )

        # start_time/end_time arrive as epoch milliseconds; kylinpy expects datetimes.
        kylinpy_params = {
            'start': datetime.fromtimestamp(int(self.start_time) / 1000) if self.start_time else None,
            'end': datetime.fromtimestamp(int(self.end_time) / 1000) if self.end_time else None,
            'name': self.segment_name,
            'offset_start': int(self.offset_start) if self.offset_start else None,
            'offset_end': int(self.offset_end) if self.offset_end else None,
        }
        rsp_data = _hook.cube_run(self.cube, self.command.lower(), **kylinpy_params)
        if self.is_track_job and self.command.lower() in self.build_command:
            started_at = time.monotonic()
            job_id = rsp_data.get("uuid")
            if job_id is None:
                raise AirflowException("kylin job id is None")
            self.log.info("kylin job id: %s", job_id)

            # Poll the job status every `interval` seconds until it reaches an
            # end state or the timeout elapses; fail fast on an error status.
            job_status = None
            while job_status not in self.jobs_end_status:
                if time.monotonic() - started_at > self.timeout:
                    raise AirflowException(f'kylin job {job_id} timeout')
                time.sleep(self.interval)

                job_status = _hook.get_job_status(job_id)
                self.log.info('Kylin job status is %s ', job_status)
                if job_status in self.jobs_error_status:
                    raise AirflowException(f'Kylin job {job_id} status {job_status} is error ')

        if self.do_xcom_push:
            return rsp_data
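
A minimal usage sketch (not part of the provider source): the DAG below submits a 'build' for one cube segment and tracks the Kylin job until it reaches an end state. The connection id, project, cube name, and epoch-millisecond time range are illustrative assumptions, not values shipped with the provider.

    from airflow import DAG
    import pendulum

    from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator

    with DAG(
        dag_id="example_kylin_cube_build",
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        schedule_interval=None,
    ) as dag:
        # 'build' calls the /kylin/api/cubes/{cubeName}/build REST API and
        # requires a start_time/end_time range, given in epoch milliseconds.
        build_segment = KylinCubeOperator(
            task_id="kylin_build",
            kylin_conn_id="kylin_default",  # assumed connection configured in Airflow
            project="learn_kylin",          # assumed sample project
            cube="kylin_sales_cube",        # assumed sample cube
            command="build",
            start_time="1325376000000",     # epoch milliseconds
            end_time="1325462400000",
            is_track_job=True,              # poll until the job finishes or fails
        )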
