airflow.providers.apache.pinot.hooks.pinot

Module Contents

class airflow.providers.apache.pinot.hooks.pinot.PinotAdminHook(conn_id: str = 'pinot_admin_default', cmd_path: str = 'pinot-admin.sh', pinot_admin_system_exit: bool = False)[source]

Bases: airflow.hooks.base.BaseHook

This hook is a wrapper around the pinot-admin.sh script. For now, only a small subset of its subcommands is implemented: those required to ingest offline data into Apache Pinot (i.e., AddSchema, AddTable, CreateSegment, and UploadSegment). Their command options are based on Pinot v0.1.0.

Unfortunately, as of v0.1.0, pinot-admin.sh always exits with status code 0. To work around this, users can set the pinot_admin_system_exit flag. If it is set to false, this hook evaluates the result based on the output message instead of the status code. This behavior of Pinot is expected to be fixed in the next release, which will include the following PR: https://github.com/apache/incubator-pinot/pull/4110

Parameters
  • conn_id (str) -- The name of the connection to use.

  • cmd_path (str) -- The file path to the pinot-admin.sh executable.

  • pinot_admin_system_exit (bool) -- If true, the result is evaluated based on the status code. Otherwise, the result is evaluated as a failure if "Error" or "Exception" is in the output message.
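
The evaluation rule described for pinot_admin_system_exit can be sketched as a small standalone function. This only illustrates the documented behavior and is not the hook's actual code; the name command_failed and its arguments are hypothetical.

```python
def command_failed(returncode: int, output: str, system_exit: bool = False) -> bool:
    """Decide whether a pinot-admin.sh run failed (illustrative helper, not part of the hook)."""
    if system_exit:
        # pinot_admin_system_exit=True: trust the process status code.
        return returncode != 0
    # pinot_admin_system_exit=False: the status code is always 0 on Pinot v0.1.0,
    # so look for failure markers in the output message instead.
    return "Error" in output or "Exception" in output
```

With the default flag value, a run that exits with status 0 but prints an "Exception" message is still treated as a failure.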

get_conn(self)[source]
add_schema(self, schema_file: str, with_exec: bool = True)[source]

Add a Pinot schema by running the AddSchema command.

Parameters
  • schema_file (str) -- Pinot schema file

  • with_exec (bool) -- Whether to append the -exec flag to the command.

add_table(self, file_path: str, with_exec: bool = True)[source]

Add a Pinot table by running the AddTable command.

Parameters
  • file_path (str) -- Pinot table configuration file

  • with_exec (bool) -- Whether to append the -exec flag to the command.

create_segment(self, generator_config_file: Optional[str] = None, data_dir: Optional[str] = None, segment_format: Optional[str] = None, out_dir: Optional[str] = None, overwrite: Optional[str] = None, table_name: Optional[str] = None, segment_name: Optional[str] = None, time_column_name: Optional[str] = None, schema_file: Optional[str] = None, reader_config_file: Optional[str] = None, enable_star_tree_index: Optional[str] = None, star_tree_index_spec_file: Optional[str] = None, hll_size: Optional[str] = None, hll_columns: Optional[str] = None, hll_suffix: Optional[str] = None, num_threads: Optional[str] = None, post_creation_verification: Optional[str] = None, retry: Optional[str] = None)[source]

Create a Pinot segment by running the CreateSegment command.

upload_segment(self, segment_dir: str, table_name: Optional[str] = None)[source]

Upload a segment by running the UploadSegment command.

Parameters
  • segment_dir (str) -- Directory of the segment to upload

  • table_name (str) -- Name of the table the segment belongs to

Returns

run_cli(self, cmd: List[str], verbose: bool = True)[source]

Run command with pinot-admin.sh

Parameters
  • cmd (list) -- List of command arguments to be run by the pinot-admin.sh script

  • verbose (bool) -- Whether to log the command and its output
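
The offline ingestion flow that PinotAdminHook is built for (AddSchema, AddTable, CreateSegment, UploadSegment) can be sketched as a single helper. This is a minimal sketch: the function name ingest_offline_data and all argument values are hypothetical, and hook is assumed to be a PinotAdminHook or any object exposing the same methods. Only keyword arguments that appear in the signatures above are used.

```python
from typing import Any


def ingest_offline_data(
    hook: Any,
    schema_file: str,
    table_config_file: str,
    data_dir: str,
    segment_dir: str,
    table_name: str,
) -> None:
    """Run the four implemented subcommands in ingestion order (illustrative only)."""
    # 1. Register the schema, then the table configuration.
    hook.add_schema(schema_file)
    hook.add_table(table_config_file)
    # 2. Build a segment from the raw data into segment_dir.
    hook.create_segment(
        data_dir=data_dir,
        out_dir=segment_dir,
        table_name=table_name,
        schema_file=schema_file,
    )
    # 3. Upload the generated segment.
    hook.upload_segment(segment_dir, table_name=table_name)
```

With Airflow and the Pinot provider installed, the hook would typically be created as PinotAdminHook(conn_id='pinot_admin_default') and passed in as hook; the file paths and table name are placeholders to be replaced with real values.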

class airflow.providers.apache.pinot.hooks.pinot.PinotDbApiHook[source]

Bases: airflow.hooks.dbapi.DbApiHook

Interact with Pinot Broker Query API

This hook uses the standard-SQL endpoint, since the PQL endpoint is soon to be deprecated. https://docs.pinot.apache.org/users/api/querying-pinot-using-standard-sql

conn_name_attr = pinot_broker_conn_id[source]
default_conn_name = pinot_broker_default[source]
supports_autocommit = False[source]
get_conn(self)[source]

Establish a connection to the Pinot broker through the Pinot DB-API.

get_uri(self)[source]

Get the connection URI for the Pinot broker.

e.g., http://localhost:9000/query/sql
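
The shape of the example URI can be illustrated with a small standalone helper. This is a sketch under assumptions, not the hook's implementation: the function build_broker_uri and its defaults (scheme http, endpoint query/sql) are hypothetical and chosen only to reproduce the example above.

```python
from typing import Optional


def build_broker_uri(
    host: str,
    port: Optional[int] = None,
    scheme: str = "http",
    endpoint: str = "query/sql",
) -> str:
    """Assemble a broker query URI from its parts (illustrative helper)."""
    # Append the port only when one is given.
    authority = host if port is None else f"{host}:{port}"
    return f"{scheme}://{authority}/{endpoint}"


print(build_broker_uri("localhost", 9000))  # http://localhost:9000/query/sql
```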

get_records(self, sql: str, parameters: Optional[Union[Dict[str, Any], Iterable[Any]]] = None)[source]

Executes the sql and returns a set of records.

Parameters
  • sql (str) -- The SQL statement to be executed.

  • parameters (dict or iterable) -- The parameters to render the SQL query with.

get_first(self, sql: str, parameters: Optional[Union[Dict[str, Any], Iterable[Any]]] = None)[source]

Executes the sql and returns the first resulting row.

Parameters
  • sql (str) -- The SQL statement to be executed.

  • parameters (dict or iterable) -- The parameters to render the SQL query with.
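
A minimal sketch of how get_first and get_records might be used together. The helper summarize_table is hypothetical, hook is assumed to be a PinotDbApiHook (or any object with the same two methods), and get_first is assumed to return a single row as a sequence, or None when there is no result.

```python
from typing import Any, Dict


def summarize_table(hook: Any, table: str, limit: int = 10) -> Dict[str, Any]:
    """Return the row count and a small sample of rows for a table (illustrative only).

    `table` is interpolated directly into the SQL text, so it must be a
    trusted identifier, never user input.
    """
    # get_first returns only the first resulting row.
    first = hook.get_first(f"SELECT COUNT(*) FROM {table}")
    # get_records returns all resulting rows.
    rows = hook.get_records(f"SELECT * FROM {table} LIMIT {int(limit)}")
    return {"count": first[0] if first else 0, "rows": list(rows)}
```

With Airflow and the Pinot provider installed, hook would typically be a PinotDbApiHook instance bound to the pinot_broker_default connection.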

set_autocommit(self, conn: Connection, autocommit: Any)[source]
insert_rows(self, table: str, rows: str, target_fields: Optional[str] = None, commit_every: int = 1000, replace: bool = False, **kwargs)[source]
