airflow.providers.apache.pinot.hooks.pinot

Module Contents

Classes

PinotAdminHook

This hook is a wrapper around the pinot-admin.sh script.

PinotDbApiHook

Interact with Pinot Broker Query API

class airflow.providers.apache.pinot.hooks.pinot.PinotAdminHook(conn_id='pinot_admin_default', cmd_path='pinot-admin.sh', pinot_admin_system_exit=False)[source]

Bases: airflow.hooks.base.BaseHook

This hook is a wrapper around the pinot-admin.sh script. For now, only a small subset of its subcommands is implemented: those required to ingest offline data into Apache Pinot (i.e., AddSchema, AddTable, CreateSegment, and UploadSegment). Their command options are based on Pinot v0.1.0.

Unfortunately, as of v0.1.0, pinot-admin.sh always exits with status code 0. To work around this, users can set the pinot_admin_system_exit flag. If it is set to false, this hook evaluates the result based on the output message instead of the status code. This behavior of Pinot is expected to be fixed in the next release, which will include the following PR: https://github.com/apache/incubator-pinot/pull/4110

Parameters
  • conn_id (str) -- The name of the connection to use.

  • cmd_path (str) -- The file path to the pinot-admin.sh executable.

  • pinot_admin_system_exit (bool) -- If true, the result is evaluated based on the status code. Otherwise, the result is evaluated as a failure if "Error" or "Exception" is in the output message.
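The evaluation rule described for pinot_admin_system_exit can be sketched as below. This is a minimal illustration of the documented rule, not the hook's actual code; the function name command_failed and its arguments are hypothetical.

```python
def command_failed(return_code: int, output: str,
                   pinot_admin_system_exit: bool = False) -> bool:
    """Return True if a pinot-admin.sh run should be treated as a failure.

    Hypothetical helper that mirrors the documented rule only.
    """
    if pinot_admin_system_exit:
        # Trust the process exit status.
        return return_code != 0
    # The exit status is unreliable (always 0 as of v0.1.0),
    # so look for failure markers in the output message instead.
    return "Error" in output or "Exception" in output
```

With the default flag value, an exit status of 0 is still reported as a failure when the output contains "Error" or "Exception".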

get_conn(self)[source]

Returns connection for the hook.

add_schema(self, schema_file, with_exec=True)[source]

Add a Pinot schema by running the AddSchema command.

Parameters
  • schema_file (str) -- Pinot schema file

  • with_exec (bool) -- Whether to pass the -exec option to the command.
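As a rough sketch, an argument list for the AddSchema subcommand might be assembled like this before being handed to run_cli. The helper name and the host and port values are hypothetical. The option names (-controllerHost, -controllerPort, -schemaFile, -exec) are assumptions about the Pinot admin tool and should be checked against the Pinot version in use; the sketch also assumes that with_exec controls whether -exec is appended.

```python
from typing import List


def build_add_schema_cmd(host: str, port: str, schema_file: str,
                         with_exec: bool = True) -> List[str]:
    """Build a hypothetical AddSchema argument list (assumed option names)."""
    cmd = ["AddSchema"]
    cmd += ["-controllerHost", host]
    cmd += ["-controllerPort", port]
    cmd += ["-schemaFile", schema_file]
    if with_exec:
        # Assumption: -exec asks the tool to actually apply the change.
        cmd += ["-exec"]
    return cmd


example_cmd = build_add_schema_cmd("localhost", "9000", "schema.json")
```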

add_table(self, file_path, with_exec=True)[source]

Add a Pinot table by running the AddTable command.

Parameters
  • file_path (str) -- Pinot table configuration file

  • with_exec (bool) -- Whether to pass the -exec option to the command.

create_segment(self, generator_config_file=None, data_dir=None, segment_format=None, out_dir=None, overwrite=None, table_name=None, segment_name=None, time_column_name=None, schema_file=None, reader_config_file=None, enable_star_tree_index=None, star_tree_index_spec_file=None, hll_size=None, hll_columns=None, hll_suffix=None, num_threads=None, post_creation_verification=None, retry=None)[source]

Create a Pinot segment by running the CreateSegment command.

upload_segment(self, segment_dir, table_name=None)[source]

Upload a segment by running the UploadSegment command.

Parameters
  • segment_dir (str) --

  • table_name (Optional[str]) --

Returns

Return type

Any

run_cli(self, cmd, verbose=True)[source]

Run command with pinot-admin.sh

Parameters
  • cmd (List[str]) -- List of command arguments to be run by the pinot-admin.sh script

  • verbose (bool) --
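The general shape of running a command list through an executable and capturing its output can be sketched with the standard library. This is not the hook's implementation: the function name run_and_capture is hypothetical, and the Python interpreter is used as a stand-in for pinot-admin.sh so the sketch can run without Pinot installed.

```python
import subprocess
import sys
from typing import List, Tuple


def run_and_capture(cmd_path: str, cmd: List[str],
                    verbose: bool = True) -> Tuple[int, str]:
    """Run cmd_path with the given arguments; return (exit status, output)."""
    command = [cmd_path, *cmd]
    if verbose:
        print("Running:", " ".join(command))
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout
        text=True,
        check=False,  # leave success/failure evaluation to the caller
    )
    if verbose:
        print(completed.stdout, end="")
    return completed.returncode, completed.stdout


# Stand-in executable: the current Python interpreter, not pinot-admin.sh.
status, output = run_and_capture(
    sys.executable, ["-c", "print('segment uploaded')"], verbose=False
)
```

Returning both the exit status and the merged output leaves the caller free to judge success by either one.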

class airflow.providers.apache.pinot.hooks.pinot.PinotDbApiHook(*args, schema=None, **kwargs)[source]

Bases: airflow.hooks.dbapi.DbApiHook

Interact with Pinot Broker Query API

This hook uses the standard-SQL endpoint since the PQL endpoint is soon to be deprecated. https://docs.pinot.apache.org/users/api/querying-pinot-using-standard-sql

conn_name_attr = pinot_broker_conn_id[source]

default_conn_name = pinot_broker_default[source]

supports_autocommit = False[source]

get_conn(self)[source]

Establish a connection to the Pinot broker through the Pinot DB-API.

get_uri(self)[source]

Get the connection URI for the Pinot broker.

e.g., http://localhost:9000/query/sql
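A URI of the form shown above could be assembled from connection fields roughly as follows. This is a sketch under stated assumptions, not the hook's code: the function name build_broker_uri and the defaults for scheme and endpoint are hypothetical, chosen to reproduce the example URI.

```python
from typing import Optional


def build_broker_uri(host: str, port: Optional[int] = None,
                     scheme: str = "http",
                     endpoint: str = "query/sql") -> str:
    """Compose a broker URI from its parts (hypothetical helper)."""
    # Only add ":port" when a port is actually configured.
    netloc = host if port is None else f"{host}:{port}"
    return f"{scheme}://{netloc}/{endpoint}"


uri = build_broker_uri("localhost", 9000)
```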

get_records(self, sql, parameters=None)[source]

Executes the SQL and returns a set of records.

Parameters
  • sql (str) -- The SQL statement to be executed (str) or a list of SQL statements to execute.

  • parameters (Optional[Union[Dict[str, Any], Iterable[Any]]]) -- The parameters to render the SQL query with.

get_first(self, sql, parameters=None)[source]

Executes the SQL and returns the first resulting row.

Parameters
  • sql (str) -- The SQL statement to be executed (str) or a list of SQL statements to execute.

  • parameters (Optional[Union[Dict[str, Any], Iterable[Any]]]) -- The parameters to render the SQL query with.
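The difference between get_records and get_first follows the usual DB-API cursor pattern: one fetches every row, the other only the first. The sketch below illustrates that pattern with the standard library's sqlite3 module as a stand-in for a Pinot connection. The table, data, and helper functions are hypothetical, and the placeholder style ("?") is sqlite3's; other DB-API drivers may use a different parameter style.

```python
import sqlite3
from contextlib import closing


def get_records(conn, sql, parameters=None):
    """Execute sql and return all resulting rows."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql, parameters or ())
        return cur.fetchall()


def get_first(conn, sql, parameters=None):
    """Execute sql and return only the first resulting row."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql, parameters or ())
        return cur.fetchone()


# In-memory SQLite database standing in for a broker connection.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hits (page TEXT, views INTEGER)")
conn.executemany("INSERT INTO hits VALUES (?, ?)", [("home", 10), ("docs", 25)])

all_rows = get_records(conn, "SELECT page, views FROM hits ORDER BY views")
top_row = get_first(conn, "SELECT page, views FROM hits WHERE views > ?", (20,))
```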

abstract set_autocommit(self, conn, autocommit)[source]

Sets the autocommit flag on the connection

abstract insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[source]

A generic way to insert a set of tuples into a table. A new transaction is created every commit_every rows.

Parameters
  • table (str) -- Name of the target table

  • rows (str) -- The rows to insert into the table

  • target_fields (Optional[str]) -- The names of the columns to fill in the table

  • commit_every (int) -- The maximum number of rows to insert in one transaction. Set to 0 to insert all rows in one transaction.

  • replace (bool) -- Whether to replace instead of insert
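Since insert_rows is marked abstract for this hook, the following only illustrates the commit_every semantics described above: rows are grouped into batches of at most commit_every, and a value of 0 yields a single batch. The function name batches is hypothetical and no database is involved.

```python
from typing import Iterable, Iterator, List, Sequence


def batches(rows: Iterable[Sequence],
            commit_every: int = 1000) -> Iterator[List[Sequence]]:
    """Yield lists of rows, one list per would-be transaction."""
    batch: List[Sequence] = []
    for row in rows:
        batch.append(row)
        # commit_every == 0 disables splitting: everything goes in one batch.
        if commit_every and len(batch) >= commit_every:
            yield batch
            batch = []
    if batch:
        # Flush the final, possibly shorter, batch.
        yield batch
```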
