|
20 | 20 | from concurrent.futures import ThreadPoolExecutor |
21 | 21 | from typing import TYPE_CHECKING, Dict, Optional |
22 | 22 |
|
| 23 | +from aws_advanced_python_wrapper.errors import AwsWrapperError |
23 | 24 | from aws_advanced_python_wrapper.host_availability import HostAvailability |
24 | 25 | from aws_advanced_python_wrapper.hostinfo import HostInfo |
25 | 26 | from aws_advanced_python_wrapper.utils.atomic import AtomicReference |
26 | 27 | from aws_advanced_python_wrapper.utils.messages import Messages |
27 | | -from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils |
| 28 | +from aws_advanced_python_wrapper.utils.rds_utils import RdsUtils |
28 | 29 | from aws_advanced_python_wrapper.utils.storage.storage_service import ( |
29 | 30 | StorageService, Topology) |
30 | 31 | from aws_advanced_python_wrapper.utils.thread_safe_connection_holder import \ |
|
35 | 36 | from aws_advanced_python_wrapper.pep249 import Connection |
36 | 37 | from aws_advanced_python_wrapper.plugin_service import PluginService |
37 | 38 | from aws_advanced_python_wrapper.utils.properties import Properties |
38 | | - from aws_advanced_python_wrapper.host_list_provider import TopologyUtils |
| 39 | + from aws_advanced_python_wrapper.host_list_provider import TopologyUtils, GlobalAuroraTopologyUtils |
39 | 40 |
|
40 | 41 | from aws_advanced_python_wrapper.hostinfo import HostRole |
41 | 42 | from aws_advanced_python_wrapper.utils.log import Logger |
@@ -316,9 +317,10 @@ def _open_any_connection_and_update_topology(self) -> Topology: |
316 | 317 | writer_host_info = self._initial_host_info |
317 | 318 | self._writer_host_info.set(writer_host_info) |
318 | 319 | else: |
319 | | - writer_host = self._instance_template.host.replace("?", writer_id) |
320 | | - port = self._instance_template.port \ |
321 | | - if self._instance_template.is_port_specified() \ |
| 320 | + instance_template = self._get_instance_template(writer_id, conn) |
| 321 | + writer_host = instance_template.host.replace("?", writer_id) |
| 322 | + port = instance_template.port \ |
| 323 | + if instance_template.is_port_specified() \ |
322 | 324 | else self._initial_host_info.port |
323 | 325 | writer_host_info = HostInfo( |
324 | 326 | writer_host, |
@@ -438,6 +440,9 @@ def _query_for_topology(self, connection: Connection) -> Topology: |
438 | 440 | return hosts |
439 | 441 | return () |
440 | 442 |
|
| 443 | + def _get_instance_template(self, instance_id: str, connection: Connection) -> HostInfo: |
| 444 | + return self._instance_template |
| 445 | + |
441 | 446 | def _update_topology_cache(self, hosts: Topology) -> None: |
442 | 447 | StorageService.set(self._cluster_id, hosts, Topology) |
443 | 448 | # Notify waiting threads |
@@ -499,8 +504,8 @@ def __call__(self) -> None: |
499 | 504 |
|
500 | 505 | if is_writer: |
501 | 506 | try: |
502 | | - if self._monitor._topology_utils.get_host_role( |
503 | | - connection, self._monitor._plugin_service.driver_dialect) != HostRole.WRITER: |
| 507 | + if self._monitor._plugin_service.get_host_role( |
| 508 | + connection) != HostRole.WRITER: |
504 | 509 | is_writer = False |
505 | 510 | except Exception as ex: |
506 | 511 | logger.debug("HostMonitor.InvalidWriterQuery", ex) |
@@ -565,3 +570,45 @@ def _calculate_backoff_with_jitter(self, attempt: int) -> int: |
565 | 570 | backoff = ClusterTopologyMonitorImpl.INITIAL_BACKOFF_MS * (2 ** min(attempt, 6)) |
566 | 571 | backoff = min(backoff, ClusterTopologyMonitorImpl.MAX_BACKOFF_MS) |
567 | 572 | return int(backoff * (0.5 + random.random() * 0.5)) |
| 573 | + |
| 574 | + |
| 575 | +class GlobalAuroraTopologyMonitor(ClusterTopologyMonitorImpl): |
| 576 | + def __init__( |
| 577 | + self, |
| 578 | + plugin_service: PluginService, |
| 579 | + topology_utils: GlobalAuroraTopologyUtils, |
| 580 | + cluster_id: str, |
| 581 | + initial_host_info: HostInfo, |
| 582 | + props: Properties, |
| 583 | + instance_template: HostInfo, |
| 584 | + refresh_rate_ns: int, |
| 585 | + high_refresh_rate_ns: int, |
| 586 | + instance_templates_by_region: dict[str, HostInfo] |
| 587 | + ): |
| 588 | + super().__init__( |
| 589 | + plugin_service, |
| 590 | + topology_utils, |
| 591 | + cluster_id, |
| 592 | + initial_host_info, |
| 593 | + props, |
| 594 | + instance_template, |
| 595 | + refresh_rate_ns, |
| 596 | + high_refresh_rate_ns |
| 597 | + ) |
| 598 | + self._instance_templates_by_region = instance_templates_by_region |
| 599 | + self._global_topology_utils = topology_utils |
| 600 | + |
| 601 | + def _get_instance_template(self, instance_id: str, connection: Connection) -> HostInfo: |
| 602 | + region = self._global_topology_utils.get_region(instance_id, connection) |
| 603 | + if region: |
| 604 | + instance_template = self._instance_templates_by_region.get(region) |
| 605 | + if instance_template is None: |
| 606 | + raise AwsWrapperError( |
| 607 | + Messages.get_formatted("GlobalAuroraTopologyMonitor.cannotFindRegionTemplate", region)) |
| 608 | + return instance_template |
| 609 | + return self._instance_template |
| 610 | + |
| 611 | + def _query_for_topology(self, connection: Connection) -> Topology: |
| 612 | + result = self._global_topology_utils.query_for_topology_with_regions( |
| 613 | + connection, self._instance_templates_by_region) |
| 614 | + return result if result is not None else () |
0 commit comments