Skip to content

Commit 0cfba49

Browse files
authored
PYTHON-5662 - Add support for server selection's deprioritized servers to all topologies (#2639)
1 parent f813437 commit 0cfba49

34 files changed: +1651 / -72 lines changed

pymongo/asynchronous/mongo_client.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2825,7 +2825,7 @@ async def run(self) -> T:
28252825
if self._last_error is None:
28262826
self._last_error = exc
28272827

2828-
if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded:
2828+
if self._server is not None:
28292829
self._deprioritized_servers.append(self._server)
28302830

28312831
def _is_not_eligible_for_retry(self) -> bool:

pymongo/asynchronous/topology.py

Lines changed: 20 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -268,6 +268,7 @@ async def select_servers(
268268
server_selection_timeout: Optional[float] = None,
269269
address: Optional[_Address] = None,
270270
operation_id: Optional[int] = None,
271+
deprioritized_servers: Optional[list[Server]] = None,
271272
) -> list[Server]:
272273
"""Return a list of Servers matching selector, or time out.
273274
@@ -295,7 +296,12 @@ async def select_servers(
295296

296297
async with self._lock:
297298
server_descriptions = await self._select_servers_loop(
298-
selector, server_timeout, operation, operation_id, address
299+
selector,
300+
server_timeout,
301+
operation,
302+
operation_id,
303+
address,
304+
deprioritized_servers=deprioritized_servers,
299305
)
300306

301307
return [
@@ -309,6 +315,7 @@ async def _select_servers_loop(
309315
operation: str,
310316
operation_id: Optional[int],
311317
address: Optional[_Address],
318+
deprioritized_servers: Optional[list[Server]] = None,
312319
) -> list[ServerDescription]:
313320
"""select_servers() guts. Hold the lock when calling this."""
314321
now = time.monotonic()
@@ -327,7 +334,12 @@ async def _select_servers_loop(
327334
)
328335

329336
server_descriptions = self._description.apply_selector(
330-
selector, address, custom_selector=self._settings.server_selector
337+
selector,
338+
address,
339+
custom_selector=self._settings.server_selector,
340+
deprioritized_servers=[server.description for server in deprioritized_servers]
341+
if deprioritized_servers
342+
else None,
331343
)
332344

333345
while not server_descriptions:
@@ -388,9 +400,13 @@ async def _select_server(
388400
operation_id: Optional[int] = None,
389401
) -> Server:
390402
servers = await self.select_servers(
391-
selector, operation, server_selection_timeout, address, operation_id
403+
selector,
404+
operation,
405+
server_selection_timeout,
406+
address,
407+
operation_id,
408+
deprioritized_servers,
392409
)
393-
servers = _filter_servers(servers, deprioritized_servers)
394410
if len(servers) == 1:
395411
return servers[0]
396412
server1, server2 = random.sample(servers, 2)
@@ -1119,16 +1135,3 @@ def _is_stale_server_description(current_sd: ServerDescription, new_sd: ServerDe
11191135
if current_tv["processId"] != new_tv["processId"]:
11201136
return False
11211137
return current_tv["counter"] > new_tv["counter"]
1122-
1123-
1124-
def _filter_servers(
1125-
candidates: list[Server], deprioritized_servers: Optional[list[Server]] = None
1126-
) -> list[Server]:
1127-
"""Filter out deprioritized servers from a list of server candidates."""
1128-
if not deprioritized_servers:
1129-
return candidates
1130-
1131-
filtered = [server for server in candidates if server not in deprioritized_servers]
1132-
1133-
# If not possible to pick a prioritized server, return the original list
1134-
return filtered or candidates

pymongo/server_selectors.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -34,16 +34,16 @@ class Selection:
3434

3535
@classmethod
3636
def from_topology_description(cls, topology_description: TopologyDescription) -> Selection:
37-
known_servers = topology_description.known_servers
37+
candidate_servers = topology_description.candidate_servers
3838
primary = None
39-
for sd in known_servers:
39+
for sd in candidate_servers:
4040
if sd.server_type == SERVER_TYPE.RSPrimary:
4141
primary = sd
4242
break
4343

4444
return Selection(
4545
topology_description,
46-
topology_description.known_servers,
46+
topology_description.candidate_servers,
4747
topology_description.common_wire_version,
4848
primary,
4949
)

pymongo/synchronous/mongo_client.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2815,7 +2815,7 @@ def run(self) -> T:
28152815
if self._last_error is None:
28162816
self._last_error = exc
28172817

2818-
if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded:
2818+
if self._server is not None:
28192819
self._deprioritized_servers.append(self._server)
28202820

28212821
def _is_not_eligible_for_retry(self) -> bool:

pymongo/synchronous/topology.py

Lines changed: 20 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -268,6 +268,7 @@ def select_servers(
268268
server_selection_timeout: Optional[float] = None,
269269
address: Optional[_Address] = None,
270270
operation_id: Optional[int] = None,
271+
deprioritized_servers: Optional[list[Server]] = None,
271272
) -> list[Server]:
272273
"""Return a list of Servers matching selector, or time out.
273274
@@ -295,7 +296,12 @@ def select_servers(
295296

296297
with self._lock:
297298
server_descriptions = self._select_servers_loop(
298-
selector, server_timeout, operation, operation_id, address
299+
selector,
300+
server_timeout,
301+
operation,
302+
operation_id,
303+
address,
304+
deprioritized_servers=deprioritized_servers,
299305
)
300306

301307
return [
@@ -309,6 +315,7 @@ def _select_servers_loop(
309315
operation: str,
310316
operation_id: Optional[int],
311317
address: Optional[_Address],
318+
deprioritized_servers: Optional[list[Server]] = None,
312319
) -> list[ServerDescription]:
313320
"""select_servers() guts. Hold the lock when calling this."""
314321
now = time.monotonic()
@@ -327,7 +334,12 @@ def _select_servers_loop(
327334
)
328335

329336
server_descriptions = self._description.apply_selector(
330-
selector, address, custom_selector=self._settings.server_selector
337+
selector,
338+
address,
339+
custom_selector=self._settings.server_selector,
340+
deprioritized_servers=[server.description for server in deprioritized_servers]
341+
if deprioritized_servers
342+
else None,
331343
)
332344

333345
while not server_descriptions:
@@ -388,9 +400,13 @@ def _select_server(
388400
operation_id: Optional[int] = None,
389401
) -> Server:
390402
servers = self.select_servers(
391-
selector, operation, server_selection_timeout, address, operation_id
403+
selector,
404+
operation,
405+
server_selection_timeout,
406+
address,
407+
operation_id,
408+
deprioritized_servers,
392409
)
393-
servers = _filter_servers(servers, deprioritized_servers)
394410
if len(servers) == 1:
395411
return servers[0]
396412
server1, server2 = random.sample(servers, 2)
@@ -1117,16 +1133,3 @@ def _is_stale_server_description(current_sd: ServerDescription, new_sd: ServerDe
11171133
if current_tv["processId"] != new_tv["processId"]:
11181134
return False
11191135
return current_tv["counter"] > new_tv["counter"]
1120-
1121-
1122-
def _filter_servers(
1123-
candidates: list[Server], deprioritized_servers: Optional[list[Server]] = None
1124-
) -> list[Server]:
1125-
"""Filter out deprioritized servers from a list of server candidates."""
1126-
if not deprioritized_servers:
1127-
return candidates
1128-
1129-
filtered = [server for server in candidates if server not in deprioritized_servers]
1130-
1131-
# If not possible to pick a prioritized server, return the original list
1132-
return filtered or candidates

pymongo/topology_description.py

Lines changed: 37 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,7 @@ def __init__(
8585
self._server_descriptions = server_descriptions
8686
self._max_set_version = max_set_version
8787
self._max_election_id = max_election_id
88+
self._candidate_servers = list(self._server_descriptions.values())
8889

8990
# The heartbeat_frequency is used in staleness estimates.
9091
self._topology_settings = topology_settings
@@ -248,6 +249,11 @@ def readable_servers(self) -> list[ServerDescription]:
248249
"""List of readable Servers."""
249250
return [s for s in self._server_descriptions.values() if s.is_readable]
250251

252+
@property
253+
def candidate_servers(self) -> list[ServerDescription]:
254+
"""List of Servers excluding deprioritized servers."""
255+
return self._candidate_servers
256+
251257
@property
252258
def common_wire_version(self) -> Optional[int]:
253259
"""Minimum of all servers' max wire versions, or None."""
@@ -283,11 +289,27 @@ def _apply_local_threshold(self, selection: Optional[Selection]) -> list[ServerD
283289
if (cast(float, s.round_trip_time) - fastest) <= threshold
284290
]
285291

292+
def _filter_servers(
293+
self, deprioritized_servers: Optional[list[ServerDescription]] = None
294+
) -> None:
295+
"""Filter out deprioritized servers from a list of server candidates."""
296+
if not deprioritized_servers:
297+
self._candidate_servers = self.known_servers
298+
else:
299+
deprioritized_addresses = {sd.address for sd in deprioritized_servers}
300+
filtered = [
301+
server
302+
for server in self.known_servers
303+
if server.address not in deprioritized_addresses
304+
]
305+
self._candidate_servers = filtered or self.known_servers
306+
286307
def apply_selector(
287308
self,
288309
selector: Any,
289310
address: Optional[_Address] = None,
290311
custom_selector: Optional[_ServerSelector] = None,
312+
deprioritized_servers: Optional[list[ServerDescription]] = None,
291313
) -> list[ServerDescription]:
292314
"""List of servers matching the provided selector(s).
293315
@@ -324,21 +346,35 @@ def apply_selector(
324346
description = self.server_descriptions().get(address)
325347
return [description] if description and description.is_server_type_known else []
326348

349+
self._filter_servers(deprioritized_servers)
327350
# Primary selection fast path.
328351
if self.topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary and type(selector) is Primary:
329-
for sd in self._server_descriptions.values():
352+
for sd in self._candidate_servers:
330353
if sd.server_type == SERVER_TYPE.RSPrimary:
331354
sds = [sd]
332355
if custom_selector:
333356
sds = custom_selector(sds)
334357
return sds
358+
# All primaries are deprioritized
359+
if deprioritized_servers:
360+
for sd in deprioritized_servers:
361+
if sd.server_type == SERVER_TYPE.RSPrimary:
362+
sds = [sd]
363+
if custom_selector:
364+
sds = custom_selector(sds)
365+
return sds
335366
# No primary found, return an empty list.
336367
return []
337368

338369
selection = Selection.from_topology_description(self)
339370
# Ignore read preference for sharded clusters.
340371
if self.topology_type != TOPOLOGY_TYPE.Sharded:
341372
selection = selector(selection)
373+
# No suitable servers found, apply preference again but include deprioritized servers.
374+
if not selection and deprioritized_servers:
375+
self._filter_servers(None)
376+
selection = Selection.from_topology_description(self)
377+
selection = selector(selection)
342378

343379
# Apply custom selector followed by localThresholdMS.
344380
if custom_selector is not None and selection:

test/asynchronous/utils_selection_tests.py

Lines changed: 32 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@
3535
from bson import json_util
3636
from pymongo.asynchronous.settings import TopologySettings
3737
from pymongo.asynchronous.topology import Topology
38-
from pymongo.common import HEARTBEAT_FREQUENCY
38+
from pymongo.common import HEARTBEAT_FREQUENCY, clean_node
3939
from pymongo.errors import AutoReconnect, ConfigurationError
4040
from pymongo.operations import _Op
4141
from pymongo.server_selectors import writable_server_selector
@@ -95,12 +95,21 @@ async def run_scenario(self):
9595
# "Eligible servers" is defined in the server selection spec as
9696
# the set of servers matching both the ReadPreference's mode
9797
# and tag sets.
98-
top_latency = await create_topology(scenario_def)
98+
top_suitable = await create_topology(scenario_def, local_threshold_ms=1000000)
9999

100100
# "In latency window" is defined in the server selection
101101
# spec as the subset of suitable_servers that falls within the
102102
# allowable latency window.
103-
top_suitable = await create_topology(scenario_def, local_threshold_ms=1000000)
103+
top_latency = await create_topology(scenario_def)
104+
105+
top_suitable_deprioritized_servers = [
106+
top_suitable.get_server_by_address(clean_node(server["address"]))
107+
for server in scenario_def.get("deprioritized_servers", [])
108+
]
109+
top_latency_deprioritized_servers = [
110+
top_latency.get_server_by_address(clean_node(server["address"]))
111+
for server in scenario_def.get("deprioritized_servers", [])
112+
]
104113

105114
# Create server selector.
106115
if scenario_def.get("operation") == "write":
@@ -120,21 +129,37 @@ async def run_scenario(self):
120129
# Select servers.
121130
if not scenario_def.get("suitable_servers"):
122131
with self.assertRaises(AutoReconnect):
123-
await top_suitable.select_server(pref, _Op.TEST, server_selection_timeout=0)
132+
await top_suitable.select_server(
133+
pref,
134+
_Op.TEST,
135+
server_selection_timeout=0,
136+
deprioritized_servers=top_suitable_deprioritized_servers,
137+
)
124138

125139
return
126140

127141
if not scenario_def["in_latency_window"]:
128142
with self.assertRaises(AutoReconnect):
129-
await top_latency.select_server(pref, _Op.TEST, server_selection_timeout=0)
143+
await top_latency.select_server(
144+
pref,
145+
_Op.TEST,
146+
server_selection_timeout=0,
147+
deprioritized_servers=top_latency_deprioritized_servers,
148+
)
130149

131150
return
132151

133152
actual_suitable_s = await top_suitable.select_servers(
134-
pref, _Op.TEST, server_selection_timeout=0
153+
pref,
154+
_Op.TEST,
155+
server_selection_timeout=0,
156+
deprioritized_servers=top_suitable_deprioritized_servers,
135157
)
136158
actual_latency_s = await top_latency.select_servers(
137-
pref, _Op.TEST, server_selection_timeout=0
159+
pref,
160+
_Op.TEST,
161+
server_selection_timeout=0,
162+
deprioritized_servers=top_latency_deprioritized_servers,
138163
)
139164

140165
expected_suitable_servers = {}

0 commit comments

Comments
 (0)