* Examples work * setup.py kinda updasted * Fork of txmongo but with new pymongo embedded
212 lines
6.4 KiB
Python
212 lines
6.4 KiB
Python
# Copyright 2012 10gen, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License",
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Utilities for choosing which member of a replica set to read from."""
|
|
|
|
import random
|
|
|
|
from asyncio_mongo._pymongo.errors import ConfigurationError
|
|
|
|
|
|
class ReadPreference:
|
|
"""An enum that defines the read preference modes supported by PyMongo.
|
|
Used in three cases:
|
|
|
|
:class:`~pymongo.mongo_client.MongoClient` connected to a single host:
|
|
|
|
* `PRIMARY`: Queries are allowed if the host is standalone or the replica
|
|
set primary.
|
|
* All other modes allow queries to standalone servers, to the primary, or
|
|
to secondaries.
|
|
|
|
:class:`~pymongo.mongo_client.MongoClient` connected to a mongos, with a
|
|
sharded cluster of replica sets:
|
|
|
|
* `PRIMARY`: Queries are sent to the primary of a shard.
|
|
* `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
|
|
otherwise a secondary.
|
|
* `SECONDARY`: Queries are distributed among shard secondaries. An error
|
|
is raised if no secondaries are available.
|
|
* `SECONDARY_PREFERRED`: Queries are distributed among shard secondaries,
|
|
or the primary if no secondary is available.
|
|
* `NEAREST`: Queries are distributed among all members of a shard.
|
|
|
|
:class:`~pymongo.mongo_replica_set_client.MongoReplicaSetClient`:
|
|
|
|
* `PRIMARY`: Queries are sent to the primary of the replica set.
|
|
* `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
|
|
otherwise a secondary.
|
|
* `SECONDARY`: Queries are distributed among secondaries. An error
|
|
is raised if no secondaries are available.
|
|
* `SECONDARY_PREFERRED`: Queries are distributed among secondaries,
|
|
or the primary if no secondary is available.
|
|
* `NEAREST`: Queries are distributed among all members.
|
|
"""
|
|
|
|
PRIMARY = 0
|
|
PRIMARY_PREFERRED = 1
|
|
SECONDARY = 2
|
|
SECONDARY_ONLY = 2
|
|
SECONDARY_PREFERRED = 3
|
|
NEAREST = 4
|
|
|
|
# For formatting error messages
|
|
modes = {
|
|
ReadPreference.PRIMARY: 'PRIMARY',
|
|
ReadPreference.PRIMARY_PREFERRED: 'PRIMARY_PREFERRED',
|
|
ReadPreference.SECONDARY: 'SECONDARY',
|
|
ReadPreference.SECONDARY_PREFERRED: 'SECONDARY_PREFERRED',
|
|
ReadPreference.NEAREST: 'NEAREST',
|
|
}
|
|
|
|
_mongos_modes = [
|
|
'primary',
|
|
'primaryPreferred',
|
|
'secondary',
|
|
'secondaryPreferred',
|
|
'nearest',
|
|
]
|
|
|
|
def mongos_mode(mode):
|
|
return _mongos_modes[mode]
|
|
|
|
def mongos_enum(enum):
|
|
return _mongos_modes.index(enum)
|
|
|
|
def select_primary(members):
|
|
for member in members:
|
|
if member.is_primary:
|
|
if member.up:
|
|
return member
|
|
else:
|
|
return None
|
|
|
|
return None
|
|
|
|
|
|
def select_member_with_tags(members, tags, secondary_only, latency):
|
|
candidates = []
|
|
|
|
for candidate in members:
|
|
if not candidate.up:
|
|
continue
|
|
|
|
if secondary_only and candidate.is_primary:
|
|
continue
|
|
|
|
if not (candidate.is_primary or candidate.is_secondary):
|
|
# In RECOVERING or similar state
|
|
continue
|
|
|
|
if candidate.matches_tags(tags):
|
|
candidates.append(candidate)
|
|
|
|
if not candidates:
|
|
return None
|
|
|
|
# ping_time is in seconds
|
|
fastest = min([candidate.get_avg_ping_time() for candidate in candidates])
|
|
near_candidates = [
|
|
candidate for candidate in candidates
|
|
if candidate.get_avg_ping_time() - fastest < latency / 1000.]
|
|
|
|
return random.choice(near_candidates)
|
|
|
|
|
|
def select_member(
|
|
members,
|
|
mode=ReadPreference.PRIMARY,
|
|
tag_sets=None,
|
|
latency=15
|
|
):
|
|
"""Return a Member or None.
|
|
"""
|
|
if tag_sets is None:
|
|
tag_sets = [{}]
|
|
|
|
# For brevity
|
|
PRIMARY = ReadPreference.PRIMARY
|
|
PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
|
|
SECONDARY = ReadPreference.SECONDARY
|
|
SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
|
|
NEAREST = ReadPreference.NEAREST
|
|
|
|
if mode == PRIMARY:
|
|
if tag_sets != [{}]:
|
|
raise ConfigurationError("PRIMARY cannot be combined with tags")
|
|
return select_primary(members)
|
|
|
|
elif mode == PRIMARY_PREFERRED:
|
|
# Recurse.
|
|
candidate_primary = select_member(members, PRIMARY, [{}], latency)
|
|
if candidate_primary:
|
|
return candidate_primary
|
|
else:
|
|
return select_member(members, SECONDARY, tag_sets, latency)
|
|
|
|
elif mode == SECONDARY:
|
|
for tags in tag_sets:
|
|
candidate = select_member_with_tags(members, tags, True, latency)
|
|
if candidate:
|
|
return candidate
|
|
|
|
return None
|
|
|
|
elif mode == SECONDARY_PREFERRED:
|
|
# Recurse.
|
|
candidate_secondary = select_member(
|
|
members, SECONDARY, tag_sets, latency)
|
|
if candidate_secondary:
|
|
return candidate_secondary
|
|
else:
|
|
return select_member(members, PRIMARY, [{}], latency)
|
|
|
|
elif mode == NEAREST:
|
|
for tags in tag_sets:
|
|
candidate = select_member_with_tags(members, tags, False, latency)
|
|
if candidate:
|
|
return candidate
|
|
|
|
# Ran out of tags.
|
|
return None
|
|
|
|
else:
|
|
raise ConfigurationError("Invalid mode %s" % repr(mode))
|
|
|
|
|
|
"""Commands that may be sent to replica-set secondaries, depending on
|
|
ReadPreference and tags. All other commands are always run on the primary.
|
|
"""
|
|
secondary_ok_commands = frozenset([
|
|
"group", "aggregate", "collstats", "dbstats", "count", "distinct",
|
|
"geonear", "geosearch", "geowalk", "mapreduce", "getnonce", "authenticate",
|
|
"text",
|
|
])
|
|
|
|
|
|
class MovingAverage(object):
|
|
def __init__(self, samples):
|
|
"""Immutable structure to track a 5-sample moving average.
|
|
"""
|
|
self.samples = samples[-5:]
|
|
assert self.samples
|
|
self.average = sum(self.samples) / float(len(self.samples))
|
|
|
|
def clone_with(self, sample):
|
|
"""Get a copy of this instance plus a new sample"""
|
|
return MovingAverage(self.samples + [sample])
|
|
|
|
def get(self):
|
|
return self.average
|