Source code for airflow.providers.neo4j.hooks.neo4j
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module allows to connect to a Neo4j database."""from__future__importannotationsfromtypingimportTYPE_CHECKING,Anyfromurllib.parseimporturlsplitfromneo4jimportDriver,GraphDatabasefromairflow.hooks.baseimportBaseHookifTYPE_CHECKING:fromairflow.modelsimportConnection
[docs]classNeo4jHook(BaseHook):""" Interact with Neo4j. Performs a connection to Neo4j and runs the query. :param neo4j_conn_id: Reference to :ref:`Neo4j connection id <howto/connection:neo4j>`. """
[docs]defget_conn(self)->Driver:"""Initiate a new Neo4j connection with username, password and database schema."""ifself.clientisnotNone:returnself.clientself.connection=self.get_connection(self.neo4j_conn_id)uri=self.get_uri(self.connection)self.log.info("URI: %s",uri)is_encrypted=self.connection.extra_dejson.get("encrypted",False)self.client=self.get_client(self.connection,is_encrypted,uri)returnself.client
[docs]defget_client(self,conn:Connection,encrypted:bool,uri:str)->Driver:""" Determine that relevant driver based on extras. :param conn: Connection object. :param encrypted: boolean if encrypted connection or not. :param uri: uri string for connection. :return: Driver """parsed_uri=urlsplit(uri)kwargs:dict[str,Any]={}ifparsed_uri.schemein["bolt","neo4j"]:kwargs["encrypted"]=encryptedreturnGraphDatabase.driver(uri,auth=(conn.login,conn.password),**kwargs)
[docs]defget_uri(self,conn:Connection)->str:""" Build the uri based on extras. - Default - uses bolt scheme(bolt://) - neo4j_scheme - neo4j:// - certs_self_signed - neo4j+ssc:// - certs_trusted_ca - neo4j+s:// :param conn: connection object. :return: uri """use_neo4j_scheme=conn.extra_dejson.get("neo4j_scheme",False)scheme="neo4j"ifuse_neo4j_schemeelse"bolt"# Self signed certificatesssc=conn.extra_dejson.get("certs_self_signed",False)# Only certificates signed by CA.trusted_ca=conn.extra_dejson.get("certs_trusted_ca",False)encryption_scheme=""ifssc:encryption_scheme="+ssc"eliftrusted_ca:encryption_scheme="+s"returnf"{scheme}{encryption_scheme}://{conn.host}:{7687ifconn.portisNoneelseconn.port}"
[docs]defrun(self,query)->list[Any]:""" Create a neo4j session and execute the query in the session. :param query: Neo4j query :return: Result """driver=self.get_conn()ifnotself.connection.schema:withdriver.session()assession:result=session.run(query)returnresult.data()else:withdriver.session(database=self.connection.schema)assession:result=session.run(query)returnresult.data()