Skip to content

Commit 5b6a9dd

Browse files
vdusek authored and claude committed
fix: prevent StreamedLog stop() from dropping tail or hanging on silent stream
The async variant lost the final buffered log line because the trailing `_log_buffer_content(include_last_part=True)` was skipped when `stop()` cancelled the task; wrapping the loop in try/finally ensures the tail is always flushed. The sync variant could hang in `iter_bytes()` on a silent stream — pass a 30s read timeout (overridable via the new `_read_timeout` class attribute) so `stop()` can unblock within bounded time. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 656aa03 commit 5b6a9dd

2 files changed

Lines changed: 167 additions & 17 deletions

File tree

src/apify_client/_streamed_log.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import re
66
import threading
77
from asyncio import Task
8-
from datetime import UTC, datetime
8+
from datetime import UTC, datetime, timedelta
99
from threading import Thread
10-
from typing import TYPE_CHECKING, Self, cast
10+
from typing import TYPE_CHECKING, ClassVar, Self, cast
1111

1212
from apify_client._docs import docs_group
1313

@@ -90,6 +90,10 @@ class StreamedLog(StreamedLogBase):
9090
call `start` and `stop` manually. Obtain an instance via `RunClient.get_streamed_log`.
9191
"""
9292

93+
# Caps how long `iter_bytes()` can block on a silent stream so `stop()` can unblock within
94+
# this window instead of waiting for the long-polling default.
95+
_read_timeout: ClassVar[timedelta] = timedelta(seconds=30)
96+
9397
def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_start: bool = True) -> None:
9498
"""Initialize `StreamedLog`.
9599
@@ -138,17 +142,17 @@ def __exit__(
138142
self.stop()
139143

140144
def _stream_log(self) -> None:
141-
with self._log_client.stream(raw=True) as log_stream:
145+
with self._log_client.stream(raw=True, timeout=self._read_timeout) as log_stream:
142146
if not log_stream:
143147
return
144-
for data in log_stream.iter_bytes():
145-
self._process_new_data(data)
146-
if self._stop_logging:
147-
break
148-
149-
# If the stream is finished, then the last part will be also processed.
150-
self._log_buffer_content(include_last_part=True)
151-
return
148+
try:
149+
for data in log_stream.iter_bytes():
150+
self._process_new_data(data)
151+
if self._stop_logging:
152+
break
153+
finally:
154+
# Flush the last buffered part even if the read timed out or was stopped.
155+
self._log_buffer_content(include_last_part=True)
152156

153157

154158
@docs_group('Other')
@@ -214,8 +218,9 @@ async def _stream_log(self) -> None:
214218
async with self._log_client.stream(raw=True) as log_stream:
215219
if not log_stream:
216220
return
217-
async for data in log_stream.aiter_bytes():
218-
self._process_new_data(data)
219-
220-
# If the stream is finished, then the last part will be also processed.
221-
self._log_buffer_content(include_last_part=True)
221+
try:
222+
async for data in log_stream.aiter_bytes():
223+
self._process_new_data(data)
224+
finally:
225+
# Flush the last buffered part even if the task is cancelled by `stop()`.
226+
self._log_buffer_content(include_last_part=True)

tests/unit/test_logging.py

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import json
55
import logging
6+
import threading
67
import time
78
from datetime import datetime, timedelta
89
from typing import TYPE_CHECKING
@@ -15,7 +16,7 @@
1516
from apify_client._logging import RedirectLogFormatter
1617
from apify_client._models import ActorJobStatus
1718
from apify_client._status_message_watcher import StatusMessageWatcherBase
18-
from apify_client._streamed_log import StreamedLogBase
19+
from apify_client._streamed_log import StreamedLog, StreamedLogBase
1920

2021
if TYPE_CHECKING:
2122
from collections.abc import Iterator
@@ -717,3 +718,147 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception(
717718
elapsed = time.monotonic() - start
718719

719720
assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'
721+
722+
723+
_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line'
724+
_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line'
725+
726+
727+
def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
728+
"""Register the minimal run and actor endpoints required by `get_streamed_log`."""
729+
httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
730+
{
731+
'data': {
732+
'id': _MOCKED_RUN_ID,
733+
'actId': _MOCKED_ACTOR_ID,
734+
'userId': 'test_user_id',
735+
'startedAt': '2019-11-30T07:34:24.202Z',
736+
'finishedAt': '2019-12-12T09:30:12.202Z',
737+
'status': 'RUNNING',
738+
'statusMessage': 'Running',
739+
'isStatusMessageTerminal': False,
740+
'meta': {'origin': 'WEB'},
741+
'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1},
742+
'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048},
743+
'buildId': 'test_build_id',
744+
'generalAccess': 'RESTRICTED',
745+
'defaultKeyValueStoreId': 'test_kvs_id',
746+
'defaultDatasetId': 'test_dataset_id',
747+
'defaultRequestQueueId': 'test_rq_id',
748+
'buildNumber': '0.0.1',
749+
'containerUrl': 'https://test.runs.apify.net',
750+
}
751+
}
752+
)
753+
httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json(
754+
{
755+
'data': {
756+
'id': _MOCKED_ACTOR_ID,
757+
'userId': 'test_user_id',
758+
'name': _MOCKED_ACTOR_NAME,
759+
'username': 'test_user',
760+
'isPublic': False,
761+
'createdAt': '2019-07-08T11:27:57.401Z',
762+
'modifiedAt': '2019-07-08T14:01:05.546Z',
763+
'stats': {
764+
'totalBuilds': 0,
765+
'totalRuns': 0,
766+
'totalUsers': 0,
767+
'totalUsers7Days': 0,
768+
'totalUsers30Days': 0,
769+
'totalUsers90Days': 0,
770+
'totalMetamorphs': 0,
771+
'lastRunStartedAt': '2019-07-08T14:01:05.546Z',
772+
},
773+
'versions': [],
774+
'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048},
775+
'deploymentKey': 'test_key',
776+
}
777+
}
778+
)
779+
780+
781+
@pytest.mark.usefixtures('propagate_stream_logs')
782+
async def test_streamed_log_async_stop_flushes_buffered_tail(
783+
caplog: LogCaptureFixture,
784+
httpserver: HTTPServer,
785+
) -> None:
786+
"""Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`."""
787+
stop_emitting = threading.Event()
788+
789+
def _tail_handler(_request: Request) -> Response:
790+
def generate_logs() -> Iterator[bytes]:
791+
yield f'{_TAIL_FIRST_MESSAGE}\n'.encode()
792+
# Second marker has no trailing newline/next-marker, so it stays in the buffer.
793+
yield _TAIL_SECOND_MESSAGE.encode()
794+
# Block until the test tears the server down (or stop releases us).
795+
stop_emitting.wait(timeout=30)
796+
797+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
798+
799+
httpserver.expect_request(
800+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
801+
).respond_with_handler(_tail_handler)
802+
_register_run_and_actor_endpoints(httpserver)
803+
804+
api_url = httpserver.url_for('/').removesuffix('/')
805+
run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
806+
streamed_log = await run_client.get_streamed_log()
807+
808+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
809+
810+
try:
811+
with caplog.at_level(logging.DEBUG, logger=logger_name):
812+
async with streamed_log:
813+
# Wait long enough for both chunks to arrive and be processed.
814+
await asyncio.sleep(1)
815+
# Context exit calls stop(), which cancels the task mid-stream.
816+
finally:
817+
stop_emitting.set()
818+
819+
messages = [record.message for record in caplog.records]
820+
assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}'
821+
assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}'
822+
823+
824+
@pytest.mark.usefixtures('propagate_stream_logs')
825+
def test_streamed_log_sync_stop_does_not_hang_on_silent_stream(
826+
httpserver: HTTPServer,
827+
monkeypatch: pytest.MonkeyPatch,
828+
) -> None:
829+
"""Verify `stop()` returns promptly even when the underlying stream is silent (no chunks)."""
830+
# Shorten the read timeout so the test doesn't wait for the production default.
831+
monkeypatch.setattr(StreamedLog, '_read_timeout', timedelta(seconds=1))
832+
833+
release_server = threading.Event()
834+
835+
def _silent_handler(_request: Request) -> Response:
836+
def generate_logs() -> Iterator[bytes]:
837+
# Yield an empty chunk so werkzeug flushes headers and the client sees a streaming
838+
# response; then block without emitting any log data.
839+
yield b''
840+
release_server.wait(timeout=30)
841+
842+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
843+
844+
httpserver.expect_request(
845+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
846+
).respond_with_handler(_silent_handler)
847+
_register_run_and_actor_endpoints(httpserver)
848+
849+
api_url = httpserver.url_for('/').removesuffix('/')
850+
run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
851+
streamed_log = run_client.get_streamed_log()
852+
853+
streamed_log.start()
854+
try:
855+
# Give the streaming thread time to start and block inside iter_bytes.
856+
time.sleep(0.3)
857+
858+
# Call stop() from a helper thread so the test cannot hang indefinitely if the fix regresses.
859+
stop_thread = threading.Thread(target=streamed_log.stop)
860+
stop_thread.start()
861+
stop_thread.join(timeout=5)
862+
assert not stop_thread.is_alive(), 'stop() hangs when the underlying stream is silent'
863+
finally:
864+
release_server.set()

0 commit comments

Comments (0)