Skip to content

Commit 0b1ba85

Browse files
authored
feat: failover v2 plugin (#1079)
1 parent 22a2610 commit 0b1ba85

25 files changed

Lines changed: 2075 additions & 85 deletions

aws_advanced_python_wrapper/blue_green_plugin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1093,7 +1093,7 @@ def _init_host_list_provider(self):
10931093
logger.warning("BlueGreenStatusMonitor.HostInfoNone")
10941094
return
10951095

1096-
host_list_provider_supplier = self._plugin_service.database_dialect.get_host_list_provider_supplier()
1096+
host_list_provider_supplier = self._plugin_service.database_dialect.get_host_list_provider_supplier(self._plugin_service)
10971097
host_list_provider_service: HostListProviderService = cast('HostListProviderService', self._plugin_service)
10981098
self._host_list_provider = host_list_provider_supplier(host_list_provider_service, props_copy)
10991099

aws_advanced_python_wrapper/cluster_topology_monitor.py

Lines changed: 537 additions & 0 deletions
Large diffs are not rendered by default.

aws_advanced_python_wrapper/database_dialect.py

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
Protocol, Tuple, runtime_checkable)
1919

2020
from aws_advanced_python_wrapper.driver_info import DriverInfo
21+
from aws_advanced_python_wrapper.failover_v2_plugin import FailoverV2Plugin
2122
from aws_advanced_python_wrapper.host_list_provider import (
22-
AuroraTopologyUtils, MultiAzTopologyUtils)
23+
AuroraTopologyUtils, MonitoringRdsHostListProvider, MultiAzTopologyUtils)
2324
from aws_advanced_python_wrapper.utils.rds_url_type import RdsUrlType
2425

2526
if TYPE_CHECKING:
2627
from aws_advanced_python_wrapper.pep249 import Connection
2728
from .driver_dialect import DriverDialect
2829
from .exception_handling import ExceptionHandler
30+
from aws_advanced_python_wrapper.plugin_service import PluginService
2931

3032
from abc import ABC, abstractmethod
3133
from concurrent.futures import TimeoutError
@@ -88,6 +90,7 @@ class TopologyAwareDatabaseDialect(Protocol):
8890
_TOPOLOGY_QUERY: str
8991
_HOST_ID_QUERY: str
9092
_IS_READER_QUERY: str
93+
_WRITER_HOST_QUERY: str
9194

9295
@property
9396
def topology_query(self) -> str:
@@ -101,6 +104,10 @@ def host_id_query(self) -> str:
101104
def is_reader_query(self) -> str:
102105
return self._IS_READER_QUERY
103106

107+
@property
108+
def writer_id_query(self) -> str:
109+
return self._WRITER_HOST_QUERY
110+
104111

105112
@runtime_checkable
106113
class AuroraLimitlessDialect(Protocol):
@@ -147,7 +154,7 @@ def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
147154
...
148155

149156
@abstractmethod
150-
def get_host_list_provider_supplier(self) -> Callable:
157+
def get_host_list_provider_supplier(self, plugin_service: PluginService) -> Callable:
151158
...
152159

153160
@abstractmethod
@@ -213,7 +220,7 @@ def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
213220

214221
return False
215222

216-
def get_host_list_provider_supplier(self) -> Callable:
223+
def get_host_list_provider_supplier(self, plugin_service: PluginService) -> Callable:
217224
return lambda provider_service, props: ConnectionStringHostListProvider(provider_service, props)
218225

219226
def prepare_conn_props(self, props: Properties):
@@ -261,7 +268,7 @@ def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
261268

262269
return False
263270

264-
def get_host_list_provider_supplier(self) -> Callable:
271+
def get_host_list_provider_supplier(self, plugin_service: PluginService) -> Callable:
265272
return lambda provider_service, props: ConnectionStringHostListProvider(provider_service, props)
266273

267274
def prepare_conn_props(self, props: Properties):
@@ -387,6 +394,9 @@ class AuroraMysqlDialect(MysqlDatabaseDialect, TopologyAwareDatabaseDialect, Blu
387394
"OR SESSION_ID = 'MASTER_SESSION_ID' ")
388395
_HOST_ID_QUERY = "SELECT @@aurora_server_id"
389396
_IS_READER_QUERY = "SELECT @@innodb_read_only"
397+
_WRITER_HOST_QUERY = \
398+
("SELECT SERVER_ID FROM information_schema.replica_host_status "
399+
"WHERE SESSION_ID = 'MASTER_SESSION_ID' AND SERVER_ID = @@aurora_server_id")
390400

391401
_BG_STATUS_QUERY = "SELECT version, endpoint, port, role, status FROM mysql.rds_topology"
392402
_BG_STATUS_EXISTS_QUERY = \
@@ -410,7 +420,13 @@ def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
410420

411421
return False
412422

413-
def get_host_list_provider_supplier(self) -> Callable:
423+
def get_host_list_provider_supplier(self, plugin_service: PluginService) -> Callable:
424+
if plugin_service.is_plugin_in_use(FailoverV2Plugin):
425+
return lambda provider_service, props: MonitoringRdsHostListProvider(
426+
provider_service,
427+
props, AuroraTopologyUtils(self, props),
428+
plugin_service)
429+
414430
return lambda provider_service, props: RdsHostListProvider(provider_service, props, AuroraTopologyUtils(self, props))
415431

