Source code for airflow.providers.apache.cassandra.hooks.cassandra
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains hook to integrate with Apache Cassandra."""fromtypingimportAny,Dict,Unionfromcassandra.authimportPlainTextAuthProviderfromcassandra.clusterimportCluster,Sessionfromcassandra.policiesimport(DCAwareRoundRobinPolicy,RoundRobinPolicy,TokenAwarePolicy,WhiteListRoundRobinPolicy,)fromairflow.hooks.baseimportBaseHookfromairflow.utils.log.logging_mixinimportLoggingMixin
class CassandraHook(BaseHook, LoggingMixin):
    """
    Hook used to interact with Cassandra.

    Contact points can be specified as a comma-separated string in the 'hosts'
    field of the connection.

    Port can be specified in the port field of the connection.

    If SSL is enabled in Cassandra, pass in a dict in the extra field as kwargs for
    ``ssl.wrap_socket()``. For example::

        {
            'ssl_options' : {
                'ca_certs' : PATH_TO_CA_CERTS
            }
        }

    Default load balancing policy is RoundRobinPolicy. To specify a different
    LB policy::

        - DCAwareRoundRobinPolicy
            {
                'load_balancing_policy': 'DCAwareRoundRobinPolicy',
                'load_balancing_policy_args': {
                    'local_dc': LOCAL_DC_NAME,                    # optional
                    'used_hosts_per_remote_dc': SOME_INT_VALUE,   # optional
                }
            }
        - WhiteListRoundRobinPolicy
            {
                'load_balancing_policy': 'WhiteListRoundRobinPolicy',
                'load_balancing_policy_args': {
                    'hosts': ['HOST1', 'HOST2', 'HOST3']
                }
            }
        - TokenAwarePolicy
            {
                'load_balancing_policy': 'TokenAwarePolicy',
                'load_balancing_policy_args': {
                    'child_load_balancing_policy': CHILD_POLICY_NAME, # optional
                    'child_load_balancing_policy_args': { ... }       # optional
                }
            }

    The child policy of a TokenAwarePolicy must be one of RoundRobinPolicy,
    DCAwareRoundRobinPolicy or WhiteListRoundRobinPolicy; any other name falls
    back to RoundRobinPolicy. Unknown top-level policy names also fall back to
    RoundRobinPolicy.

    For details of the Cluster config, see cassandra.cluster.
    """

    # NOTE(review): ``self.cluster``, ``self.session`` and ``self.keyspace`` are read
    # by the methods below but are not assigned anywhere in this view; presumably
    # ``__init__`` sets them from the Airflow connection -- confirm.

    def get_conn(self) -> Session:
        """
        Return a Cassandra ``Session`` connected to ``self.keyspace``.

        The session is cached on the hook and reused while it is open. Once it
        has been shut down (or if none exists yet), a new one is opened from
        ``self.cluster``.
        """
        if self.session and not self.session.is_shutdown:
            return self.session
        self.session = self.cluster.connect(self.keyspace)
        return self.session

    def shutdown_cluster(self) -> None:
        """Close all sessions and connections associated with this Cluster."""
        # Only shut down once; a cluster that is already shut down is left alone.
        if not self.cluster.is_shutdown:
            self.cluster.shutdown()

    @staticmethod
    def get_lb_policy(policy_name: str, policy_args: Dict[str, Any]) -> "Policy":
        """
        Create a load balancing policy.

        :param policy_name: Name of the policy to use. Unrecognised names fall
            back to ``RoundRobinPolicy``.
        :param policy_args: Parameters for the policy.
        :return: The constructed load balancing policy instance.
        :raises ValueError: if ``WhiteListRoundRobinPolicy`` is requested without
            any ``hosts``.
        """
        # ``Policy`` is quoted so the annotation is not evaluated when the class is
        # defined; presumably it is a module-level Union alias of the policy
        # classes (``Union`` is imported but otherwise unused here) -- confirm.
        if policy_name == 'DCAwareRoundRobinPolicy':
            local_dc = policy_args.get('local_dc', '')
            used_hosts_per_remote_dc = int(policy_args.get('used_hosts_per_remote_dc', 0))
            return DCAwareRoundRobinPolicy(local_dc, used_hosts_per_remote_dc)

        if policy_name == 'WhiteListRoundRobinPolicy':
            hosts = policy_args.get('hosts')
            if not hosts:
                # ValueError subclasses Exception, so callers catching Exception still work.
                raise ValueError('Hosts must be specified for WhiteListRoundRobinPolicy')
            return WhiteListRoundRobinPolicy(hosts)

        if policy_name == 'TokenAwarePolicy':
            allowed_child_policies = (
                'RoundRobinPolicy',
                'DCAwareRoundRobinPolicy',
                'WhiteListRoundRobinPolicy',
            )
            child_policy_name = policy_args.get('child_load_balancing_policy', 'RoundRobinPolicy')
            child_policy_args = policy_args.get('child_load_balancing_policy_args', {})
            # TokenAwarePolicy is not an allowed child, which also rules out
            # unbounded recursion; unknown children degrade to RoundRobinPolicy.
            if child_policy_name not in allowed_child_policies:
                return TokenAwarePolicy(RoundRobinPolicy())
            child_policy = CassandraHook.get_lb_policy(child_policy_name, child_policy_args)
            return TokenAwarePolicy(child_policy)

        # Fallback to default RoundRobinPolicy
        return RoundRobinPolicy()

    def table_exists(self, table: str) -> bool:
        """
        Check if a table exists in Cassandra.

        The check is made against the driver's cluster metadata; no query is run.

        :param table: Target Cassandra table.
            Use dot notation to target a specific keyspace.
        :return: True if the keyspace is present in the metadata and contains the table.
        """
        keyspace = self.keyspace
        if '.' in table:
            keyspace, table = table.split('.', 1)
        cluster_metadata = self.get_conn().cluster.metadata
        return keyspace in cluster_metadata.keyspaces and table in cluster_metadata.keyspaces[keyspace].tables

    @staticmethod
    def _is_valid_cql_identifier(name: Any) -> bool:
        """Return True if *name* is a plain ``[A-Za-z0-9_]+`` or a double-quoted CQL identifier."""
        import re

        if not isinstance(name, str):
            return False
        # Inside a quoted identifier, a double quote may appear only escaped as "",
        # so the value cannot close the identifier early and append extra CQL.
        return re.fullmatch(r'[A-Za-z0-9_]+|"(?:[^"]|"")+"', name) is not None

    def record_exists(self, table: str, keys: Dict[str, str]) -> bool:
        """
        Check if a record exists in Cassandra.

        This is a best-effort check: it never raises. Invalid identifiers and
        query failures are logged and reported as ``False``.

        :param table: Target Cassandra table.
            Use dot notation to target a specific keyspace.
        :param keys: The keys and their values to check the existence.
        :return: True if the query returns at least one row. False if it returns
            no rows, if an identifier is invalid, or if the query fails.
        """
        keyspace = self.keyspace
        if '.' in table:
            keyspace, table = table.split('.', 1)

        # Keyspace, table and column names cannot be bound as query parameters and
        # are interpolated into the CQL text. Reject anything that is not a simple
        # or properly quoted identifier so it cannot inject additional CQL.
        for identifier in (keyspace, table, *keys):
            if not self._is_valid_cql_identifier(identifier):
                self.log.error("Invalid CQL identifier in record_exists: %r", identifier)
                return False

        # Column values are passed separately as named parameters, never inlined.
        ks_str = " AND ".join(f"{key}=%({key})s" for key in keys)
        query = f"SELECT * FROM {keyspace}.{table} WHERE {ks_str}"
        try:
            result = self.get_conn().execute(query, keys)
            return result.one() is not None
        except Exception:
            # Keep the best-effort "not found" result, but do not hide why it failed.
            self.log.exception("Failed to check record existence with query: %s", query)
            return False