Source code for airflow.providers.redis.hooks.redis
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""RedisHook module"""fromredisimportRedisfromairflow.hooks.baseimportBaseHook
[docs]classRedisHook(BaseHook):""" Wrapper for connection to interact with Redis in-memory data structure store You can set your db in the extra field of your connection as ``{"db": 3}``. Also you can set ssl parameters as: ``{"ssl": true, "ssl_cert_reqs": "require", "ssl_cert_file": "/path/to/cert.pem", etc}``. """
def__init__(self,redis_conn_id:str=default_conn_name)->None:""" Prepares hook to connect to a Redis database. :param conn_id: the name of the connection that has the parameters we need to connect to Redis. """super().__init__()self.redis_conn_id=redis_conn_idself.redis=Noneself.host=Noneself.port=Noneself.password=Noneself.db=None
[docs]defget_conn(self):"""Returns a Redis connection."""conn=self.get_connection(self.redis_conn_id)self.host=conn.hostself.port=conn.portself.password=Noneifstr(conn.password).lower()in['none','false','']elseconn.passwordself.db=conn.extra_dejson.get('db')# check for ssl parameters in conn.extrassl_arg_names=["ssl","ssl_cert_reqs","ssl_ca_certs","ssl_keyfile","ssl_cert_file","ssl_check_hostname",]ssl_args={name:valforname,valinconn.extra_dejson.items()ifnameinssl_arg_names}ifnotself.redis:self.log.debug('Initializing redis object for conn_id "%s" on %s:%s:%s',self.redis_conn_id,self.host,self.port,self.db,)self.redis=Redis(host=self.host,port=self.port,password=self.password,db=self.db,**ssl_args)returnself.redis