members.lumbung.space/members_lumbung_space/redis.py
cellarspoon 950c369455
Some checks failed
continuous-integration/drone/push Build is failing
feat: resource map
2022-01-10 16:20:47 +01:00

51 lines
1.3 KiB
Python

"""Redis cache."""
import json
from aioredis import create_redis_pool
async def init_redis(app):
from members_lumbung_space.config import REDIS_DB, REDIS_HOST, REDIS_PORT
redis = Redis()
app.state.redis = await redis.create_pool(
f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}?encoding=utf-8"
)
class Redis:
"""Redis cache."""
def __init__(self):
"""Initialise the object."""
self._redis = None
async def create_pool(self, conn):
"""Initialise pool."""
self._redis = await create_redis_pool(conn)
return self
async def keys(self, pattern):
"""Retrieve keys that match a pattern."""
return await self._redis.keys(pattern)
async def set(self, key, value, dumps=True):
"""Set a key."""
if dumps:
return await self._redis.set(key, json.dumps(value))
return await self._redis.set(key, value)
async def get(self, key, loads=True):
"""Get a specific key."""
if loads:
value = await self._redis.get(key)
if value:
return json.loads(value)
return await self._redis.get(key)
async def close(self):
"""Close the connection."""
self.redis_cache.close()
await self.redis_cache.wait_closed()