Skip to content

Commit ca50f66

Browse files
authored
fix: populate opened connection queue with url (#1094)
1 parent 6f195d2 commit ca50f66

5 files changed

Lines changed: 17 additions & 15 deletions

File tree

.github/workflows/integration_tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ jobs:
7171
strategy:
7272
fail-fast: false
7373
matrix:
74-
python-version: [ "3.11", "3.12", "3.13" ]
75-
environment: ["mysql", "pg"]
74+
python-version: [ "3.11", "3.12", "3.13" ]
75+
environment: [ "mysql", "pg" ]
7676

7777
steps:
7878
- name: 'Clone repository'

aws_advanced_python_wrapper/aurora_connection_tracker_plugin.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def populate_opened_connection_set(self, host_info: HostInfo, conn: Connection):
124124
aliases: FrozenSet[str] = host_info.as_aliases()
125125

126126
if self._rds_utils.is_rds_instance(host_info.host):
127-
self._track_connection(host_info.as_alias(), conn)
127+
self._track_connection(host_info.url, conn)
128128
return
129129

130130
instance_endpoint: Optional[str] = next(
@@ -133,6 +133,7 @@ def populate_opened_connection_set(self, host_info: HostInfo, conn: Connection):
133133
logger.debug("OpenedConnectionTracker.UnableToPopulateOpenedConnectionSet")
134134
return
135135

136+
instance_endpoint = instance_endpoint if instance_endpoint.endswith("/") else instance_endpoint + "/"
136137
self._track_connection(instance_endpoint, conn)
137138

138139
def invalidate_all_connections(self, host_info: Optional[HostInfo] = None, host: Optional[FrozenSet[str]] = None):
@@ -144,7 +145,7 @@ def invalidate_all_connections(self, host_info: Optional[HostInfo] = None, host:
144145
"""
145146

146147
if host_info:
147-
self.invalidate_all_connections(host=frozenset([host_info.as_alias()]))
148+
self.invalidate_all_connections(host=frozenset([host_info.url]))
148149
self.invalidate_all_connections(host=host_info.as_aliases())
149150
return
150151

@@ -160,6 +161,8 @@ def invalidate_all_connections(self, host_info: Optional[HostInfo] = None, host:
160161
if not instance_endpoint:
161162
return
162163

164+
instance_endpoint = instance_endpoint if instance_endpoint.endswith("/") else instance_endpoint + "/"
165+
163166
with self._lock:
164167
connection_set: Optional[WeakSet] = self._opened_connections.get(instance_endpoint)
165168
connections_list = list(connection_set) if connection_set is not None else None
@@ -173,14 +176,16 @@ def remove_connection_tracking(self, host_info: HostInfo, connection: Connection
173176
return
174177

175178
if self._rds_utils.is_rds_instance(host_info.host):
176-
host = host_info.as_alias()
179+
host = host_info.url
177180
else:
178181
host = next((alias for alias in host_info.as_aliases()
179182
if self._rds_utils.is_rds_instance(self._rds_utils.remove_port(alias))), "")
180183

181184
if not host:
182185
return
183186

187+
host = host if host.endswith("/") else host + "/"
188+
184189
with self._lock:
185190
connection_set = self._opened_connections.get(host)
186191
if connection_set:

aws_advanced_python_wrapper/failover_v2_plugin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def connect(
149149
break
150150

151151
conn = None
152-
if (host_with_availability is None or host_with_availability.availability != HostAvailability.UNAVAILABLE):
152+
if host_with_availability is None or host_with_availability.availability != HostAvailability.UNAVAILABLE:
153153
try:
154154
conn = self._stale_dns_helper.get_verified_connection(
155155
is_initial_connection, self._host_list_provider_service,

aws_advanced_python_wrapper/utils/pg_exception_handler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ class PgExceptionHandler(ExceptionHandler):
2828
_CONNECTION_FAILED = "connection failed"
2929
_CONSUMING_INPUT_FAILED = "consuming input failed"
3030
_CONNECTION_SOCKET_CLOSED = "connection socket closed"
31+
_CONNECTION_CLOSED = "the connection is closed"
3132

3233
_NETWORK_ERROR_MESSAGES: List[str] = [
3334
_CONNECTION_FAILED,
3435
_CONSUMING_INPUT_FAILED,
35-
_CONNECTION_SOCKET_CLOSED
36+
_CONNECTION_SOCKET_CLOSED,
37+
_CONNECTION_CLOSED,
3638
]
3739
_ACCESS_ERROR_MESSAGES: List[str] = [
3840
_PASSWORD_AUTHENTICATION_FAILED_MSG,

aws_advanced_python_wrapper/wrapper.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222

2323
from aws_advanced_python_wrapper.driver_dialect_manager import \
2424
DriverDialectManager
25-
from aws_advanced_python_wrapper.errors import (AwsWrapperError,
26-
FailoverSuccessError)
25+
from aws_advanced_python_wrapper.errors import AwsWrapperError
2726
from aws_advanced_python_wrapper.pep249 import Connection, Cursor, Error
2827
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
2928
from aws_advanced_python_wrapper.plugin import CanReleaseResources
@@ -285,12 +284,8 @@ def execute(
285284
*args: Any,
286285
**kwargs: Any
287286
) -> AwsWrapperCursor:
288-
try:
289-
return self._plugin_manager.execute(self.target_cursor, DbApiMethod.CURSOR_EXECUTE,
290-
lambda: self.target_cursor.execute(*args, **kwargs), *args, **kwargs)
291-
except FailoverSuccessError as e:
292-
self._target_cursor = self.connection.target_connection.cursor()
293-
raise e
287+
return self._plugin_manager.execute(self.target_cursor, DbApiMethod.CURSOR_EXECUTE,
288+
lambda: self.target_cursor.execute(*args, **kwargs), *args, **kwargs)
294289

295290
def executemany(
296291
self,

0 commit comments

Comments
 (0)