Skip to content

Commit ba41d95

Browse files
vdusekclaude
andcommitted
fix: prevent StreamedLog stop() from dropping tail or hanging on silent stream
The async variant lost the final buffered log line because the trailing `_log_buffer_content(include_last_part=True)` was skipped when `stop()` cancelled the task; wrapping the loop in try/finally ensures the tail is always flushed. The sync variant could hang in `iter_bytes()` on a silent stream — pass a 30s read timeout (overridable via the new `_read_timeout` class attribute) so `stop()` can unblock within bounded time. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3e9fc3d commit ba41d95

2 files changed

Lines changed: 167 additions & 17 deletions

File tree

src/apify_client/_streamed_log.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import re
66
import threading
77
from asyncio import Task
8-
from datetime import UTC, datetime
8+
from datetime import UTC, datetime, timedelta
99
from threading import Thread
10-
from typing import TYPE_CHECKING, Self, cast
10+
from typing import TYPE_CHECKING, ClassVar, Self, cast
1111

1212
from apify_client._docs import docs_group
1313

@@ -90,6 +90,10 @@ class StreamedLog(StreamedLogBase):
9090
call `start` and `stop` manually. Obtain an instance via `RunClient.get_streamed_log`.
9191
"""
9292

93+
# Caps how long `iter_bytes()` can block on a silent stream so `stop()` can unblock within
94+
# this window instead of waiting for the long-polling default.
95+
_read_timeout: ClassVar[timedelta] = timedelta(seconds=30)
96+
9397
def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_start: bool = True) -> None:
9498
"""Initialize `StreamedLog`.
9599
@@ -138,17 +142,17 @@ def __exit__(
138142
self.stop()
139143

140144
def _stream_log(self) -> None:
141-
with self._log_client.stream(raw=True) as log_stream:
145+
with self._log_client.stream(raw=True, timeout=self._read_timeout) as log_stream:
142146
if not log_stream:
143147
return
144-
for data in log_stream.iter_bytes():
145-
self._process_new_data(data)
146-
if self._stop_logging:
147-
break
148-
149-
# If the stream is finished, then the last part will be also processed.
150-
self._log_buffer_content(include_last_part=True)
151-
return
148+
try:
149+
for data in log_stream.iter_bytes():
150+
self._process_new_data(data)
151+
if self._stop_logging:
152+
break
153+
finally:
154+
# Flush the last buffered part even if the read timed out or was stopped.
155+
self._log_buffer_content(include_last_part=True)
152156

153157

154158
@docs_group('Other')
@@ -214,8 +218,9 @@ async def _stream_log(self) -> None:
214218
async with self._log_client.stream(raw=True) as log_stream:
215219
if not log_stream:
216220
return
217-
async for data in log_stream.aiter_bytes():
218-
self._process_new_data(data)
219-
220-
# If the stream is finished, then the last part will be also processed.
221-
self._log_buffer_content(include_last_part=True)
221+
try:
222+
async for data in log_stream.aiter_bytes():
223+
self._process_new_data(data)
224+
finally:
225+
# Flush the last buffered part even if the task is cancelled by `stop()`.
226+
self._log_buffer_content(include_last_part=True)

tests/unit/test_logging.py

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import json
55
import logging
6+
import threading
67
import time
78
from datetime import datetime, timedelta
89
from typing import TYPE_CHECKING
@@ -15,7 +16,7 @@
1516
from apify_client._logging import RedirectLogFormatter
1617
from apify_client._models import ActorJobStatus
1718
from apify_client._status_message_watcher import StatusMessageWatcherBase
18-
from apify_client._streamed_log import StreamedLogBase
19+
from apify_client._streamed_log import StreamedLog, StreamedLogBase
1920

2021
if TYPE_CHECKING:
2122
from collections.abc import Iterator
@@ -652,3 +653,147 @@ async def test_status_message_watcher_async_restart_after_normal_completion(http
652653
assert task2 is not task # New task created
653654
await task2 # Let it complete (will hit terminal status again)
654655
assert task2.done()
656+
657+
658+
_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line'
659+
_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line'
660+
661+
662+
def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
663+
"""Register the minimal run and actor endpoints required by `get_streamed_log`."""
664+
httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
665+
{
666+
'data': {
667+
'id': _MOCKED_RUN_ID,
668+
'actId': _MOCKED_ACTOR_ID,
669+
'userId': 'test_user_id',
670+
'startedAt': '2019-11-30T07:34:24.202Z',
671+
'finishedAt': '2019-12-12T09:30:12.202Z',
672+
'status': 'RUNNING',
673+
'statusMessage': 'Running',
674+
'isStatusMessageTerminal': False,
675+
'meta': {'origin': 'WEB'},
676+
'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1},
677+
'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048},
678+
'buildId': 'test_build_id',
679+
'generalAccess': 'RESTRICTED',
680+
'defaultKeyValueStoreId': 'test_kvs_id',
681+
'defaultDatasetId': 'test_dataset_id',
682+
'defaultRequestQueueId': 'test_rq_id',
683+
'buildNumber': '0.0.1',
684+
'containerUrl': 'https://test.runs.apify.net',
685+
}
686+
}
687+
)
688+
httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json(
689+
{
690+
'data': {
691+
'id': _MOCKED_ACTOR_ID,
692+
'userId': 'test_user_id',
693+
'name': _MOCKED_ACTOR_NAME,
694+
'username': 'test_user',
695+
'isPublic': False,
696+
'createdAt': '2019-07-08T11:27:57.401Z',
697+
'modifiedAt': '2019-07-08T14:01:05.546Z',
698+
'stats': {
699+
'totalBuilds': 0,
700+
'totalRuns': 0,
701+
'totalUsers': 0,
702+
'totalUsers7Days': 0,
703+
'totalUsers30Days': 0,
704+
'totalUsers90Days': 0,
705+
'totalMetamorphs': 0,
706+
'lastRunStartedAt': '2019-07-08T14:01:05.546Z',
707+
},
708+
'versions': [],
709+
'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048},
710+
'deploymentKey': 'test_key',
711+
}
712+
}
713+
)
714+
715+
716+
@pytest.mark.usefixtures('propagate_stream_logs')
717+
async def test_streamed_log_async_stop_flushes_buffered_tail(
718+
caplog: LogCaptureFixture,
719+
httpserver: HTTPServer,
720+
) -> None:
721+
"""Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`."""
722+
stop_emitting = threading.Event()
723+
724+
def _tail_handler(_request: Request) -> Response:
725+
def generate_logs() -> Iterator[bytes]:
726+
yield f'{_TAIL_FIRST_MESSAGE}\n'.encode()
727+
# Second marker has no trailing newline/next-marker, so it stays in the buffer.
728+
yield _TAIL_SECOND_MESSAGE.encode()
729+
# Block until the test tears the server down (or stop releases us).
730+
stop_emitting.wait(timeout=30)
731+
732+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
733+
734+
httpserver.expect_request(
735+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
736+
).respond_with_handler(_tail_handler)
737+
_register_run_and_actor_endpoints(httpserver)
738+
739+
api_url = httpserver.url_for('/').removesuffix('/')
740+
run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
741+
streamed_log = await run_client.get_streamed_log()
742+
743+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
744+
745+
try:
746+
with caplog.at_level(logging.DEBUG, logger=logger_name):
747+
async with streamed_log:
748+
# Wait long enough for both chunks to arrive and be processed.
749+
await asyncio.sleep(1)
750+
# Context exit calls stop(), which cancels the task mid-stream.
751+
finally:
752+
stop_emitting.set()
753+
754+
messages = [record.message for record in caplog.records]
755+
assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}'
756+
assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}'
757+
758+
759+
@pytest.mark.usefixtures('propagate_stream_logs')
760+
def test_streamed_log_sync_stop_does_not_hang_on_silent_stream(
761+
httpserver: HTTPServer,
762+
monkeypatch: pytest.MonkeyPatch,
763+
) -> None:
764+
"""Verify `stop()` returns promptly even when the underlying stream is silent (no chunks)."""
765+
# Shorten the read timeout so the test doesn't wait for the production default.
766+
monkeypatch.setattr(StreamedLog, '_read_timeout', timedelta(seconds=1))
767+
768+
release_server = threading.Event()
769+
770+
def _silent_handler(_request: Request) -> Response:
771+
def generate_logs() -> Iterator[bytes]:
772+
# Yield an empty chunk so werkzeug flushes headers and the client sees a streaming
773+
# response; then block without emitting any log data.
774+
yield b''
775+
release_server.wait(timeout=30)
776+
777+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
778+
779+
httpserver.expect_request(
780+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
781+
).respond_with_handler(_silent_handler)
782+
_register_run_and_actor_endpoints(httpserver)
783+
784+
api_url = httpserver.url_for('/').removesuffix('/')
785+
run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
786+
streamed_log = run_client.get_streamed_log()
787+
788+
streamed_log.start()
789+
try:
790+
# Give the streaming thread time to start and block inside iter_bytes.
791+
time.sleep(0.3)
792+
793+
# Call stop() from a helper thread so the test cannot hang indefinitely if the fix regresses.
794+
stop_thread = threading.Thread(target=streamed_log.stop)
795+
stop_thread.start()
796+
stop_thread.join(timeout=5)
797+
assert not stop_thread.is_alive(), 'stop() hangs when the underlying stream is silent'
798+
finally:
799+
release_server.set()

0 commit comments

Comments
 (0)