#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from datetime import datetime
from typing import TYPE_CHECKING, Optional, Sequence
from kylinpy import kylinpy
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.kylin.hooks.kylin import KylinHook
if TYPE_CHECKING:
from airflow.utils.context import Context
[docs]class KylinCubeOperator(BaseOperator):
"""
This operator is used to submit request about kylin build/refresh/merge,
and can track job status . so users can easier to build kylin job
For more detail information in
`Apache Kylin <http://kylin.apache.org/>`_
:param kylin_conn_id: The connection id as configured in Airflow administration.
:param project: kylin project name, this param will overwrite the project in kylin_conn_id:
:param cube: kylin cube name
:param dsn: (dsn , dsn url of kylin connection ,which will overwrite kylin_conn_id.
for example: kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1)
:param command: (kylin command include 'build', 'merge', 'refresh', 'delete',
'build_streaming', 'merge_streaming', 'refresh_streaming', 'disable', 'enable',
'purge', 'clone', 'drop'.
build - use /kylin/api/cubes/{cubeName}/build rest api,and buildType is ‘BUILD’,
and you should give start_time and end_time
refresh - use build rest api,and buildType is ‘REFRESH’
merge - use build rest api,and buildType is ‘MERGE’
build_streaming - use /kylin/api/cubes/{cubeName}/build2 rest api,and buildType is ‘BUILD’
and you should give offset_start and offset_end
refresh_streaming - use build2 rest api,and buildType is ‘REFRESH’
merge_streaming - use build2 rest api,and buildType is ‘MERGE’
delete - delete segment, and you should give segment_name value
disable - disable cube
enable - enable cube
purge - purge cube
clone - clone cube,new cube name is {cube_name}_clone
drop - drop cube)
:param start_time: build segment start time
:param end_time: build segment end time
:param offset_start: streaming build segment start time
:param offset_end: streaming build segment end time
:param segment_name: segment name
:param is_track_job: (whether to track job status. if value is True,will track job until
job status is in("FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL",
"STOPPED") or timeout)
:param interval: track job status,default value is 60s
:param timeout: timeout value,default value is 1 day,60 * 60 * 24 s
:param eager_error_status: (jobs error status,if job status in this list ,this task will be error.
default value is tuple(["ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"]))
"""
[docs] template_fields: Sequence[str] = (
'project',
'cube',
'dsn',
'command',
'start_time',
'end_time',
'segment_name',
'offset_start',
'offset_end',
)
[docs] build_command = {
'fullbuild',
'build',
'merge',
'refresh',
'build_streaming',
'merge_streaming',
'refresh_streaming',
}
[docs] jobs_end_status = {"FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"}
def __init__(
self,
*,
kylin_conn_id: str = 'kylin_default',
project: Optional[str] = None,
cube: Optional[str] = None,
dsn: Optional[str] = None,
command: Optional[str] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
offset_start: Optional[str] = None,
offset_end: Optional[str] = None,
segment_name: Optional[str] = None,
is_track_job: bool = False,
interval: int = 60,
timeout: int = 60 * 60 * 24,
eager_error_status=("ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"),
**kwargs,
):
super().__init__(**kwargs)
self.kylin_conn_id = kylin_conn_id
self.project = project
self.cube = cube
self.dsn = dsn
self.command = command
self.start_time = start_time
self.end_time = end_time
self.segment_name = segment_name
self.offset_start = offset_start
self.offset_end = offset_end
self.is_track_job = is_track_job
self.interval = interval
self.timeout = timeout
self.eager_error_status = eager_error_status
self.jobs_error_status = [stat.upper() for stat in eager_error_status]
[docs] def execute(self, context: 'Context'):
_hook = KylinHook(kylin_conn_id=self.kylin_conn_id, project=self.project, dsn=self.dsn)
_support_invoke_command = kylinpy.CubeSource.support_invoke_command
if not self.command:
raise AirflowException(f'Kylin:Command {self.command} can not be empty')
if self.command.lower() not in _support_invoke_command:
raise AirflowException(
f'Kylin:Command {self.command} can not match kylin command list {_support_invoke_command}'
)
kylinpy_params = {
'start': datetime.fromtimestamp(int(self.start_time) / 1000) if self.start_time else None,
'end': datetime.fromtimestamp(int(self.end_time) / 1000) if self.end_time else None,
'name': self.segment_name,
'offset_start': int(self.offset_start) if self.offset_start else None,
'offset_end': int(self.offset_end) if self.offset_end else None,
}
rsp_data = _hook.cube_run(self.cube, self.command.lower(), **kylinpy_params)
if self.is_track_job and self.command.lower() in self.build_command:
started_at = time.monotonic()
job_id = rsp_data.get("uuid")
if job_id is None:
raise AirflowException("kylin job id is None")
self.log.info("kylin job id: %s", job_id)
job_status = None
while job_status not in self.jobs_end_status:
if time.monotonic() - started_at > self.timeout:
raise AirflowException(f'kylin job {job_id} timeout')
time.sleep(self.interval)
job_status = _hook.get_job_status(job_id)
self.log.info('Kylin job status is %s ', job_status)
if job_status in self.jobs_error_status:
raise AirflowException(f'Kylin job {job_id} status {job_status} is error ')
if self.do_xcom_push:
return rsp_data