Skip to content

Commit 22a2610

Browse files
fix: read/write splitting switch to writer (#1080)
Co-authored-by: Sophia Chu <112967780+sophia-bq@users.noreply.github.com>
1 parent 2f07ddd commit 22a2610

5 files changed

Lines changed: 191 additions & 50 deletions

File tree

aws_advanced_python_wrapper/read_write_splitting_plugin.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ def _set_reader_connection(
185185

186186
def _initialize_writer_connection(self):
187187
conn, writer_host = self._connection_handler.open_new_writer_connection(lambda x: self._plugin_service.connect(x, self._properties, self))
188-
189188
if conn is None:
190189
self.log_and_raise_exception(
191190
"ReadWriteSplittingPlugin.FailedToConnectToWriter"
@@ -280,13 +279,18 @@ def _switch_to_writer_connection(self):
280279
# Already connected to the intended writer.
281280
return
282281

282+
self._writer_host_info = self._connection_handler.get_writer_host_info()
283283
self._in_read_write_split = True
284284
if not self._is_connection_usable(self._writer_connection, driver_dialect):
285285
self._initialize_writer_connection()
286286
elif self._writer_connection is not None and self._writer_host_info is not None:
287-
self._switch_current_connection_to(
288-
self._writer_connection, self._writer_host_info
289-
)
287+
if self._connection_handler.can_host_be_used(self._writer_host_info):
288+
self._switch_current_connection_to(
289+
self._writer_connection, self._writer_host_info
290+
)
291+
else:
292+
ReadWriteSplittingConnectionManager.log_and_raise_exception(
293+
"ReadWriteSplittingPlugin.NoWriterFound")
290294

291295
if self._is_reader_conn_from_internal_pool:
292296
self._close_connection_if_idle(self._reader_connection)
@@ -508,6 +512,10 @@ def refresh_and_store_host_list(
508512
"""Refreshes the host list and then stores it."""
509513
...
510514

515+
def get_writer_host_info(self) -> Optional[HostInfo]:
516+
"""Get the current writer host info."""
517+
...
518+
511519

512520
class TopologyBasedConnectionHandler(ReadWriteConnectionHandler):
513521
"""Topology based implementation of connection handling logic."""
@@ -538,7 +546,7 @@ def open_new_writer_connection(
538546
self,
539547
plugin_service_connect_func: Callable[[HostInfo], Connection],
540548
) -> tuple[Optional[Connection], Optional[HostInfo]]:
541-
writer_host = self._get_writer()
549+
writer_host = self.get_writer_host_info()
542550
if writer_host is None:
543551
return None, None
544552

@@ -621,7 +629,7 @@ def can_host_be_used(self, host_info: HostInfo) -> bool:
621629

622630
def has_no_readers(self) -> bool:
623631
if len(self._hosts) == 1:
624-
return self._get_writer() is not None
632+
return self.get_writer_host_info() is not None
625633
return False
626634

627635
def refresh_and_store_host_list(
@@ -657,14 +665,11 @@ def is_writer_host(self, current_host: HostInfo) -> bool:
657665
def is_reader_host(self, current_host) -> bool:
658666
return current_host.role == HostRole.READER
659667

660-
def _get_writer(self) -> Optional[HostInfo]:
668+
def get_writer_host_info(self) -> Optional[HostInfo]:
661669
for host in self._hosts:
662670
if host.role == HostRole.WRITER:
663671
return host
664672

665-
ReadWriteSplittingConnectionManager.log_and_raise_exception(
666-
"ReadWriteSplittingPlugin.NoWriterFound"
667-
)
668673
return None
669674

670675

aws_advanced_python_wrapper/simple_read_write_splitting_plugin.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,9 @@ def is_reader_host(self, current_host: HostInfo) -> bool:
228228
or current_host.url.casefold() == self._read_endpoint
229229
)
230230

231+
def get_writer_host_info(self) -> Optional[HostInfo]:
232+
return self._write_endpoint_host_info
233+
231234
def _create_host_info(self, endpoint: str, role: HostRole) -> HostInfo:
232235
endpoint = endpoint.strip()
233236
host = endpoint

tests/integration/container/test_aws_secrets_manager.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,12 @@ def test_failover_with_secrets_manager(
215215
props.update({
216216
"plugins": "failover,aws_secrets_manager",
217217
"secrets_manager_secret_id": secret_name,
218-
"secrets_manager_region": region
218+
"secrets_manager_region": region,
219+
"socket_timeout": 10,
220+
"connect_timeout": 10,
221+
"monitoring-connect_timeout": 5,
222+
"monitoring-socket_timeout": 5,
223+
"topology_refresh_ms": 10,
219224
})
220225

221226
with AwsWrapperConnection.connect(

tests/integration/container/test_custom_endpoint.py

Lines changed: 147 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from aws_advanced_python_wrapper import AwsWrapperConnection
3030
from aws_advanced_python_wrapper.errors import (FailoverSuccessError,
3131
ReadWriteSplittingError)
32+
from aws_advanced_python_wrapper.hostinfo import HostRole
3233
from aws_advanced_python_wrapper.utils.log import Logger
3334
from aws_advanced_python_wrapper.utils.properties import (Properties,
3435
WrapperProperties)
@@ -195,76 +196,183 @@ def test_custom_endpoint_failover(self, test_driver: TestDriver, conn_utils, pro
195196

196197
conn.close()
197198

198-
def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes(
199+
def _setup_custom_endpoint_role(self, target_driver_connect, conn_kwargs, rds_utils, host_role: HostRole):
200+
self.logger.debug("Setting up custom endpoint instance with role: " + host_role.name)
201+
props = {'plugins': ''}
202+
original_writer = rds_utils.get_cluster_writer_instance_id()
203+
failover_target = None
204+
with AwsWrapperConnection.connect(target_driver_connect, **conn_kwargs, **props) as conn:
205+
endpoint_members = self.endpoint_info["StaticMembers"]
206+
original_instance_id = rds_utils.query_instance_id(conn)
207+
self.logger.debug("Original instance id: " + original_instance_id)
208+
assert original_instance_id in endpoint_members
209+
210+
if host_role == HostRole.WRITER:
211+
if original_instance_id == original_writer:
212+
self.logger.debug("Role is already " + host_role.name + ", no failover needed.")
213+
return # Do nothing, no need to failover.
214+
failover_target = original_instance_id
215+
self.logger.debug("Failing over to get writer role...")
216+
elif host_role == HostRole.READER:
217+
if original_instance_id != original_writer:
218+
self.logger.debug("Role is already " + host_role.name + ", no failover needed.")
219+
return # Do nothing, no need to failover.
220+
self.logger.debug("Failing over to get reader role...")
221+
222+
rds_utils.failover_cluster_and_wait_until_writer_changed(target_id=failover_target)
223+
224+
self.logger.debug("Verifying that new connection has role: " + host_role.name)
225+
# Verify that new connection is now the correct role
226+
with AwsWrapperConnection.connect(target_driver_connect, **conn_kwargs, **props) as conn:
227+
endpoint_members = self.endpoint_info["StaticMembers"]
228+
original_instance_id = rds_utils.query_instance_id(conn)
229+
assert original_instance_id in endpoint_members
230+
231+
new_role = rds_utils.query_host_role(conn, TestEnvironment.get_current().get_engine())
232+
assert new_role == host_role
233+
self.logger.debug("Custom endpoint instance successfully set to role: " + host_role.name)
234+
235+
def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__with_reader_as_init_conn(
199236
self, test_driver: TestDriver, conn_utils, props, rds_utils):
237+
'''
238+
Will test for the following scenario:
239+
1. Initially connect to a reader instance via the custom endpoint.
240+
2. Attempt to switch to writer instance - should fail since the custom endpoint only has the reader instance.
241+
3. Modify the custom endpoint to add the writer instance as a static member.
242+
4. Switch to writer instance - should succeed.
243+
5. Switch back to reader instance - should succeed.
244+
6. Modify the custom endpoint to remove the writer instance as a static member.
245+
7. Attempt to switch to writer instance - should fail since the custom endpoint no longer has the writer instance.
246+
'''
200247
target_driver_connect = DriverHelper.get_connect_func(test_driver)
201248
kwargs = conn_utils.get_connect_params()
202249
kwargs["host"] = self.endpoint_info["Endpoint"]
203250
# This setting is not required for the test, but it allows us to also test re-creation of expired monitors since
204251
# it takes more than 30 seconds to modify the cluster endpoint (usually around 140s).
205252
props["custom_endpoint_idle_monitor_expiration_ms"] = 30_000
206253
props["wait_for_custom_endpoint_info_timeout_ms"] = 30_000
207-
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props)
208254

255+
# Ensure that we are starting with a reader connection
256+
self._setup_custom_endpoint_role(target_driver_connect, kwargs, rds_utils, HostRole.READER)
257+
258+
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props)
209259
endpoint_members = self.endpoint_info["StaticMembers"]
210-
original_instance_id = rds_utils.query_instance_id(conn)
211-
assert original_instance_id in endpoint_members
260+
original_reader_id = rds_utils.query_instance_id(conn)
261+
assert original_reader_id in endpoint_members
212262

213263
# Attempt to switch to an instance of the opposite role. This should fail since the custom endpoint consists
214264
# only of the current host.
215-
new_read_only_value = original_instance_id == rds_utils.get_cluster_writer_instance_id()
216-
if new_read_only_value:
217-
# We are connected to the writer. Attempting to switch to the reader will not work but will intentionally
218-
# not throw an exception. In this scenario we log a warning and purposefully stick with the writer.
219-
self.logger.debug("Initial connection is to the writer. Attempting to switch to reader...")
220-
conn.read_only = new_read_only_value
221-
new_instance_id = rds_utils.query_instance_id(conn)
222-
assert new_instance_id == original_instance_id
223-
else:
224-
# We are connected to the reader. Attempting to switch to the writer will throw an exception.
225-
self.logger.debug("Initial connection is to a reader. Attempting to switch to writer...")
226-
with pytest.raises(ReadWriteSplittingError):
227-
conn.read_only = new_read_only_value
265+
self.logger.debug("Initial connection is to a reader. Attempting to switch to writer...")
266+
with pytest.raises(ReadWriteSplittingError):
267+
conn.read_only = False
228268

229-
instances = TestEnvironment.get_current().get_instances()
230269
writer_id = rds_utils.get_cluster_writer_instance_id()
231-
if original_instance_id == writer_id:
232-
new_member = instances[1].get_instance_id()
233-
else:
234-
new_member = writer_id
235270

236271
rds_client = client('rds', region_name=TestEnvironment.get_current().get_aurora_region())
237272
rds_client.modify_db_cluster_endpoint(
238273
DBClusterEndpointIdentifier=self.endpoint_id,
239-
StaticMembers=[original_instance_id, new_member]
274+
StaticMembers=[original_reader_id, writer_id]
240275
)
241276

242277
try:
243-
self.wait_until_endpoint_has_members(rds_client, {original_instance_id, new_member})
278+
self.wait_until_endpoint_has_members(rds_client, {original_reader_id, writer_id})
244279

245-
# We should now be able to switch to new_member.
246-
conn.read_only = new_read_only_value
280+
# We should now be able to switch to writer.
281+
conn.read_only = False
247282
new_instance_id = rds_utils.query_instance_id(conn)
248-
assert new_instance_id == new_member
283+
assert new_instance_id == writer_id
249284

250285
# Switch back to original instance
251-
conn.read_only = not new_read_only_value
286+
conn.read_only = True
287+
new_instance_id = rds_utils.query_instance_id(conn)
288+
assert new_instance_id == original_reader_id
252289
finally:
290+
# Remove the writer from the custom endpoint.
253291
rds_client.modify_db_cluster_endpoint(
254292
DBClusterEndpointIdentifier=self.endpoint_id,
255-
StaticMembers=[original_instance_id])
256-
self.wait_until_endpoint_has_members(rds_client, {original_instance_id})
293+
StaticMembers=[original_reader_id])
294+
self.wait_until_endpoint_has_members(rds_client, {original_reader_id})
257295

258296
# We should not be able to switch again because new_member was removed from the custom endpoint.
259-
if new_read_only_value:
260-
# We are connected to the writer. Attempting to switch to the reader will not work but will intentionally
261-
# not throw an exception. In this scenario we log a warning and purposefully stick with the writer.
262-
conn.read_only = new_read_only_value
297+
# We are connected to the reader. Attempting to switch to the writer will throw an exception.
298+
with pytest.raises(ReadWriteSplittingError):
299+
conn.read_only = False
300+
301+
conn.close()
302+
303+
def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__with_writer_as_init_conn(
304+
self, test_driver: TestDriver, conn_utils, props, rds_utils):
305+
'''
306+
Will test for the following scenario:
307+
1. Iniitially connect to the writer instance via the custom endpoint.
308+
2. Attempt to switch to reader instance - should succeed, but will still use writer instance as reader.
309+
3. Modify the custom endpoint to add a reader instance as a static member.
310+
4. Switch to reader instance - should succeed.
311+
5. Switch back to writer instance - should succeed.
312+
6. Modify the custom endpoint to remove the reader instance as a static member.
313+
7. Attempt to switch to reader instance - should fail since the custom endpoint no longer has the reader instance.
314+
'''
315+
316+
target_driver_connect = DriverHelper.get_connect_func(test_driver)
317+
kwargs = conn_utils.get_connect_params()
318+
kwargs["host"] = self.endpoint_info["Endpoint"]
319+
# This setting is not required for the test, but it allows us to also test re-creation of expired monitors since
320+
# it takes more than 30 seconds to modify the cluster endpoint (usually around 140s).
321+
props["custom_endpoint_idle_monitor_expiration_ms"] = 30_000
322+
props["wait_for_custom_endpoint_info_timeout_ms"] = 30_000
323+
324+
# Ensure that we are starting with a writer connection
325+
self._setup_custom_endpoint_role(target_driver_connect, kwargs, rds_utils, HostRole.WRITER)
326+
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props)
327+
328+
endpoint_members = self.endpoint_info["StaticMembers"]
329+
original_writer_id = str(rds_utils.query_instance_id(conn))
330+
assert original_writer_id in endpoint_members
331+
332+
# We are connected to the writer. Attempting to switch to the reader will not work but will intentionally
333+
# not throw an exception. In this scenario we log a warning and purposefully stick with the writer.
334+
self.logger.debug("Initial connection is to the writer. Attempting to switch to reader...")
335+
conn.read_only = True
336+
new_instance_id = rds_utils.query_instance_id(conn)
337+
assert new_instance_id == original_writer_id
338+
339+
instances = TestEnvironment.get_current().get_instances()
340+
writer_id = str(rds_utils.get_cluster_writer_instance_id())
341+
342+
reader_id_to_add = ""
343+
# Get any reader id
344+
for instance in instances:
345+
if instance.get_instance_id() != writer_id:
346+
reader_id_to_add = instance.get_instance_id()
347+
break
348+
349+
rds_client = client('rds', region_name=TestEnvironment.get_current().get_aurora_region())
350+
rds_client.modify_db_cluster_endpoint(
351+
DBClusterEndpointIdentifier=self.endpoint_id,
352+
StaticMembers=[original_writer_id, reader_id_to_add]
353+
)
354+
355+
try:
356+
self.wait_until_endpoint_has_members(rds_client, {original_writer_id, reader_id_to_add})
357+
# We should now be able to switch to new_member.
358+
conn.read_only = True
263359
new_instance_id = rds_utils.query_instance_id(conn)
264-
assert new_instance_id == original_instance_id
265-
else:
266-
# We are connected to the reader. Attempting to switch to the writer will throw an exception.
267-
with pytest.raises(ReadWriteSplittingError):
268-
conn.read_only = new_read_only_value
360+
assert new_instance_id == reader_id_to_add
361+
362+
# Switch back to original instance
363+
conn.read_only = False
364+
finally:
365+
# Remove the reader from the custom endpoint.
366+
rds_client.modify_db_cluster_endpoint(
367+
DBClusterEndpointIdentifier=self.endpoint_id,
368+
StaticMembers=[original_writer_id])
369+
self.wait_until_endpoint_has_members(rds_client, {original_writer_id})
370+
371+
# We should not be able to switch again because new_member was removed from the custom endpoint.
372+
# We are connected to the writer. Attempting to switch to the reader will not work but will intentionally
373+
# not throw an exception. In this scenario we log a warning and fallback to the writer.
374+
conn.read_only = True
375+
new_instance_id = rds_utils.query_instance_id(conn)
376+
assert new_instance_id == original_writer_id
269377

270378
conn.close()

tests/integration/container/utils/rds_test_utility.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
from aws_advanced_python_wrapper.driver_info import DriverInfo
3838
from aws_advanced_python_wrapper.errors import UnsupportedOperationError
39+
from aws_advanced_python_wrapper.hostinfo import HostRole
3940
from aws_advanced_python_wrapper.utils.log import Logger
4041
from aws_advanced_python_wrapper.utils.messages import Messages
4142
from .database_engine import DatabaseEngine
@@ -255,6 +256,25 @@ def query_instance_id(
255256
raise RuntimeError(Messages.get_formatted(
256257
"RdsTestUtility.MethodNotSupportedForDeployment", "query_instance_id", database_deployment))
257258

259+
def query_host_role(
260+
self,
261+
conn,
262+
database_engine: DatabaseEngine) -> HostRole:
263+
if database_engine == DatabaseEngine.MYSQL:
264+
is_reader_query = "SELECT @@innodb_read_only"
265+
elif database_engine == DatabaseEngine.PG:
266+
is_reader_query = "SELECT pg_catalog.pg_is_in_recovery()"
267+
268+
with closing(conn.cursor()) as cursor:
269+
cursor.execute(is_reader_query)
270+
record = cursor.fetchone()
271+
is_reader = record[0]
272+
273+
if is_reader in (1, True):
274+
return HostRole.READER
275+
else:
276+
return HostRole.WRITER
277+
258278
def _query_aurora_instance_id(self, conn: Connection, engine: DatabaseEngine) -> str:
259279
if engine == DatabaseEngine.MYSQL:
260280
sql = "SELECT @@aurora_server_id"

0 commit comments

Comments
 (0)