# Copyright 2009-2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License.  You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the License for the specific language governing
# permissions and limitations under the License.

"""Tools for connecting to MongoDB.

.. seealso:: Module :mod:`~pymongo.master_slave_connection` for
   connecting to master-slave clusters, and
   :doc:`/examples/high_availability` for an example of how to connect
   to a replica set, or specify a list of mongos instances for
   automatic failover.

To get a :class:`~pymongo.database.Database` instance from a
:class:`MongoClient` use either dictionary-style or attribute-style
access:

.. doctest::

  >>> from asyncio_mongo._pymongo import MongoClient
  >>> c = MongoClient()
  >>> c.test_database
  Database(MongoClient('localhost', 27017), u'test_database')
  >>> c['test-database']
  Database(MongoClient('localhost', 27017), u'test-database')
"""

import datetime
import random
import socket
import struct
import time
import warnings

from asyncio_mongo._bson.py3compat import b
from asyncio_mongo._pymongo import (auth,
                                    common,
                                    database,
                                    helpers,
                                    message,
                                    pool,
                                    uri_parser)
from asyncio_mongo._pymongo.common import HAS_SSL
from asyncio_mongo._pymongo.cursor_manager import CursorManager
from asyncio_mongo._pymongo.errors import (AutoReconnect,
                                           ConfigurationError,
                                           ConnectionFailure,
                                           DuplicateKeyError,
                                           InvalidDocument,
                                           InvalidURI,
                                           OperationFailure)

EMPTY = b("")


def _partition_node(node):
    """Split a host:port string returned from mongod/s into
    a (host, int(port)) pair needed for socket.connect().
    """
    host = node
    port = 27017
    idx = node.rfind(':')
    if idx != -1:
        host, port = node[:idx], int(node[idx + 1:])
    if host.startswith('['):
        host = host[1:-1]
    return host, port


class MongoClient(common.BaseObject):
    """Connection to MongoDB.
    """

    HOST = "localhost"
    PORT = 27017

    __max_bson_size = 4 * 1024 * 1024

    def __init__(self, host=None, port=None, max_pool_size=100,
                 document_class=dict, tz_aware=False, _connect=True,
                 **kwargs):
        """Create a new connection to a single MongoDB instance at *host:port*.

        The resultant client object has connection-pooling built
        in. It also performs auto-reconnection when necessary. If an
        operation fails because of a connection error,
        :class:`~pymongo.errors.ConnectionFailure` is raised. If
        auto-reconnection will be performed,
        :class:`~pymongo.errors.AutoReconnect` will be
        raised. Application code should handle this exception
        (recognizing that the operation failed) and then continue to
        execute.

        Raises :class:`TypeError` if port is not an instance of
        ``int``. Raises :class:`~pymongo.errors.ConnectionFailure` if
        the connection cannot be made.

        The `host` parameter can be a full MongoDB URI, in addition to
        a simple hostname. It can also be a list of hostnames or
        URIs. Any port specified in the host string(s) will override
        the `port` parameter. If multiple mongodb URIs containing
        database or auth information are passed, the last database,
        username, and password present will be used. For username and
        passwords reserved characters like ':', '/', '+' and '@' must
        be escaped following RFC 2396.
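
        For example, a password containing reserved characters can be
        escaped with :func:`urllib.parse.quote_plus` before it is
        embedded in a URI (a sketch; any RFC 2396-compliant escaping
        works):

        >>> from urllib.parse import quote_plus
        >>> quote_plus("pass/word@1")
        'pass%2Fword%401'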

        :Parameters:
          - `host` (optional): hostname or IP address of the instance
            to connect to, or a mongodb URI, or a list of hostnames /
            mongodb URIs. If `host` is an IPv6 literal it must be
            enclosed in '[' and ']' characters following the RFC2732
            URL syntax (e.g. '[::1]' for localhost)
          - `port` (optional): port number on which to connect
          - `max_pool_size` (optional): The maximum number of
            connections that the pool will open simultaneously. If this
            is set, operations will block if there are `max_pool_size`
            outstanding connections from the pool. Defaults to 100.
          - `document_class` (optional): default class to use for
            documents returned from queries on this client
          - `tz_aware` (optional): if ``True``,
            :class:`~datetime.datetime` instances returned as values
            in a document by this :class:`MongoClient` will be timezone
            aware (otherwise they will be naive)

          | **Other optional parameters can be passed as keyword arguments:**

          - `socketTimeoutMS`: (integer) How long (in milliseconds) a
            send or receive on a socket can take before timing out.
          - `connectTimeoutMS`: (integer) How long (in milliseconds) a
            connection can take to be opened before timing out.
          - `waitQueueTimeoutMS`: (integer) How long (in milliseconds)
            a thread will wait for a socket from the pool if the pool
            has no free sockets.
          - `waitQueueMultiple`: (integer) Multiplied by max_pool_size
            to give the number of threads allowed to wait for a socket
            at one time.
          - `auto_start_request`: If ``True``, each thread that
            accesses this :class:`MongoClient` has a socket allocated
            to it for the thread's lifetime. This ensures consistent
            reads, even if you read after an unacknowledged
            write. Defaults to ``False``
          - `use_greenlets`: If ``True``, :meth:`start_request()` will
            ensure that the current greenlet uses the same socket for
            all operations until :meth:`end_request()`

          | **Write Concern options:**

          - `w`: (integer or string) If this is a replica set, write
            operations will block until they have been replicated to the
            specified number or tagged set of servers. `w=<int>` always
            includes the replica set primary (e.g. w=3 means write to
            the primary and wait until replicated to **two**
            secondaries). Passing w=0 **disables write acknowledgement**
            and all other write concern options.
          - `wtimeout`: (integer) Used in conjunction with `w`. Specify
            a value in milliseconds to control how long to wait for
            write propagation to complete. If replication does not
            complete in the given timeframe, a timeout exception is
            raised.
          - `j`: If ``True`` block until write operations have been
            committed to the journal. Ignored if the server is running
            without journaling.
          - `fsync`: If ``True`` force the database to fsync all files
            before returning. When used with `j` the server awaits the
            next group commit before returning.

          | **Replica set keyword arguments for connecting with a
            replica set - either directly or via a mongos:**
          | (ignored by standalone mongod instances)

          - `replicaSet`: (string) The name of the replica set to
            connect to. The driver will verify that the replica set it
            connects to matches this name. Implies that the hosts
            specified are a seed list and the driver should attempt to
            find all members of the set. *Ignored by mongos*.
          - `read_preference`: The read preference for this client. If
            connecting to a secondary then a read preference mode
            *other* than PRIMARY is required - otherwise all queries
            will throw :class:`~pymongo.errors.AutoReconnect` "not
            master". See
            :class:`~pymongo.read_preferences.ReadPreference` for all
            available read preference options.
          - `tag_sets`: Ignored unless connecting to a replica set via
            mongos. To specify a priority-order for tag sets, provide a
            list of tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A
            final, empty tag set, ``{}``, means "read from any member
            that matches the mode, ignoring tags."

          | **SSL configuration:**

          - `ssl`: If ``True``, create the connection to the server
            using SSL.
          - `ssl_keyfile`: The private keyfile used to identify the
            local connection against mongod. If included with the
            ``ssl_certfile`` then only the ``ssl_certfile`` is needed.
            Implies ``ssl=True``.
          - `ssl_certfile`: The certificate file used to identify the
            local connection against mongod. Implies ``ssl=True``.
          - `ssl_cert_reqs`: Specifies whether a certificate is
            required from the other side of the connection, and whether
            it will be validated if provided. It must be one of the
            three values ``ssl.CERT_NONE`` (certificates ignored),
            ``ssl.CERT_OPTIONAL`` (not required, but validated if
            provided), or ``ssl.CERT_REQUIRED`` (required and
            validated). If the value of this parameter is not
            ``ssl.CERT_NONE``, then the ``ssl_ca_certs`` parameter must
            point to a file of CA certificates. Implies ``ssl=True``.
          - `ssl_ca_certs`: The ca_certs file contains a set of
            concatenated "certification authority" certificates, which
            are used to validate certificates passed from the other end
            of the connection. Implies ``ssl=True``.

        .. seealso:: :meth:`end_request`

        .. mongodoc:: connections

        .. versionchanged:: 2.5
           Added additional ssl options

        .. versionadded:: 2.4
        """
        if host is None:
            host = self.HOST
        if isinstance(host, str):
            host = [host]
        if port is None:
            port = self.PORT
        if not isinstance(port, int):
            raise TypeError("port must be an instance of int")

        seeds = set()
        username = None
        password = None
        self.__default_database_name = None
        opts = {}
        for entity in host:
            if "://" in entity:
                if entity.startswith("mongodb://"):
                    res = uri_parser.parse_uri(entity, port)
                    seeds.update(res["nodelist"])
                    username = res["username"] or username
                    password = res["password"] or password
                    self.__default_database_name = (
                        res["database"] or self.__default_database_name)
                    opts = res["options"]
                else:
                    idx = entity.find("://")
                    raise InvalidURI("Invalid URI scheme: "
                                     "%s" % (entity[:idx],))
            else:
                seeds.update(uri_parser.split_hosts(entity, port))
        if not seeds:
            raise ConfigurationError("need to specify at least one host")

        self.__nodes = seeds
        self.__host = None
        self.__port = None
        self.__is_primary = False
        self.__is_mongos = False

        # _pool_class option is for deep customization of PyMongo, e.g. Motor.
        # SHOULD NOT BE USED BY DEVELOPERS EXTERNAL TO 10GEN.
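        # (A custom pool class is assumed to accept the same constructor
        # arguments that pool.Pool is called with further below.)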
        pool_class = kwargs.pop('_pool_class', pool.Pool)

        options = {}
        for option, value in kwargs.items():
            option, value = common.validate(option, value)
            options[option] = value
        options.update(opts)

        self.__max_pool_size = common.validate_positive_integer_or_none(
            'max_pool_size', max_pool_size)

        self.__cursor_manager = CursorManager(self)

        self.__repl = options.get('replicaset')
        if len(seeds) == 1 and not self.__repl:
            self.__direct = True
        else:
            self.__direct = False
            self.__nodes = set()

        self.__net_timeout = options.get('sockettimeoutms')
        self.__conn_timeout = options.get('connecttimeoutms')
        self.__wait_queue_timeout = options.get('waitqueuetimeoutms')
        self.__wait_queue_multiple = options.get('waitqueuemultiple')

        self.__use_ssl = options.get('ssl', None)
        self.__ssl_keyfile = options.get('ssl_keyfile', None)
        self.__ssl_certfile = options.get('ssl_certfile', None)
        self.__ssl_cert_reqs = options.get('ssl_cert_reqs', None)
        self.__ssl_ca_certs = options.get('ssl_ca_certs', None)

        ssl_kwarg_keys = [k for k in kwargs if k.startswith('ssl_')]
        if self.__use_ssl is False and ssl_kwarg_keys:
            raise ConfigurationError("ssl has not been enabled but the "
                                     "following ssl parameters have been set: "
                                     "%s. Please set `ssl=True` or remove."
                                     % ', '.join(ssl_kwarg_keys))

        if self.__ssl_cert_reqs and not self.__ssl_ca_certs:
            raise ConfigurationError("If `ssl_cert_reqs` is not "
                                     "`ssl.CERT_NONE` then you must "
                                     "include `ssl_ca_certs` to be able "
                                     "to validate the server.")

        if ssl_kwarg_keys and self.__use_ssl is None:
            # ssl options imply ssl = True
            self.__use_ssl = True

        if self.__use_ssl and not HAS_SSL:
            raise ConfigurationError("The ssl module is not available. If you "
                                     "are using a python version previous to "
                                     "2.6 you must install the ssl package "
                                     "from PyPI.")

        self.__use_greenlets = options.get('use_greenlets', False)
        self.__pool = pool_class(
            None,
            self.__max_pool_size,
            self.__net_timeout,
            self.__conn_timeout,
            self.__use_ssl,
            use_greenlets=self.__use_greenlets,
            ssl_keyfile=self.__ssl_keyfile,
            ssl_certfile=self.__ssl_certfile,
            ssl_cert_reqs=self.__ssl_cert_reqs,
            ssl_ca_certs=self.__ssl_ca_certs,
            wait_queue_timeout=self.__wait_queue_timeout,
            wait_queue_multiple=self.__wait_queue_multiple)

        self.__document_class = document_class
        self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
        self.__auto_start_request = options.get('auto_start_request', False)

        # cache of existing indexes used by ensure_index ops
        self.__index_cache = {}
        self.__auth_credentials = {}

        super(MongoClient, self).__init__(**options)
        if self.slave_okay:
            warnings.warn("slave_okay is deprecated. Please "
                          "use read_preference instead.",
                          DeprecationWarning, stacklevel=2)

        if _connect:
            try:
                self.__find_node(seeds)
            except AutoReconnect as e:
                # ConnectionFailure makes more sense here than AutoReconnect
                raise ConnectionFailure(str(e))

        if username:
            mechanism = options.get('authmechanism', 'MONGODB-CR')
            source = (
                options.get('authsource')
                or self.__default_database_name
                or 'admin')

            credentials = auth._build_credentials_tuple(mechanism,
                                                        source,
                                                        str(username),
                                                        str(password),
                                                        options)
            try:
                self._cache_credentials(source, credentials, _connect)
            except OperationFailure as exc:
                raise ConfigurationError(str(exc))

    def _cached(self, dbname, coll, index):
        """Test if `index` is cached.
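
        The cache is a nested dict built by :meth:`_cache_index`:
        database name -> collection name -> index name -> expiry time.
        An entry only counts as cached until its expiry time passes.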
""" cache = self.__index_cache now = datetime.datetime.utcnow() return (dbname in cache and coll in cache[dbname] and index in cache[dbname][coll] and now < cache[dbname][coll][index]) def _cache_index(self, database, collection, index, cache_for): """Add an index to the index cache for ensure_index operations. """ now = datetime.datetime.utcnow() expire = datetime.timedelta(seconds=cache_for) + now if database not in self.__index_cache: self.__index_cache[database] = {} self.__index_cache[database][collection] = {} self.__index_cache[database][collection][index] = expire elif collection not in self.__index_cache[database]: self.__index_cache[database][collection] = {} self.__index_cache[database][collection][index] = expire else: self.__index_cache[database][collection][index] = expire def _purge_index(self, database_name, collection_name=None, index_name=None): """Purge an index from the index cache. If `index_name` is None purge an entire collection. If `collection_name` is None purge an entire database. """ if not database_name in self.__index_cache: return if collection_name is None: del self.__index_cache[database_name] return if not collection_name in self.__index_cache[database_name]: return if index_name is None: del self.__index_cache[database_name][collection_name] return if index_name in self.__index_cache[database_name][collection_name]: del self.__index_cache[database_name][collection_name][index_name] def _cache_credentials(self, source, credentials, connect=True): """Add credentials to the database authentication cache for automatic login when a socket is created. If `connect` is True, verify the credentials on the server first. """ if source in self.__auth_credentials: # Nothing to do if we already have these credentials. if credentials == self.__auth_credentials[source]: return raise OperationFailure('Another user is already authenticated ' 'to this database. You must logout first.') if connect: sock_info = self.__socket() try: # Since __check_auth was called in __socket # there is no need to call it here. auth.authenticate(credentials, sock_info, self.__simple_command) sock_info.authset.add(credentials) finally: self.__pool.maybe_return_socket(sock_info) self.__auth_credentials[source] = credentials def _purge_credentials(self, source): """Purge credentials from the database authentication cache. """ if source in self.__auth_credentials: del self.__auth_credentials[source] def __check_auth(self, sock_info): """Authenticate using cached database credentials. """ if self.__auth_credentials or sock_info.authset: cached = set(self.__auth_credentials.values()) authset = sock_info.authset.copy() # Logout any credentials that no longer exist in the cache. for credentials in authset - cached: self.__simple_command(sock_info, credentials[1], {'logout': 1}) sock_info.authset.discard(credentials) for credentials in cached - authset: auth.authenticate(credentials, sock_info, self.__simple_command) sock_info.authset.add(credentials) @property def host(self): """Current connected host. .. versionchanged:: 1.3 ``host`` is now a property rather than a method. """ return self.__host @property def port(self): """Current connected port. .. versionchanged:: 1.3 ``port`` is now a property rather than a method. """ return self.__port @property def is_primary(self): """If this instance is connected to a standalone, a replica set primary, or the master of a master-slave set. .. 

        .. versionadded:: 2.3
        """
        return self.__is_primary

    @property
    def is_mongos(self):
        """If this instance is connected to mongos.

        .. versionadded:: 2.3
        """
        return self.__is_mongos

    @property
    def max_pool_size(self):
        """The maximum number of sockets the pool will open concurrently.

        When the pool has reached `max_pool_size`, operations block
        waiting for a socket to be returned to the pool. If
        ``waitQueueTimeoutMS`` is set, a blocked operation will raise
        :exc:`~pymongo.errors.ConnectionFailure` after a timeout.
        By default ``waitQueueTimeoutMS`` is not set.

        .. warning:: SIGNIFICANT BEHAVIOR CHANGE in 2.6. Previously, this
          parameter would limit only the idle sockets the pool would hold
          onto, not the number of open sockets. The default has also changed
          to 100.

        .. versionchanged:: 2.6
        .. versionadded:: 1.11
        """
        return self.__max_pool_size

    @property
    def use_greenlets(self):
        """Whether calling :meth:`start_request` assigns greenlet-local,
        rather than thread-local, sockets.

        .. versionadded:: 2.4.2
        """
        return self.__use_greenlets

    @property
    def nodes(self):
        """List of all known nodes.

        Includes both nodes specified when this instance was created,
        as well as nodes discovered through the replica set discovery
        mechanism.

        .. versionadded:: 1.8
        """
        return self.__nodes

    @property
    def auto_start_request(self):
        """Is auto_start_request enabled?
        """
        return self.__auto_start_request

    def get_document_class(self):
        return self.__document_class

    def set_document_class(self, klass):
        self.__document_class = klass

    document_class = property(get_document_class, set_document_class,
                              doc="""Default class to use for
                              documents returned from this client.

                              .. versionadded:: 1.7
                              """)

    @property
    def tz_aware(self):
        """Does this client return timezone-aware datetimes?

        .. versionadded:: 1.8
        """
        return self.__tz_aware

    @property
    def max_bson_size(self):
        """Return the maximum size BSON object the connected server
        accepts in bytes. Defaults to 4MB in server < 1.7.4.

        .. versionadded:: 1.10
        """
        return self.__max_bson_size

    @property
    def max_message_size(self):
        """Return the maximum message size the connected server
        accepts in bytes.

        .. versionadded:: 2.6
        """
        return self.__max_message_size

    def __simple_command(self, sock_info, dbname, spec):
        """Send a command to the server.
        """
        rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
        start = time.time()
        try:
            sock_info.sock.sendall(msg)
            response = self.__receive_message_on_socket(1, rqst_id, sock_info)
        except:
            sock_info.close()
            raise

        end = time.time()
        response = helpers._unpack_response(response)['data'][0]
        msg = "command %r failed: %%s" % spec
        helpers._check_command_response(response, None, msg)
        return response, end - start

    def __try_node(self, node):
        """Try to connect to this node and see if it works for our connection
        type. Returns ((host, port), ismaster, isdbgrid, res_time).

        :Parameters:
         - `node`: The (host, port) pair to try.
        """
        self.disconnect()
        self.__host, self.__port = node

        # Call 'ismaster' directly so we can get a response time.
        sock_info = self.__socket()
        try:
            response, res_time = self.__simple_command(sock_info,
                                                       'admin',
                                                       {'ismaster': 1})
        finally:
            self.__pool.maybe_return_socket(sock_info)

        # Are we talking to a mongos?
        isdbgrid = response.get('msg', '') == 'isdbgrid'

        if "maxBsonObjectSize" in response:
            self.__max_bson_size = response["maxBsonObjectSize"]
        if "maxMessageSizeBytes" in response:
            self.__max_message_size = response["maxMessageSizeBytes"]
        else:
            self.__max_message_size = 2 * self.max_bson_size

        # Replica Set?
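        # (A typical ismaster reply from a replica-set member looks roughly
        #  like {'ismaster': True, 'setName': 'rs0',
        #  'hosts': ['a:27017', 'b:27017'], 'primary': 'a:27017'};
        #  the values here are illustrative only.)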
        if not self.__direct:
            # Check that this host is part of the given replica set.
            if self.__repl:
                set_name = response.get('setName')
                # The 'setName' field isn't returned by mongod before 1.6.2
                # so we can't assume that if it's missing this host isn't in
                # the specified set.
                if set_name and set_name != self.__repl:
                    raise ConfigurationError("%s:%d is not a member of "
                                             "replica set %s"
                                             % (node[0], node[1], self.__repl))
            if "hosts" in response:
                self.__nodes = set([_partition_node(h)
                                    for h in response["hosts"]])
            else:
                # The user passed a seed list of standalone or
                # mongos instances.
                self.__nodes.add(node)
            if response["ismaster"]:
                return node, True, isdbgrid, res_time
            elif "primary" in response:
                candidate = _partition_node(response["primary"])
                return self.__try_node(candidate)

            # Explain why we aren't using this connection.
            raise AutoReconnect('%s:%d is not primary or master' % node)

        # Direct connection
        if response.get("arbiterOnly", False) and not self.__direct:
            raise ConfigurationError("%s:%d is an arbiter" % node)
        return node, response['ismaster'], isdbgrid, res_time

    def __pick_nearest(self, candidates):
        """Return the 'nearest' candidate based on response time.
        """
        latency = self.secondary_acceptable_latency_ms
        # Only used for mongos high availability, res_time is in seconds.
        fastest = min([res_time for candidate, res_time in candidates])
        near_candidates = [
            candidate for candidate, res_time in candidates
            if res_time - fastest < latency / 1000.0]

        node = random.choice(near_candidates)
        # Clear the pool from the last choice.
        self.disconnect()
        self.__host, self.__port = node
        return node

    def __find_node(self, seeds=None):
        """Find a host, port pair suitable for our connection type.

        If only one host was supplied to __init__ see if we can connect to it.
        Don't check if the host is a master/primary so we can make a direct
        connection to read from a secondary or send commands to an arbiter.

        If more than one host was supplied treat them as a seed list for
        connecting to a replica set or to support high availability for
        mongos. If connecting to a replica set try to find the primary
        and fail if we can't, possibly updating any replSet information
        on success. If a mongos seed list was provided find the "nearest"
        mongos and return it.

        Otherwise we iterate through the list trying to find a host we can
        send write operations to.

        Sets __host and __port so that :attr:`host` and :attr:`port`
        will return the address of the connected host. Sets __is_primary to
        True if this is a primary or master, else False. Sets __is_mongos
        to True if the connection is to a mongos.
        """
        errors = []
        mongos_candidates = []
        candidates = seeds or self.__nodes.copy()
        for candidate in candidates:
            try:
                node, ismaster, isdbgrid, res_time = self.__try_node(candidate)
                self.__is_primary = ismaster
                self.__is_mongos = isdbgrid

                # No need to calculate nearest if we only have one mongos.
                if isdbgrid and not self.__direct:
                    mongos_candidates.append((node, res_time))
                    continue
                elif len(mongos_candidates):
                    raise ConfigurationError("Seed list cannot contain a mix "
                                             "of mongod and mongos instances.")
                return node
            except OperationFailure:
                # The server is available but something failed, probably auth.
                raise
            except Exception as why:
                errors.append(str(why))

        # If we have a mongos seed list, pick the "nearest" member.
        if len(mongos_candidates):
            self.__is_mongos = True
            return self.__pick_nearest(mongos_candidates)

        # Otherwise, try any hosts we discovered that were not in the
        # seed list.
        for candidate in self.__nodes - candidates:
            try:
                node, ismaster, isdbgrid, _ = self.__try_node(candidate)
                self.__is_primary = ismaster
                self.__is_mongos = isdbgrid
                return node
            except Exception as why:
                errors.append(str(why))

        # Couldn't find a suitable host.
        self.disconnect()
        raise AutoReconnect(', '.join(errors))

    def __socket(self):
        """Get a SocketInfo from the pool.
        """
        host, port = (self.__host, self.__port)
        if host is None or (port is None and '/' not in host):
            host, port = self.__find_node()

        try:
            if self.auto_start_request and not self.in_request():
                self.start_request()

            sock_info = self.__pool.get_socket((host, port))
        except socket.error as why:
            self.disconnect()

            # Check if a unix domain socket
            if host.endswith('.sock'):
                host_details = "%s:" % host
            else:
                host_details = "%s:%d:" % (host, port)
            raise AutoReconnect("could not connect to "
                                "%s %s" % (host_details, str(why)))
        try:
            self.__check_auth(sock_info)
        except OperationFailure:
            self.__pool.maybe_return_socket(sock_info)
            raise
        return sock_info

    def _ensure_connected(self, dummy):
        """Ensure this client instance is connected to a mongod/s.
        """
        host, port = (self.__host, self.__port)
        if host is None or (port is None and '/' not in host):
            self.__find_node()

    def disconnect(self):
        """Disconnect from MongoDB.

        Disconnecting will close all underlying sockets in the
        connection pool. If this instance is used again it will be
        automatically re-opened. Care should be taken to make sure
        that :meth:`disconnect` is not called in the middle of a
        sequence of operations in which ordering is important. This
        could lead to unexpected results.

        .. seealso:: :meth:`end_request`

        .. versionadded:: 1.3
        """
        self.__pool.reset()
        self.__host = None
        self.__port = None

    def close(self):
        """Alias for :meth:`disconnect`

        Disconnecting will close all underlying sockets in the
        connection pool. If this instance is used again it will be
        automatically re-opened. Care should be taken to make sure
        that :meth:`disconnect` is not called in the middle of a
        sequence of operations in which ordering is important. This
        could lead to unexpected results.

        .. seealso:: :meth:`end_request`

        .. versionadded:: 2.1
        """
        self.disconnect()

    def alive(self):
        """Return ``False`` if there has been an error communicating with the
        server, else ``True``.

        This method attempts to check the status of the server with minimal
        I/O. The current thread / greenlet retrieves a socket from the pool
        (its request socket if it's in a request, or a random idle socket if
        it's not in a request) and checks whether calling `select`_ on it
        raises an error. If there are currently no idle sockets,
        :meth:`alive` will attempt to actually connect to the server.

        A more certain way to determine server availability is::

            client.admin.command('ping')

        .. _select: http://docs.python.org/2/library/select.html#select.select
        """
        # In the common case, a socket is available and was used recently, so
        # calling select() on it is a reasonable attempt to see if the OS has
        # reported an error. Note this can be wasteful: __socket implicitly
        # calls select() if the socket hasn't been checked in the last second,
        # or it may create a new socket, in which case calling select() is
        # redundant.
        sock_info = None
        try:
            try:
                sock_info = self.__socket()
                return not pool._closed(sock_info.sock)
            except (socket.error, ConnectionFailure):
                return False
        finally:
            self.__pool.maybe_return_socket(sock_info)

    def set_cursor_manager(self, manager_class):
        """Set this client's cursor manager.

        Raises :class:`TypeError` if `manager_class` is not a subclass of
        :class:`~pymongo.cursor_manager.CursorManager`. A cursor manager
        handles closing cursors. Different managers can implement different
        policies in terms of when to actually kill a cursor that has
        been closed.

        :Parameters:
          - `manager_class`: cursor manager to use

        .. versionchanged:: 2.1+
           Deprecated support for external cursor managers.
        """
        warnings.warn("Support for external cursor managers is deprecated "
                      "and will be removed in PyMongo 3.0.",
                      DeprecationWarning, stacklevel=2)
        manager = manager_class(self)
        if not isinstance(manager, CursorManager):
            raise TypeError("manager_class must be a subclass of "
                            "CursorManager")

        self.__cursor_manager = manager

    def __check_response_to_last_error(self, response):
        """Check a response to a lastError message for errors.

        `response` is a byte string representing a response to the message.
        If it represents an error response we raise OperationFailure.

        Return the response as a document.
        """
        response = helpers._unpack_response(response)

        assert response["number_returned"] == 1
        error = response["data"][0]

        helpers._check_command_response(error, self.disconnect)

        error_msg = error.get("err", "")
        if error_msg is None:
            return error
        if error_msg.startswith("not master"):
            self.disconnect()
            raise AutoReconnect(error_msg)

        details = error
        # mongos returns the error code in an error object
        # for some errors.
        if "errObjects" in error:
            for errobj in error["errObjects"]:
                if errobj["err"] == error_msg:
                    details = errobj
                    break

        if "code" in details:
            if details["code"] in (11000, 11001, 12582):
                raise DuplicateKeyError(details["err"], details["code"])
            else:
                raise OperationFailure(details["err"], details["code"])
        else:
            raise OperationFailure(details["err"])

    def __check_bson_size(self, message):
        """Make sure the message doesn't include BSON documents larger
        than the connected server will accept.

        :Parameters:
          - `message`: message to check
        """
        if len(message) == 3:
            (request_id, data, max_doc_size) = message
            if max_doc_size > self.__max_bson_size:
                raise InvalidDocument("BSON document too large (%d bytes)"
                                      " - the connected server supports"
                                      " BSON document sizes up to %d"
                                      " bytes." %
                                      (max_doc_size, self.__max_bson_size))
            return (request_id, data)
        else:
            # get_more and kill_cursors messages
            # don't include BSON documents.
            return message

    def _send_message(self, message,
                      with_last_error=False, check_primary=True):
        """Say something to Mongo.

        Raises ConnectionFailure if the message cannot be sent. Raises
        OperationFailure if `with_last_error` is ``True`` and the
        response to the getLastError call returns an error. Return the
        response from lastError, or ``None`` if `with_last_error`
        is ``False``.

        :Parameters:
          - `message`: message to send
          - `with_last_error`: check getLastError status after sending the
            message
          - `check_primary`: don't try to write to a non-primary; see
            kill_cursors for an exception to this rule
        """
        if check_primary and not with_last_error and not self.is_primary:
            # The write won't succeed, bail as if we'd done a getLastError
            raise AutoReconnect("not master")

        sock_info = self.__socket()
        try:
            try:
                (request_id, data) = self.__check_bson_size(message)
                sock_info.sock.sendall(data)
                # Safe mode. We pack the message together with a lastError
                # message and send both. We then get the response (to the
                # lastError) and raise OperationFailure if it is an error
                # response.
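                # (A successful getLastError reply looks roughly like
                #  {'ok': 1, 'err': None, 'n': 1}; 'err' is a string when
                #  the write failed. The values shown are illustrative.)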
                rv = None
                if with_last_error:
                    response = self.__receive_message_on_socket(1, request_id,
                                                                sock_info)
                    rv = self.__check_response_to_last_error(response)
                return rv
            except OperationFailure:
                raise
            except (ConnectionFailure, socket.error) as e:
                self.disconnect()
                raise AutoReconnect(str(e))
            except:
                sock_info.close()
                raise
        finally:
            self.__pool.maybe_return_socket(sock_info)

    def __receive_data_on_socket(self, length, sock_info):
        """Lowest level receive operation.

        Takes length to receive and repeatedly calls recv until able to
        return a buffer of that length, raising ConnectionFailure on error.
        """
        message = EMPTY
        while length:
            chunk = sock_info.sock.recv(length)
            if chunk == EMPTY:
                raise ConnectionFailure("connection closed")
            length -= len(chunk)
            message += chunk
        return message

    def __receive_message_on_socket(self, operation, rqst_id, sock_info):
        """Receive a message in response to `rqst_id` on `sock`.

        Returns the response data with the header removed.
        """
        header = self.__receive_data_on_socket(16, sock_info)
        # Wire protocol header: messageLength, requestID, responseTo and
        # opCode, each a little-endian int32.
        length = struct.unpack("<i", header[:4])[0]
        assert rqst_id == struct.unpack("<i", header[8:12])[0], (
            "ids don't match %r %r" %
            (rqst_id, struct.unpack("<i", header[8:12])[0]))
        assert operation == struct.unpack("<i", header[12:])[0]
        return self.__receive_data_on_socket(length - 16, sock_info)

    def start_request(self):
        """Ensure the current thread or greenlet always uses the same socket
        until it calls :meth:`end_request`. This ensures consistent reads,
        even if you read after an unacknowledged write.

        :meth:`start_request` can be used as a context manager:

        >>> client = pymongo.MongoClient(auto_start_request=False)
        >>> db = client.test
        >>> _id = db.test_collection.insert({})
        >>> with client.start_request():
        ...     for i in range(100):
        ...         db.test_collection.update({'_id': _id}, {'$set': {'i': i}})
        ...
        ...     # Definitely read the document after the final update completes
        ...     print(db.test_collection.find({'_id': _id}))

        If a thread or greenlet calls start_request multiple times, an equal
        number of calls to :meth:`end_request` is required to end the request.

        .. versionchanged:: 2.4
           Now counts the number of calls to start_request and doesn't end
           request until an equal number of calls to end_request.

        .. versionadded:: 2.2
           The :class:`~pymongo.pool.Request` return value.
           :meth:`start_request` previously returned None
        """
        self.__pool.start_request()
        return pool.Request(self)

    def in_request(self):
        """True if this thread is in a request, meaning it has a socket
        reserved for its exclusive use.
        """
        return self.__pool.in_request()

    def end_request(self):
        """Undo :meth:`start_request`. If :meth:`end_request` is called as
        many times as :meth:`start_request`, the request is over and this
        thread's connection returns to the pool. Extra calls to
        :meth:`end_request` have no effect.

        Ending a request allows the :class:`~socket.socket` that has
        been reserved for this thread by :meth:`start_request` to be returned
        to the pool. Other threads will then be able to re-use that
        :class:`~socket.socket`. If your application uses many threads, or has
        long-running threads that infrequently perform MongoDB operations,
        then judicious use of this method can lead to performance gains. Care
        should be taken, however, to make sure that :meth:`end_request` is not
        called in the middle of a sequence of operations in which ordering is
        important. This could lead to unexpected results.
        """
        self.__pool.end_request()

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            us = (self.__host, self.__port)
            them = (other.__host, other.__port)
            return us == them
        return NotImplemented

    def __ne__(self, other):
        return not self == other

    def __repr__(self):
        if len(self.__nodes) == 1:
            return "MongoClient(%r, %r)" % (self.__host, self.__port)
        else:
            return "MongoClient(%r)" % ["%s:%d" % n for n in self.__nodes]

    def __getattr__(self, name):
        """Get a database by name.

        Raises :class:`~pymongo.errors.InvalidName` if an invalid
        database name is used.

        :Parameters:
          - `name`: the name of the database to get
        """
        return database.Database(self, name)

    def __getitem__(self, name):
        """Get a database by name.

        Raises :class:`~pymongo.errors.InvalidName` if an invalid
        database name is used.

        :Parameters:
          - `name`: the name of the database to get
        """
        return self.__getattr__(name)

    def close_cursor(self, cursor_id):
        """Close a single database cursor.

        Raises :class:`TypeError` if `cursor_id` is not an instance of
        ``int``. What closing the cursor actually means
        depends on this client's cursor manager.

        :Parameters:
          - `cursor_id`: id of cursor to close
        """
        if not isinstance(cursor_id, int):
            raise TypeError("cursor_id must be an instance of int")

        self.__cursor_manager.close(cursor_id)

    def kill_cursors(self, cursor_ids):
        """Send a kill cursors message with the given ids.

        Raises :class:`TypeError` if `cursor_ids` is not an instance of
        ``list``.

        :Parameters:
          - `cursor_ids`: list of cursor ids to kill
        """
        if not isinstance(cursor_ids, list):
            raise TypeError("cursor_ids must be a list")
        return self._send_message(
            message.kill_cursors(cursor_ids), check_primary=False)

    def server_info(self):
        """Get information about the MongoDB server we're connected to.
        """
        return self.admin.command("buildinfo")

    def database_names(self):
        """Get a list of the names of all databases on the connected server.
        """
        return [db["name"] for db in
                self.admin.command("listDatabases")["databases"]]

    def drop_database(self, name_or_database):
        """Drop a database.

        Raises :class:`TypeError` if `name_or_database` is not an instance
        of :class:`str` or :class:`~pymongo.database.Database`.

        :Parameters:
          - `name_or_database`: the name of a database to drop, or a
            :class:`~pymongo.database.Database` instance representing the
            database to drop
        """
        name = name_or_database
        if isinstance(name, database.Database):
            name = name.name

        if not isinstance(name, str):
            raise TypeError("name_or_database must be an instance of "
                            "%s or Database" % (str.__name__,))

        self._purge_index(name)
        self[name].command("dropDatabase")

    def copy_database(self, from_name, to_name,
                      from_host=None, username=None, password=None):
        """Copy a database, potentially from another host.

        Raises :class:`TypeError` if `from_name` or `to_name` is not
        an instance of :class:`str`. Raises
        :class:`~pymongo.errors.InvalidName` if `to_name` is not a valid
        database name.

        If `from_host` is ``None`` the current host is used as the
        source. Otherwise the database is copied from `from_host`.

        If the source database requires authentication, `username` and
        `password` must be specified.

        :Parameters:
          - `from_name`: the name of the source database
          - `to_name`: the name of the target database
          - `from_host` (optional): host name to copy from
          - `username` (optional): username for source database
          - `password` (optional): password for source database

        .. note:: Specifying `username` and `password` requires server
           version **>= 1.3.3+**.
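
        A sketch of typical usage (the database names here are
        illustrative):

        >>> client.copy_database('app_db', 'app_db_backup')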

        .. versionadded:: 1.5
        """
        if not isinstance(from_name, str):
            raise TypeError("from_name must be an instance "
                            "of %s" % (str.__name__,))
        if not isinstance(to_name, str):
            raise TypeError("to_name must be an instance "
                            "of %s" % (str.__name__,))

        database._check_name(to_name)

        command = {"fromdb": from_name, "todb": to_name}

        if from_host is not None:
            command["fromhost"] = from_host

        try:
            self.start_request()
            if username is not None:
                nonce = self.admin.command("copydbgetnonce",
                                           fromhost=from_host)["nonce"]
                command["username"] = username
                command["nonce"] = nonce
                command["key"] = auth._auth_key(nonce, username, password)

            return self.admin.command("copydb", **command)
        finally:
            self.end_request()

    def get_default_database(self):
        """Get the database named in the MongoDB connection URI.

        >>> uri = 'mongodb://host/my_database'
        >>> client = MongoClient(uri)
        >>> db = client.get_default_database()
        >>> assert db.name == 'my_database'

        Useful in scripts where you want to choose which database to use
        based only on the URI in a configuration file.
        """
        if self.__default_database_name is None:
            raise ConfigurationError('No default database defined')

        return self[self.__default_database_name]

    @property
    def is_locked(self):
        """Is this server locked? While locked, all write operations
        are blocked, although read operations may still be allowed.
        Use :meth:`unlock` to unlock.

        .. versionadded:: 2.0
        """
        ops = self.admin.current_op()
        return bool(ops.get('fsyncLock', 0))

    def fsync(self, **kwargs):
        """Flush all pending writes to datafiles.

        :Parameters:

          Optional parameters can be passed as keyword arguments:

          - `lock`: If True lock the server to disallow writes.
          - `async`: If True don't block while synchronizing.

        .. warning:: `async` and `lock` can not be used together.

        .. warning:: MongoDB does not support the `async` option
                     on Windows and will raise an exception on that
                     platform.

        .. versionadded:: 2.0
        """
        self.admin.command("fsync", **kwargs)

    def unlock(self):
        """Unlock a previously locked server.

        .. versionadded:: 2.0
        """
        self.admin['$cmd'].sys.unlock.find_one()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

    def __iter__(self):
        return self

    def __next__(self):
        raise TypeError("'MongoClient' object is not iterable")
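

if __name__ == "__main__":
    # A minimal smoke test, not part of the driver proper: it assumes a
    # mongod is listening on localhost:27017 and round-trips one document.
    # The database and collection names are arbitrary.
    client = MongoClient()
    collection = client.test_database.smoke_test
    doc_id = collection.insert({"hello": "world"})
    print(collection.find_one({"_id": doc_id}))
    client.disconnect()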