416432
@property
@@ -449,6 +465,10 @@ class AuroraPgDialect(PgDatabaseDialect, TopologyAwareDatabaseDialect, AuroraLim
449465
_BG_STATUS_QUERY = (f"SELECT version, endpoint, port, role, status "
450466
f"FROM pg_catalog.get_blue_green_fast_switchover_metadata('aws_advanced_python_wrapper-{DriverInfo.DRIVER_VERSION}')")
451467
_BG_STATUS_EXISTS_QUERY = "SELECT 'pg_catalog.get_blue_green_fast_switchover_metadata'::regproc"
468+
_WRITER_HOST_QUERY = \
469+
("SELECT SERVER_ID FROM pg_catalog.aurora_replica_status() "
470+
"WHERE SESSION_ID OPERATOR(pg_catalog.=) 'MASTER_SESSION_ID' "
471+
"AND SERVER_ID OPERATOR(pg_catalog.=) pg_catalog.aurora_db_instance_identifier()")
452472

453473
@property
454474
def dialect_update_candidates(self) -> Optional[Tuple[DialectCode, ...]]:
@@ -483,7 +503,13 @@ def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
483503

484504
return False
485505

486-
def get_host_list_provider_supplier(self) -> Callable:
506+
def get_host_list_provider_supplier(self, plugin_service: PluginService) -> Callable:
507+
if plugin_service.is_plugin_in_use(FailoverV2Plugin):
508+
return lambda provider_service, props: MonitoringRdsHostListProvider(
509+
provider_service,
510+
props,
511+
AuroraTopologyUtils(self, props), plugin_service)
512+
487513
return lambda provider_service, props: RdsHostListProvider(provider_service, props, AuroraTopologyUtils(self, props))
488514

489515
@property
@@ -533,7 +559,12 @@ def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
533559

534560
return False
535561

536-
def get_host_list_provider_supplier(self) -> Callable:
562+
def get_host_list_provider_supplier(self, plugin_service: PluginService) -> Callable:
563+
if plugin_service.is_plugin_in_use(FailoverV2Plugin):
564+
return lambda provider_service, props: MonitoringRdsHostListProvider(
565+
provider_service, props,
566+
MultiAzTopologyUtils(self, props, self._WRITER_HOST_QUERY, self._WRITER_HOST_COLUMN_INDEX), plugin_service)
567+
537568
return lambda provider_service, props: RdsHostListProvider(
538569
provider_service,
539570
props,
@@ -588,7 +619,12 @@ def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
588619

589620
return False
590621

591-
def get_host_list_provider_supplier(self) -> Callable:
622+
def get_host_list_provider_supplier(self, plugin_service: PluginService) -> Callable:
623+
if plugin_service.is_plugin_in_use(FailoverV2Plugin):
624+
return lambda provider_service, props: MonitoringRdsHostListProvider(
625+
provider_service, props,
626+
MultiAzTopologyUtils(self, props, self._WRITER_HOST_QUERY), plugin_service)
627+
592628
return lambda provider_service, props: RdsHostListProvider(
593629
provider_service,
594630
props,
@@ -629,7 +665,7 @@ def exception_handler(self) -> Optional[ExceptionHandler]:
629665
def is_dialect(self, conn: Connection, driver_dialect: DriverDialect) -> bool:
630666
return False
631667

632-
def get_host_list_provider_supplier(self) -> Callable:
668+
def get_host_list_provider_supplier(self, plugin_service: PluginService) -> Callable:
633669
return lambda provider_service, props: ConnectionStringHostListProvider(provider_service, props)
634670

635671
def prepare_conn_props(self, props: Properties):

aws_advanced_python_wrapper/exception_handling.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ def is_login_exception(self, error: Optional[Exception] = None, sql_state: Optio
4141
"""
4242
pass
4343

44+
def is_read_only_connection_exception(self, error: Optional[Exception] = None, sql_state: Optional[str] = None) -> bool:
45+
"""
46+
Checks whether the given error is caused by failing to authenticate the user.
47+
:param error: The error raised by the target driver.
48+
:param sql_state: The SQL State associated with the error.
49+
:return: True if the error is caused by a login issue, False otherwise.
50+
"""
51+
pass
52+
4453

4554
class ExceptionManager:
4655
custom_handler: Optional[ExceptionHandler] = None
@@ -67,6 +76,13 @@ def is_login_exception(self, dialect: Optional[DatabaseDialect], error: Optional
6776
return handler.is_login_exception(error=error, sql_state=sql_state)
6877
return False
6978

79+
def is_read_only_connection_exception(self, dialect: Optional[DatabaseDialect], error: Optional[Exception] = None,
80+
sql_state: Optional[str] = None) -> bool:
81+
handler = self._get_handler(dialect)
82+
if handler is not None:
83+
return handler.is_read_only_connection_exception(error=error, sql_state=sql_state)
84+
return False
85+
7086
def _get_handler(self, dialect: Optional[DatabaseDialect]) -> Optional[ExceptionHandler]:
7187
if dialect is None:
7288
return None

aws_advanced_python_wrapper/failover_plugin.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -377,11 +377,10 @@ def _invalidate_current_connection(self):
377377
except Exception:
378378
pass
379379

380-
if not driver_dialect.is_closed(conn):
381-
try:
382-
return driver_dialect.execute(DbApiMethod.CONNECTION_CLOSE.method_name, lambda: conn.close())
383-
except Exception:
384-
pass
380+
try:
381+
return driver_dialect.execute(DbApiMethod.CONNECTION_CLOSE.method_name, lambda: conn.close())
382+
except Exception:
383+
pass
385384

386385
return None
387386

0 commit comments

Comments
 (0)