Skip to content

Commit fec16e9

Browse files
committed
Make list methods of clients iterable
1 parent ff9817c commit fec16e9

39 files changed

Lines changed: 2713 additions & 653 deletions

docs/02_concepts/08_pagination.mdx

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import ApiLink from '@site/src/components/ApiLink';
1212

1313
import PaginationAsyncExample from '!!raw-loader!./code/08_pagination_async.py';
1414
import PaginationSyncExample from '!!raw-loader!./code/08_pagination_sync.py';
15-
1615
import IterateItemsAsyncExample from '!!raw-loader!./code/08_iterate_items_async.py';
1716
import IterateItemsSyncExample from '!!raw-loader!./code/08_iterate_items_sync.py';
1817

18+
1919
Most methods named `list` or `list_something` in the Apify client return a <ApiLink to="class/ListPage">`ListPage`</ApiLink> object. This object provides a consistent interface for working with paginated data and includes the following properties:
2020

2121
- `items` - The main results you're looking for.
@@ -45,7 +45,7 @@ The <ApiLink to="class/ListPage">`ListPage`</ApiLink> interface offers several k
4545

4646
## Generator-based iteration
4747

48-
For most use cases, `iterate_items()` is the recommended way to process all items in a dataset. It handles pagination automatically using a Python generator, fetching items in batches behind the scenes so you don't need to manage offsets or limits yourself.
48+
You can also use the `list` methods directly in iteration. They handle pagination automatically, fetching items in batches behind the scenes so you don't need to manage offsets or limits yourself.
4949

5050
<Tabs>
5151
<TabItem value="AsyncExample" label="Async client" default>
@@ -60,6 +60,4 @@ For most use cases, `iterate_items()` is the recommended way to process all item
6060
</TabItem>
6161
</Tabs>
6262

63-
`iterate_items()` accepts the same filtering parameters as `list_items()` (`clean`, `fields`, `omit`, `unwind`, `skip_empty`, `skip_hidden`), so you can combine automatic pagination with data filtering.
64-
65-
Similarly, `KeyValueStoreClient` provides an `iterate_keys()` method for iterating over all keys in a key-value store without manual pagination.
63+
Similarly, you can iterate over the return value of `KeyValueStoreClient.list_keys()` to go through all keys in a key-value store without manual pagination. The older `iterate_keys()` method is deprecated.

docs/02_concepts/code/08_iterate_items_async.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ async def main() -> None:
77
apify_client = ApifyClientAsync(TOKEN)
88
dataset_client = apify_client.dataset('dataset-id')
99

10-
# Iterate through all items automatically.
11-
async for item in dataset_client.iterate_items():
12-
print(item)
10+
# Define the pagination parameters
11+
limit = 1500 # Number of items in total
12+
offset = 100 # Starting offset
13+
14+
# Iterate through items automatically, lazily sending as many API calls
15+
# as needed and receiving items in chunks.
16+
async for item in dataset_client.list_items(limit=limit, offset=offset):
17+
print(item) # Process the item as needed

docs/02_concepts/code/08_iterate_items_sync.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,14 @@ def main() -> None:
77
apify_client = ApifyClient(TOKEN)
88
dataset_client = apify_client.dataset('dataset-id')
99

10-
# Iterate through all items automatically.
11-
for item in dataset_client.iterate_items():
12-
print(item)
10+
# Define the pagination parameters
11+
limit = 1500 # Number of items in total
12+
offset = 100 # Starting offset
13+
14+
# Iterate through items automatically, lazily sending as many API calls
15+
# as needed and receiving items in chunks.
16+
for item in dataset_client.list_items(limit=limit, offset=offset):
17+
print(item) # Process the item as needed
1318

1419

1520
if __name__ == '__main__':

docs/02_concepts/code/08_pagination_async.py

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,15 @@ async def main() -> None:
1010
dataset_client = apify_client.dataset('dataset-id')
1111

1212
# Define the pagination parameters
13-
limit = 1000 # Number of items per page
13+
limit = 1000  # Number of items to request from the API
1414
offset = 0 # Starting offset
15-
all_items = [] # List to store all fetched items
1615

