Skip to content

Commit 03888f1

Browse files
authored
refactor: core services (#1220)
1 parent da90f4f commit 03888f1

58 files changed

Lines changed: 1842 additions & 1655 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

aws_advanced_python_wrapper/aws_secrets_manager_plugin.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
from datetime import timedelta
1718
from json import JSONDecodeError, loads
1819
from re import search
1920
from types import SimpleNamespace
@@ -23,7 +24,7 @@
2324

2425
from aws_advanced_python_wrapper.aws_credentials_manager import \
2526
AwsCredentialsManager
26-
from aws_advanced_python_wrapper.utils.cache_map import CacheMap
27+
from aws_advanced_python_wrapper.utils import services_container
2728

2829
if TYPE_CHECKING:
2930
from boto3 import Session
@@ -46,14 +47,20 @@
4647
logger = Logger(__name__)
4748

4849

50+
class Secret:
51+
"""Wrapper type for secrets, used as StorageService type key."""
52+
53+
def __init__(self, value: SimpleNamespace):
54+
self.value = value
55+
56+
4957
class AwsSecretsManagerPlugin(Plugin):
5058
_SUBSCRIBED_METHODS: Set[str] = {DbApiMethod.CONNECT.method_name, DbApiMethod.FORCE_CONNECT.method_name}
5159

5260
_SECRETS_ARN_PATTERN = r"^arn:aws:secretsmanager:(?P<region>[^:\n]*):[^:\n]*:([^:/\n]*[:/])?(.*)$"
5361
_ONE_YEAR_IN_SECONDS = 60 * 60 * 24 * 365
5462

5563
_secret: Optional[SimpleNamespace] = None
56-
_secrets_cache: CacheMap[Tuple, SimpleNamespace] = CacheMap()
5764
_secret_key: Tuple = ()
5865

5966
@property
@@ -63,6 +70,8 @@ def subscribed_methods(self) -> Set[str]:
6370
def __init__(self, plugin_service: PluginService, props: Properties, session: Optional[Session] = None):
6471
self._plugin_service = plugin_service
6572
self._session = session
73+
self._storage_service = services_container.get_storage_service()
74+
self._storage_service.register(Secret, item_expiration_time=timedelta(minutes=30))
6675

6776
secret_id = WrapperProperties.SECRETS_MANAGER_SECRET_ID.get(props)
6877
if not secret_id:
@@ -100,13 +109,13 @@ def force_connect(
100109
return self._connect(host_info, props, force_connect_func)
101110

102111
def _connect(self, host_info: HostInfo, props: Properties, connect_func: Callable) -> Connection:
103-
token_expiration_sec: int = WrapperProperties.SECRETS_MANAGER_EXPIRATION.get_int(props)
112+
token_expiration_sec = WrapperProperties.SECRETS_MANAGER_EXPIRATION.get_int(props)
104113
# if value is less than 0, default to one year
105114
if token_expiration_sec < 0:
106115
token_expiration_sec = AwsSecretsManagerPlugin._ONE_YEAR_IN_SECONDS
107116
token_expiration_ns = token_expiration_sec * 1_000_000_000
108117

109-
secret_fetched: bool = self._update_secret(host_info, props, token_expiration_ns=token_expiration_ns)
118+
secret_fetched: bool = self._update_secret(host_info, props, token_expiration_ns)
110119

111120
try:
112121
self._apply_secret_to_properties(props)
@@ -120,7 +129,7 @@ def _connect(self, host_info: HostInfo, props: Properties, connect_func: Callabl
120129
raise AwsWrapperError(
121130
Messages.get_formatted("AwsSecretsManagerPlugin.ConnectException", e), e) from e
122131

123-
secret_fetched = self._update_secret(host_info, props, token_expiration_ns=token_expiration_ns, force_refetch=True)
132+
secret_fetched = self._update_secret(host_info, props, token_expiration_ns, force_refetch=True)
124133

125134
if secret_fetched:
126135
try:
@@ -146,13 +155,14 @@ def _update_secret(self, host_info: HostInfo, props: Properties, token_expiratio
146155

147156
try:
148157
fetched: bool = False
149-
self._secret: Optional[SimpleNamespace] = AwsSecretsManagerPlugin._secrets_cache.get(self._secret_key)
158+
cached_secret = self._storage_service.get(Secret, self._secret_key)
159+
self._secret = cached_secret.value if cached_secret is not None else None
150160
endpoint = self._secret_key[2]
151161
if not self._secret or force_refetch:
152162
try:
153163
self._secret = self._fetch_latest_credentials(host_info, props)
154164
if self._secret:
155-
AwsSecretsManagerPlugin._secrets_cache.put(self._secret_key, self._secret, token_expiration_ns)
165+
self._storage_service.put(Secret, self._secret_key, Secret(self._secret), item_expiration_ns=token_expiration_ns)
156166
fetched = True
157167
except (ClientError, AttributeError) as e:
158168
logger.debug("AwsSecretsManagerPlugin.FailedToFetchDbCredentials", e)

aws_advanced_python_wrapper/blue_green_plugin.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from __future__ import annotations
1616

1717
import socket
18-
from datetime import datetime
18+
from datetime import datetime, timedelta
1919
from time import perf_counter_ns
2020
from typing import TYPE_CHECKING, FrozenSet, List, cast
2121

@@ -44,9 +44,11 @@
4444
from aws_advanced_python_wrapper.iam_plugin import IamAuthPlugin
4545
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
4646
from aws_advanced_python_wrapper.plugin import Plugin, PluginFactory
47+
from aws_advanced_python_wrapper.utils import services_container
4748
from aws_advanced_python_wrapper.utils.atomic import AtomicInt
4849
from aws_advanced_python_wrapper.utils.concurrent import (ConcurrentDict,
4950
ConcurrentSet)
51+
from aws_advanced_python_wrapper.utils.events import MonitorResetEvent
5052
from aws_advanced_python_wrapper.utils.log import Logger
5153
from aws_advanced_python_wrapper.utils.messages import Messages
5254
from aws_advanced_python_wrapper.utils.properties import (Properties,
@@ -901,6 +903,9 @@ def _run(self):
901903

902904
finally:
903905
self._close_connection()
906+
if self._host_list_provider is not None:
907+
self._host_list_provider.stop_monitor()
908+
self._host_list_provider = None
904909
logger.debug("BlueGreenStatusMonitor.ThreadCompleted", self._bg_role)
905910

906911
def _open_connection(self):
@@ -1237,6 +1242,8 @@ def __init__(self, plugin_service: PluginService, props: Properties, bg_id: str)
12371242
self._green_dns_removed = False
12381243
self._green_topology_changed = False
12391244
self._all_green_hosts_changed_name = False
1245+
self._monitor_reset_on_in_progress_completed = False
1246+
self._monitor_reset_on_topology_completed = False
12401247
self._post_status_end_time_ns = 0
12411248
self._process_status_lock = RLock()
12421249
self._status_check_intervals_ms: Dict[BlueGreenIntervalRate, int] = {}
@@ -1258,6 +1265,8 @@ def __init__(self, plugin_service: PluginService, props: Properties, bg_id: str)
12581265
Messages.get_formatted(
12591266
"BlueGreenStatusProvider.UnsupportedDialect", self._bg_id, dialect.__class__.__name__))
12601267

1268+
services_container.get_storage_service().register(BlueGreenStatus, item_expiration_time=timedelta(hours=1))
1269+
12611270
current_host_info = self._plugin_service.current_host_info
12621271
blue_monitor = BlueGreenStatusMonitor(
12631272
BlueGreenRole.SOURCE,
@@ -1476,6 +1485,7 @@ def _update_summary_status(self, bg_role: BlueGreenRole, interim_status: BlueGre
14761485
elif self._latest_phase == BlueGreenPhase.IN_PROGRESS:
14771486
self._update_dns_flags(bg_role, interim_status)
14781487
self._summary_status = self._get_status_of_in_progress()
1488+
self._reset_monitors("_monitor_reset_on_in_progress_completed", "- start")
14791489

14801490
elif self._latest_phase == BlueGreenPhase.POST:
14811491
self._update_dns_flags(bg_role, interim_status)
@@ -1503,6 +1513,7 @@ def _update_dns_flags(self, bg_role: BlueGreenRole, interim_status: BlueGreenInt
15031513
logger.debug("BlueGreenStatusProvider.GreenTopologyChanged", self._bg_id)
15041514
self._green_topology_changed = True
15051515
self._store_event_phase_time("Green topology changed")
1516+
self._reset_monitors("_monitor_reset_on_topology_completed", "- green topology")
15061517

15071518
def _store_event_phase_time(self, key_prefix: str, phase: Optional[BlueGreenPhase] = None):
15081519
rollback_str = " (rollback)" if self._rollback else ""
@@ -1846,6 +1857,20 @@ def _update_status_cache(self):
18461857
with latest_status.cv:
18471858
latest_status.cv.notify_all()
18481859

1860+
def _reset_monitors(self, completed_flag_attr: str, event_name: str):
1861+
if getattr(self, completed_flag_attr):
1862+
return
1863+
setattr(self, completed_flag_attr, True)
1864+
1865+
blue_endpoints = frozenset(
1866+
host for host, role in self._roles_by_host.items()
1867+
if role == BlueGreenRole.SOURCE)
1868+
1869+
cluster_id = self._plugin_service.host_list_provider.get_cluster_id()
1870+
services_container.get_event_publisher().publish(
1871+
MonitorResetEvent(cluster_id=cluster_id, endpoints=blue_endpoints))
1872+
self._store_event_phase_time(f"Monitor reset {event_name}")
1873+
18491874
def _log_current_context(self):
18501875
logger.debug(f"[bg_id: '{self._bg_id}'] Summary status: \n{self._summary_status}")
18511876
hosts_str = "\n".join(
@@ -1913,6 +1938,8 @@ def _reset_context_when_completed(self):
19131938
self._green_dns_removed = False
19141939
self._green_topology_changed = False
19151940
self._all_green_hosts_changed_name = False
1941+
self._monitor_reset_on_in_progress_completed = False
1942+
self._monitor_reset_on_topology_completed = False
19161943
self._post_status_end_time_ns = 0
19171944
self._interim_status_hashes = [0, 0]
19181945
self._latest_context_hash = 0

aws_advanced_python_wrapper/cleanup.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,11 @@
1616
OpenedConnectionTracker
1717
from aws_advanced_python_wrapper.aws_credentials_manager import \
1818
AwsCredentialsManager
19-
from aws_advanced_python_wrapper.host_monitoring_plugin import \
20-
MonitoringThreadContainer
21-
from aws_advanced_python_wrapper.thread_pool_container import \
22-
ThreadPoolContainer
23-
from aws_advanced_python_wrapper.utils.sliding_expiration_cache_container import \
24-
SlidingExpirationCacheContainer
19+
from aws_advanced_python_wrapper.utils import services_container
2520

2621

2722
def release_resources() -> None:
2823
"""Release all global resources used by the wrapper."""
29-
MonitoringThreadContainer.clean_up()
30-
ThreadPoolContainer.release_resources()
24+
services_container.release_resources()
3125
AwsCredentialsManager.release_resources()
3226
OpenedConnectionTracker.release_resources()
33-
SlidingExpirationCacheContainer.release_resources()

0 commit comments

Comments
 (0)