Fix default port, align with asyncio_redis, fix pool
* Pool had broken methods that didn't make sense, so deleted * Fix port to be right mongodb default port * Change pool_size to poolsize to align with asyncio_redis * Write logs to own logger
This commit is contained in:
@@ -1,8 +1,8 @@
|
|||||||
|
import logging
|
||||||
|
from asyncio_mongo.log import logger
|
||||||
from asyncio_mongo.database import Database
|
from asyncio_mongo.database import Database
|
||||||
from .protocol import MongoProtocol
|
from .protocol import MongoProtocol
|
||||||
from asyncio.log import logger
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
|
||||||
|
|
||||||
__all__ = ['Connection']
|
__all__ = ['Connection']
|
||||||
|
|
||||||
@@ -24,7 +24,7 @@ class Connection:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def create(cls, host='localhost', port=6379, loop=None, password=None, db=0, auto_reconnect=True):
|
def create(cls, host='localhost', port=27017, loop=None, auto_reconnect=True):
|
||||||
connection = cls()
|
connection = cls()
|
||||||
|
|
||||||
connection.host = host
|
connection.host = host
|
||||||
@@ -33,7 +33,7 @@ class Connection:
|
|||||||
connection._retry_interval = .5
|
connection._retry_interval = .5
|
||||||
|
|
||||||
# Create protocol instance
|
# Create protocol instance
|
||||||
protocol_factory = type('MongoProtocol', (cls.protocol,), { 'password': password, 'db': db })
|
protocol_factory = type('MongoProtocol', (cls.protocol,), {})
|
||||||
|
|
||||||
if auto_reconnect:
|
if auto_reconnect:
|
||||||
class protocol_factory(protocol_factory):
|
class protocol_factory(protocol_factory):
|
||||||
|
|||||||
@@ -38,22 +38,22 @@ class Pool:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def create(cls, host='localhost', port=6379, loop=None, password=None, db=0, pool_size=1, auto_reconnect=True):
|
def create(cls, host='localhost', port=27017, loop=None, poolsize=1, auto_reconnect=True):
|
||||||
"""
|
"""
|
||||||
Create a new connection instance.
|
Create a new pool instance.
|
||||||
"""
|
"""
|
||||||
self = cls()
|
self = cls()
|
||||||
self._host = host
|
self._host = host
|
||||||
self._port = port
|
self._port = port
|
||||||
self._pool_size = pool_size
|
self._pool_size = poolsize
|
||||||
|
|
||||||
# Create connections
|
# Create connections
|
||||||
self._connections = []
|
self._connections = []
|
||||||
|
|
||||||
for i in range(pool_size):
|
for i in range(poolsize):
|
||||||
connection_class = cls.get_connection_class()
|
connection_class = cls.get_connection_class()
|
||||||
connection = yield from connection_class.create(host=host, port=port, loop=loop,
|
connection = yield from connection_class.create(host=host, port=port, loop=loop,
|
||||||
password=password, db=db, auto_reconnect=auto_reconnect)
|
auto_reconnect=auto_reconnect)
|
||||||
self._connections.append(connection)
|
self._connections.append(connection)
|
||||||
|
|
||||||
return self
|
return self
|
||||||
@@ -66,13 +66,6 @@ class Pool:
|
|||||||
""" Number of parallel connections in the pool."""
|
""" Number of parallel connections in the pool."""
|
||||||
return self._poolsize
|
return self._poolsize
|
||||||
|
|
||||||
@property
|
|
||||||
def connections_in_use(self):
|
|
||||||
"""
|
|
||||||
Return how many protocols are in use.
|
|
||||||
"""
|
|
||||||
return sum([ 1 for c in self._connections if c.protocol.in_use ])
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def connections_connected(self):
|
def connections_connected(self):
|
||||||
"""
|
"""
|
||||||
@@ -93,7 +86,7 @@ class Pool:
|
|||||||
self._shuffle_connections()
|
self._shuffle_connections()
|
||||||
|
|
||||||
for c in self._connections:
|
for c in self._connections:
|
||||||
if c.protocol.is_connected and not c.protocol.in_use:
|
if c.protocol.is_connected:
|
||||||
return c
|
return c
|
||||||
|
|
||||||
def _shuffle_connections(self):
|
def _shuffle_connections(self):
|
||||||
@@ -116,5 +109,5 @@ class Pool:
|
|||||||
if connection:
|
if connection:
|
||||||
return getattr(connection, name)
|
return getattr(connection, name)
|
||||||
else:
|
else:
|
||||||
raise NoAvailableConnectionsInPoolError('No available connections in the pool: size=%s, in_use=%s, connected=%s' % (
|
raise NoAvailableConnectionsInPoolError('No available connections in the pool: size=%s, connected=%s' % (
|
||||||
self.pool_size, self.connections_in_use, self.connections_connected))
|
self.pool_size, self.connections_connected))
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ class MongoProtocol(asyncio.Protocol):
|
|||||||
def connection_made(self, transport):
|
def connection_made(self, transport):
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
self._is_connected = True
|
self._is_connected = True
|
||||||
logger.log(logging.INFO, 'Mongo connection made with %')
|
logger.log(logging.INFO, 'Mongo connection made')
|
||||||
|
|
||||||
def connection_lost(self, exc):
|
def connection_lost(self, exc):
|
||||||
self._is_connected = False
|
self._is_connected = False
|
||||||
@@ -62,6 +62,11 @@ class MongoProtocol(asyncio.Protocol):
|
|||||||
|
|
||||||
logger.log(logging.INFO, 'Mongo connection lost')
|
logger.log(logging.INFO, 'Mongo connection lost')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_connected(self):
|
||||||
|
""" True when the underlying transport is connected. """
|
||||||
|
return self._is_connected
|
||||||
|
|
||||||
def data_received(self, data):
|
def data_received(self, data):
|
||||||
while self.__waiting_header:
|
while self.__waiting_header:
|
||||||
self.__buffer += data
|
self.__buffer += data
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ class TestMongoConnectionMethods(MongoTest):
|
|||||||
@async
|
@async
|
||||||
def test_pool(self):
|
def test_pool(self):
|
||||||
# MongoConnectionPool returns deferred, which gets MongoAPI
|
# MongoConnectionPool returns deferred, which gets MongoAPI
|
||||||
pool = asyncio_mongo.Pool.create(mongo_host, mongo_port, pool_size=2)
|
pool = asyncio_mongo.Pool.create(mongo_host, mongo_port, poolsize=2)
|
||||||
self.assertTrue(inspect.isgenerator(pool))
|
self.assertTrue(inspect.isgenerator(pool))
|
||||||
rapi = yield from pool
|
rapi = yield from pool
|
||||||
print('rapi %s' % rapi.__class__)
|
print('rapi %s' % rapi.__class__)
|
||||||
|
|||||||
Reference in New Issue
Block a user