17-
while True:
18-
# Fetch a page of items
19-
response = await dataset_client.list_items(limit=limit, offset=offset)
20-
items = response.items
21-
total = response.total
16+
# Send single API call to fetch paginated items.
17+
# (number of items per single call can be limited by API)
18+
paginated_items = await dataset_client.list_items(limit=limit, offset=offset)
2219

23-
print(f'Fetched {len(items)} items')
20+
# Inspect pagination metadata returned by API
21+
print(paginated_items.total)
2422

25-
# Add the fetched items to the complete list
26-
all_items.extend(items)
27-
28-
# Exit the loop if there are no more items to fetch
29-
if offset + limit >= total:
30-
break
31-
32-
# Increment the offset for the next page
33-
offset += limit
34-
35-
print(f'Overall fetched {len(all_items)} items')
23+
for item in paginated_items.items:
24+
print(item) # Process the item as needed

docs/02_concepts/code/08_pagination_sync.py

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,15 @@ def main() -> None:
1010
dataset_client = apify_client.dataset('dataset-id')
1111

1212
# Define the pagination parameters
13-
limit = 1000 # Number of items per page
13+
limit = 1000  # Number of items to request from the API
1414
offset = 0 # Starting offset
15-
all_items = [] # List to store all fetched items
1615

17-
while True:
18-
# Fetch a page of items
19-
response = dataset_client.list_items(limit=limit, offset=offset)
20-
items = response.items
21-
total = response.total
16+
# Send single API call to fetch paginated items.
17+
# (number of items per single call can be limited by API)
18+
paginated_items = dataset_client.list_items(limit=limit, offset=offset)
2219

23-
print(f'Fetched {len(items)} items')
20+
# Inspect pagination metadata returned by API
21+
print(paginated_items.total)
2422

25-
# Add the fetched items to the complete list
26-
all_items.extend(items)
27-
28-
# Exit the loop if there are no more items to fetch
29-
if offset + limit >= total:
30-
break
31-
32-
# Increment the offset for the next page
33-
offset += limit
34-
35-
print(f'Overall fetched {len(all_items)} items')
23+
for item in paginated_items.items:
24+
print(item) # Process the item as needed

scripts/_utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
(re.compile(r'\bSynchronous\b'), 'Asynchronous'),
2828
(re.compile(r'Retry a function'), 'Retry an async function'),
2929
(re.compile(r'Function to retry'), 'Async function to retry'),
30+
(re.compile(r'returned page also supports iteration: `for'), 'returned page also supports iteration: `async for'),
3031
]
3132
"""Patterns for converting sync docstrings to async docstrings."""
3233

src/apify_client/_pagination.py

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeVar
5+
6+
if TYPE_CHECKING:
7+
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine, Generator, Iterator
8+
9+
T = TypeVar('T')
10+
11+
12+
class HasItems(Protocol[T]):
    """Structural type for one page of paginated API results: any object with an `items` list."""

    # The items returned on this page; the iterator builders in this module yield from it.
    items: list[T]
14+
15+
16+
def _min_for_limit_param(a: int | None, b: int | None) -> int | None:
17+
"""Return minimum of two limit parameters, treating `None` or `0` as infinity.
18+
19+
The Apify API treats `0` as no limit for the `limit` parameter, so `0` here means infinity.
20+
Returns `None` when both inputs represent infinity.
21+
"""
22+
if a == 0:
23+
a = None
24+
if b == 0:
25+
b = None
26+
if a is None:
27+
return b
28+
if b is None:
29+
return a
30+
return min(a, b)
31+
32+
33+
class _LazyTask(Generic[T]):
    """Awaitable wrapper that schedules its coroutine only on first await.

    Deferring `asyncio.create_task` until the first await means no running event loop is
    required at construction time, and — unlike a bare coroutine — the wrapper can be
    awaited multiple times, with every await resolving to the same task's result.
    """

    def __init__(self, awaitable: Coroutine[Any, Any, T]) -> None:
        self._awaitable = awaitable
        self._task: asyncio.Task[T] | None = None

    def __await__(self) -> Generator[Any, None, T]:
        task = self._task
        if task is None:
            # First await: wrap the coroutine in a Task so later awaits can reuse it.
            task = asyncio.create_task(self._awaitable)
            self._task = task
        return (yield from task.__await__())
