Source code for gremlin.registry
"""Simple interface for keeping track of DB connections and aliases"""
import asyncio
from urllib.parse import urlparse
from aiogremlin.driver import connection
class RegistryConnection:
    """Pair a driver connection with the alias the registry knows it by.

    Wraps :py:class:`aiogremlin.driver.connection.Connection` and forwards
    ``write`` and ``close`` to it.
    """

    def __init__(self, conn, alias):
        """Store the wrapped connection ``conn`` and its ``alias``."""
        self._alias = alias
        self._conn = conn

    def __repr__(self):
        template = 'Connection at {} aliased as {}'
        return template.format(self.uri, self._alias)

    @property
    def uri(self):
        """Value of the wrapped connection's ``_url`` attribute."""
        wrapped = self._conn
        return wrapped._url

    @property
    def alias(self):
        """Alias currently assigned to this connection."""
        return self._alias

    @alias.setter
    def alias(self, value):
        self._alias = value

    @property
    def loop(self):
        """Value of the wrapped connection's ``_loop`` attribute."""
        wrapped = self._conn
        return wrapped._loop

    async def write(self, message):
        """Write message to Gremlin Server via the wrapped connection.

        Returns whatever the wrapped connection's ``write`` returns.
        """
        outcome = await self._conn.write(message)
        return outcome

    async def close(self):
        """Close the wrapped connection and return its ``close`` result."""
        outcome = await self._conn.close()
        return outcome
class ConnectionRegistry:
    """Issue and keep track of database connections.

    All state is held on the class itself and every method is a
    classmethod, so the registry is shared process-wide. Each open
    connection is stored in ``connections`` under two keys: the URI it was
    opened with and its alias.
    """

    # Connection most recently selected through the registry, or None.
    current = None
    # Maps both URI and alias to the same RegistryConnection object.
    connections = {}
    # Event loop used to run the connect/close coroutines synchronously.
    try:
        _loop = asyncio.get_event_loop()
    except RuntimeError:
        # get_event_loop() raises when there is no current loop (non-main
        # thread, or Python versions that no longer create one implicitly).
        _loop = asyncio.new_event_loop()

    @classmethod
    def get(cls, descriptors, config=None):
        """Get a connection from the registry based on the descriptor.

        :param descriptors: ``(primary, secondary)`` pair. ``primary`` is a
            key into the registry (URI or alias) or a URI to connect to;
            ``secondary`` is an optional alias.
        :param config: object providing ``uri``, ``username``, ``password``,
            ``response_timeout`` and ``ssl_context``; only read when a new
            connection has to be opened.
        :returns: the connection that is now ``current``.
        """
        primary, secondary = descriptors
        if primary:
            cls.current = cls.connections.get(primary)
            if not cls.current:
                # Not registered yet: open it and make it current.
                cls.current = cls._get_connection(primary, secondary, config)
            elif secondary:
                cls._replace_alias(cls.current, secondary)
        elif not cls.current:
            # No descriptor and nothing selected: use the configured URI.
            cls.current = cls._get_connection(config.uri, None, config)
        return cls.current

    @classmethod
    def close(cls, connection_str=None):
        """Close DB connections.

        :param connection_str: URI or alias of the single connection to
            close. When omitted, every registered connection is closed.
        :raises RuntimeError: if ``connection_str`` is not registered.
        """
        if connection_str:
            conn = cls.connections.pop(connection_str, None)
            if not conn:
                raise RuntimeError('Specified conn does not exist')
            # Drop the companion key (alias <-> URI) for the same object.
            if connection_str == conn.uri:
                cls.connections.pop(conn.alias, None)
            else:
                cls.connections.pop(conn.uri, None)
            if conn is cls.current:
                cls.current = None
            conns = [conn]
        else:
            # Each connection is registered twice; a set closes it once.
            conns = set(cls.connections.values())
            cls.connections.clear()
            cls.current = None

        async def go(conns):
            for conn in conns:
                await conn.close()

        cls._loop.run_until_complete(go(conns))

    @classmethod
    def set_connection_alias(cls, descriptors, config):
        """Assign an alias to a connection, opening it first if needed.

        :param descriptors: ``(primary, secondary)`` pair; ``secondary`` is
            the new alias and is required.
        :param config: connection settings used if the connection has to
            be opened.
        :raises RuntimeError: if no alias is supplied.
        """
        primary, secondary = descriptors
        if not secondary:
            raise RuntimeError('Improperly formatted alias descriptors')
        conn = cls.connections.get(primary)
        if not conn:
            conn = cls._get_connection(primary, secondary, config)
        else:
            cls._replace_alias(conn, secondary)

    @classmethod
    def _replace_alias(cls, conn, alias):
        """Re-register ``conn`` under ``alias`` if it differs from its own."""
        if conn.alias != alias:
            # Never remove the URI entry, even if it doubled as the alias.
            if conn.alias != conn.uri:
                cls.connections.pop(conn.alias, None)
            cls.connections[alias] = conn
            conn.alias = alias
            print('Alias-- {} --created for database at {}'.format(
                alias, conn.uri))

    @classmethod
    def set_current_connection(cls, descriptors, config):
        """Make the described connection current, opening it if needed.

        :param descriptors: ``(primary, secondary)`` pair; ``primary`` is
            required, ``secondary`` is only used as the alias when a new
            connection is opened.
        :param config: connection settings used if the connection has to
            be opened.
        :raises RuntimeError: if ``primary`` is empty.
        """
        primary, secondary = descriptors
        if not primary:
            raise RuntimeError('Please specify connection to set')
        conn = cls.connections.get(primary)
        if not conn:
            conn = cls._get_connection(primary, secondary, config)
        cls.current = conn
        print('Now using connection at {}'.format(conn.uri))

    @classmethod
    def _get_connection(cls, primary, secondary, config):
        """Open a connection to ``primary`` and register it.

        The new connection is stored under both ``primary`` and its alias
        (``secondary``, or one derived from the URI when none is given).

        :raises Exception: if the connection attempt fails; the original
            error is attached as the cause.
        """
        try:
            conn = cls._loop.run_until_complete(
                connection.Connection.open(
                    primary, cls._loop, username=config.username,
                    password=config.password,
                    response_timeout=config.response_timeout,
                    ssl_context=config.ssl_context))
        except Exception as exc:
            # Narrower than a bare except so KeyboardInterrupt/SystemExit
            # still propagate; chaining keeps the underlying failure visible.
            raise Exception(
                'Unable to establish connection at URI: {}'.format(
                    primary)) from exc
        else:
            if not secondary:
                secondary = cls._create_secondary(primary)
            conn = RegistryConnection(conn, secondary)
            cls.connections[primary] = conn
            cls.connections[secondary] = conn
            print('Alias-- {} --created for database at {}'.format(
                secondary, primary))
        return conn

    @classmethod
    def _create_secondary(cls, primary):
        """Derive a default alias: the network location of ``primary``
        up to the first ``:``."""
        alias = urlparse(primary).netloc.split(':')[0]
        return alias