47+
48+
49+
def build_get_iterator(
    callback: Callable[..., HasItems[T]],
    first_page: HasItems[T],
    **kwargs: Any,
) -> Callable[[], Iterator[T]]:
    """Build a factory for an `Iterator` that yields items across paginated API calls.

    The callback is invoked to lazily fetch further pages from the API. Not all paginated
    endpoints accept every pagination kwarg, and some omit parts of the pagination
    metadata, so the implementation must tolerate missing fields while using them when
    available.

    The `total` field of a page is not trusted for stopping iteration because it may
    change between calls; iteration stops when a page has no items or once the
    user-requested `limit` has been reached.

    The `count` field does not count the objects returned but the objects scanned by the
    API — with filters applied, fewer items may be returned than were scanned — so
    `count`, when present, must be used for correct offset calculation.

    Iteration-relevant kwargs:
        chunk_size: Maximum number of items requested per API call during iteration.
            Pass `0` or `None` to let the API decide (effectively infinity).
        limit: User-requested total item limit. Stops iteration once this many items
            have been accounted for.
        offset: Starting offset for the first page.
        **other: Passed through to the callback unchanged.
    """
    chunk_size = kwargs.pop('chunk_size', 0) or 0
    offset = kwargs.get('offset') or 0
    limit = kwargs.get('limit') or 0

    def _page_size(page: HasItems[T]) -> int:
        # Prefer the API-reported `count` (objects scanned, needed for a correct next
        # offset), but fall back to the number of returned items when `count` is missing
        # OR `None` — endpoints that omit the field may still expose the attribute as
        # `None` on the response model, and `offset + None` would raise a TypeError.
        count = getattr(page, 'count', None)
        return count if count is not None else len(page.items)

    def get_iterator() -> Iterator[T]:
        current_page = first_page
        yield from current_page.items

        fetched_items = _page_size(current_page)
        while current_page.items and (not limit or (limit > fetched_items)):
            new_kwargs = {
                **kwargs,
                'offset': offset + fetched_items,
                # With no user limit, request `chunk_size` per call; otherwise cap the
                # request at whatever is still missing from the user limit.
                'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size),
            }
            current_page = callback(**new_kwargs)
            yield from current_page.items
            fetched_items += _page_size(current_page)

    return get_iterator
96+
97+
98+
def build_get_iterator_async(
    callback: Callable[..., Coroutine[Any, Any, HasItems[T]]],
    fetch_first_page: Awaitable[HasItems[T]],
    **kwargs: Any,
) -> Callable[[], AsyncIterator[T]]:
    """Build a factory for an `AsyncIterator` that yields items across paginated API calls.

    Mirrors `build_get_iterator` but for async callbacks: the `total` field is not
    trusted for stopping iteration (it may change between calls); iteration stops when a
    page has no items or once the user-requested `limit` has been reached, and the
    API-reported `count` (objects scanned, not returned) is used for offset calculation
    when available.

    NOTE: `fetch_first_page` is awaited each time a returned iterator is started, so pass
    a re-awaitable object (e.g. `_LazyTask`) if the factory may be iterated repeatedly.

    Iteration-relevant kwargs:
        chunk_size: Maximum number of items requested per API call during iteration.
            Pass `0` or `None` to let the API decide (effectively infinity).
        limit: User-requested total item limit. Stops iteration once this many items
            have been accounted for.
        offset: Starting offset for the first page.
        **other: Passed through to the callback unchanged.
    """
    chunk_size = kwargs.pop('chunk_size', 0) or 0
    offset = kwargs.get('offset') or 0
    limit = kwargs.get('limit') or 0

    def _page_size(page: HasItems[T]) -> int:
        # Prefer the API-reported `count` (objects scanned, needed for a correct next
        # offset), but fall back to the number of returned items when `count` is missing
        # OR `None` — endpoints that omit the field may still expose the attribute as
        # `None` on the response model, and `offset + None` would raise a TypeError.
        count = getattr(page, 'count', None)
        return count if count is not None else len(page.items)

    async def get_async_iterator() -> AsyncIterator[T]:
        current_page = await fetch_first_page
        for item in current_page.items:
            yield item

        fetched_items = _page_size(current_page)
        while current_page.items and (not limit or (limit > fetched_items)):
            new_kwargs = {
                **kwargs,
                'offset': offset + fetched_items,
                # With no user limit, request `chunk_size` per call; otherwise cap the
                # request at whatever is still missing from the user limit.
                'limit': chunk_size if not limit else _min_for_limit_param(limit - fetched_items, chunk_size),
            }
            current_page = await callback(**new_kwargs)
            for item in current_page.items:
                yield item
            fetched_items += _page_size(current_page)

    return get_async_iterator
129+
130+
131+
def build_get_cursor_iterator(
    callback: Callable[..., HasItems[T]],
    first_page: HasItems[T],
    *,
    cursor_param: str,
    limit: int | None = None,
    chunk_size: int | None = None,
    **kwargs: Any,
) -> Callable[[], Iterator[T]]:
    """Build a factory for an `Iterator` yielding items across cursor-paginated API calls.

    The caller is responsible for fetching the first page (typically by calling
    `callback` with the initial cursor). After each page, the attribute
    `next_<cursor_param>` of the page supplies the next cursor; a value of `None` ends
    iteration. Iteration also stops when a page comes back empty or once the requested
    `limit` of items has been yielded.

    Args:
        callback: Synchronous function performing a single paginated API call.
        first_page: The already-fetched first page of results.
        cursor_param: Name of the cursor keyword argument passed to `callback`; the next
            cursor is read from the page attribute `next_<cursor_param>`.
        limit: Maximum total number of items to yield; `None` or `0` means no limit.
        chunk_size: Maximum items requested per API call; `None` or `0` lets the API decide.
        **kwargs: Passed through to `callback` unchanged.
    """
    max_per_call = chunk_size or 0
    max_total = limit or 0

    def get_iterator() -> Iterator[T]:
        page = first_page
        yield from page.items

        yielded = len(page.items)
        cursor = getattr(page, f'next_{cursor_param}')

        while page.items and cursor is not None and (not max_total or max_total > yielded):
            if max_total:
                # Never request more than what is still missing from the user limit.
                call_limit = _min_for_limit_param(max_total - yielded, max_per_call)
            else:
                call_limit = max_per_call
            page = callback(**{**kwargs, cursor_param: cursor, 'limit': call_limit})
            yield from page.items
            yielded += len(page.items)
            cursor = getattr(page, f'next_{cursor_param}')

    return get_iterator
168+
169+
170+
def build_get_cursor_iterator_async(
    callback: Callable[..., Coroutine[Any, Any, HasItems[T]]],
    fetch_first_page: Awaitable[HasItems[T]],
    *,
    cursor_param: str,
    limit: int | None = None,
    chunk_size: int | None = None,
    **kwargs: Any,
) -> Callable[[], AsyncIterator[T]]:
    """Build a factory for an `AsyncIterator` yielding items across cursor-paginated API calls.

    Mirrors `build_get_cursor_iterator` but for async callbacks: after each page, the
    attribute `next_<cursor_param>` supplies the next cursor (`None` ends iteration), and
    iteration also stops on an empty page or once `limit` items have been yielded.

    Args:
        callback: Async function performing a single paginated API call.
        fetch_first_page: Awaitable resolving to the first page of results. It is
            awaited each time a returned iterator is started.
        cursor_param: Name of the cursor keyword argument passed to `callback`; the next
            cursor is read from the page attribute `next_<cursor_param>`.
        limit: Maximum total number of items to yield; `None` or `0` means no limit.
        chunk_size: Maximum items requested per API call; `None` or `0` lets the API decide.
        **kwargs: Passed through to `callback` unchanged.
    """
    max_per_call = chunk_size or 0
    max_total = limit or 0

    async def get_async_iterator() -> AsyncIterator[T]:
        page = await fetch_first_page
        for item in page.items:
            yield item

        yielded = len(page.items)
        cursor = getattr(page, f'next_{cursor_param}')

        while page.items and cursor is not None and (not max_total or max_total > yielded):
            if max_total:
                # Never request more than what is still missing from the user limit.
                call_limit = _min_for_limit_param(max_total - yielded, max_per_call)
            else:
                call_limit = max_per_call
            page = await callback(**{**kwargs, cursor_param: cursor, 'limit': call_limit})
            for item in page.items:
                yield item
            yielded += len(page.items)
            cursor = getattr(page, f'next_{cursor_param}')

    return get_async_iterator

0 commit comments

Comments
 (0)