Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions docs/02_concepts/08_pagination.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import ApiLink from '@site/src/components/ApiLink';

import PaginationAsyncExample from '!!raw-loader!./code/08_pagination_async.py';
import PaginationSyncExample from '!!raw-loader!./code/08_pagination_sync.py';

import IterateItemsAsyncExample from '!!raw-loader!./code/08_iterate_items_async.py';
import IterateItemsSyncExample from '!!raw-loader!./code/08_iterate_items_sync.py';


Most methods named `list` or `list_something` in the Apify client return a <ApiLink to="class/ListPage">`ListPage`</ApiLink> object. This object provides a consistent interface for working with paginated data and includes the following properties:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still have ListPage? Could you ask Claude to also review and update the docs?


- `items` - The main results you're looking for.
Expand Down Expand Up @@ -45,7 +45,7 @@ The <ApiLink to="class/ListPage">`ListPage`</ApiLink> interface offers several k

## Generator-based iteration

For most use cases, `iterate_items()` is the recommended way to process all items in a dataset. It handles pagination automatically using a Python generator, fetching items in batches behind the scenes so you don't need to manage offsets or limits yourself.
You can also use the `list` methods directly in iteration. These methods handle pagination automatically, fetching items in batches behind the scenes so you don't need to manage offsets or limits yourself.

<Tabs>
<TabItem value="AsyncExample" label="Async client" default>
Expand All @@ -60,6 +60,4 @@ For most use cases, `iterate_items()` is the recommended way to process all item
</TabItem>
</Tabs>

`iterate_items()` accepts the same filtering parameters as `list_items()` (`clean`, `fields`, `omit`, `unwind`, `skip_empty`, `skip_hidden`), so you can combine automatic pagination with data filtering.

Similarly, `KeyValueStoreClient` provides an `iterate_keys()` method for iterating over all keys in a key-value store without manual pagination.
Similarly, you can iterate over the return value of `KeyValueStoreClient.list_keys()` to go through all keys in a key-value store without manual pagination. The older `iterate_keys()` method is deprecated.
11 changes: 8 additions & 3 deletions docs/02_concepts/code/08_iterate_items_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ async def main() -> None:
apify_client = ApifyClientAsync(TOKEN)
dataset_client = apify_client.dataset('dataset-id')

# Iterate through all items automatically.
async for item in dataset_client.iterate_items():
print(item)
# Define the pagination parameters
limit = 1500 # Number of items in total
offset = 100 # Starting offset

# Iterate through items automatically, lazily sending as many API calls
# as needed and receiving items in chunks.
async for item in dataset_client.list_items(limit=limit, offset=offset):
print(item) # Process the item as needed
11 changes: 8 additions & 3 deletions docs/02_concepts/code/08_iterate_items_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ def main() -> None:
apify_client = ApifyClient(TOKEN)
dataset_client = apify_client.dataset('dataset-id')

# Iterate through all items automatically.
for item in dataset_client.iterate_items():
print(item)
# Define the pagination parameters
limit = 1500 # Number of items in total
offset = 100 # Starting offset

# Iterate through items automatically, lazily sending as many API calls
# as needed and receiving items in chunks.
for item in dataset_client.list_items(limit=limit, offset=offset):
print(item) # Process the item as needed


if __name__ == '__main__':
Expand Down
27 changes: 8 additions & 19 deletions docs/02_concepts/code/08_pagination_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,15 @@ async def main() -> None:
dataset_client = apify_client.dataset('dataset-id')

# Define the pagination parameters
limit = 1000 # Number of items per page
limit = 1000  # Number of items to request from the API
offset = 0 # Starting offset
all_items = [] # List to store all fetched items

while True:
# Fetch a page of items
response = await dataset_client.list_items(limit=limit, offset=offset)
items = response.items
total = response.total
# Send single API call to fetch paginated items.
# (number of items per single call can be limited by API)
paginated_items = await dataset_client.list_items(limit=limit, offset=offset)

print(f'Fetched {len(items)} items')
# Inspect pagination metadata returned by API
print(paginated_items.total)

# Add the fetched items to the complete list
all_items.extend(items)

# Exit the loop if there are no more items to fetch
if offset + limit >= total:
break

# Increment the offset for the next page
offset += limit

print(f'Overall fetched {len(all_items)} items')
for item in paginated_items.items:
print(item) # Process the item as needed
27 changes: 8 additions & 19 deletions docs/02_concepts/code/08_pagination_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,15 @@ def main() -> None:
dataset_client = apify_client.dataset('dataset-id')

# Define the pagination parameters
limit = 1000 # Number of items per page
limit = 1000  # Number of items to request from the API
offset = 0 # Starting offset
all_items = [] # List to store all fetched items

while True:
# Fetch a page of items
response = dataset_client.list_items(limit=limit, offset=offset)
items = response.items
total = response.total
# Send single API call to fetch paginated items.
# (number of items per single call can be limited by API)
paginated_items = dataset_client.list_items(limit=limit, offset=offset)

print(f'Fetched {len(items)} items')
# Inspect pagination metadata returned by API
print(paginated_items.total)

# Add the fetched items to the complete list
all_items.extend(items)

# Exit the loop if there are no more items to fetch
if offset + limit >= total:
break

# Increment the offset for the next page
offset += limit

print(f'Overall fetched {len(all_items)} items')
for item in paginated_items.items:
print(item) # Process the item as needed
1 change: 1 addition & 0 deletions scripts/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
(re.compile(r'\bSynchronous\b'), 'Asynchronous'),
(re.compile(r'Retry a function'), 'Retry an async function'),
(re.compile(r'Function to retry'), 'Async function to retry'),
(re.compile(r'returned page also supports iteration: `for'), 'returned page also supports iteration: `async for'),
]
"""Patterns for converting sync docstrings to async docstrings."""

Expand Down
203 changes: 203 additions & 0 deletions src/apify_client/_pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeVar

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine, Generator, Iterator

T = TypeVar('T')


class HasItems(Protocol[T]):
    """Structural type for paginated API responses that carry a list of result items."""

    # The page payload; the iterator builders in this module read and re-yield these.
    items: list[T]


def _min_for_limit_param(a: int | None, b: int | None) -> int | None:
"""Return minimum of two limit parameters, treating `None` or `0` as infinity.

The Apify API treats `0` as no limit for the `limit` parameter, so `0` here means infinity.
Returns `None` when both inputs represent infinity.
"""
if a == 0:
a = None
if b == 0:
b = None
if a is None:
return b
if b is None:
return a
return min(a, b)


class _LazyTask(Generic[T]):
    """Awaitable wrapper that defers task creation until the first await.

    The wrapped coroutine is scheduled as an `asyncio.Task` only on the first await;
    subsequent awaits delegate to that same task, so the result can be awaited multiple
    times without re-running (or illegally re-awaiting) the underlying coroutine.
    """

    def __init__(self, awaitable: Coroutine[Any, Any, T]) -> None:
        # Stored untouched; scheduling happens lazily inside `__await__`.
        self._coro = awaitable
        self._pending_task: asyncio.Task[T] | None = None

    def __await__(self) -> Generator[Any, None, T]:
        if self._pending_task is None:
            # First await: schedule the coroutine on the running event loop.
            self._pending_task = asyncio.create_task(self._coro)
        # Delegate to the task's awaiter; a Task may be awaited any number of times.
        return self._pending_task.__await__()


def build_get_iterator(
    callback: Callable[..., HasItems[T]],
    first_page: HasItems[T],
    **kwargs: Any,
) -> Callable[[], Iterator[T]]:
    """Build a factory for an `Iterator` that yields items across paginated API calls.

    The `callback` is invoked lazily to fetch further pages from the API; `first_page`
    is the already-fetched initial page.

    The `total` field of a page is deliberately not used to stop iteration, since it
    may change between calls; iteration ends when a page comes back empty or when the
    user-requested `limit` has been reached. The optional `count` field on a page
    reflects objects *scanned* by the API (with filters this can exceed the number of
    items returned), so when present it is used to advance the offset; otherwise the
    number of returned items is used.

    Iteration-relevant kwargs:
        chunk_size: Maximum number of items requested per API call. `0` or `None`
            lets the API decide (effectively no per-call cap).
        limit: Total number of items the caller wants; iteration stops once reached.
        offset: Starting offset for the first page.
        **other: Forwarded to `callback` unchanged.
    """
    max_per_call = kwargs.pop('chunk_size', 0) or 0
    start_offset = kwargs.get('offset') or 0
    requested_limit = kwargs.get('limit') or 0

    def get_iterator() -> Iterator[T]:
        page = first_page
        yield from page.items

        # `count` (items scanned by the API) drives the next offset when available.
        progressed = getattr(page, 'count', len(page.items))

        while page.items:
            # Stop as soon as the caller-requested limit has been satisfied.
            if requested_limit and progressed >= requested_limit:
                break
            next_limit = (
                max_per_call
                if not requested_limit
                else _min_for_limit_param(requested_limit - progressed, max_per_call)
            )
            page = callback(**{**kwargs, 'offset': start_offset + progressed, 'limit': next_limit})
            yield from page.items
            progressed += getattr(page, 'count', len(page.items))

    return get_iterator


def build_get_iterator_async(
    callback: Callable[..., Coroutine[Any, Any, HasItems[T]]],
    fetch_first_page: Awaitable[HasItems[T]],
    **kwargs: Any,
) -> Callable[[], AsyncIterator[T]]:
    """Build a factory for an `AsyncIterator` that yields items across paginated API calls.

    Async counterpart of `build_get_iterator`; see that function for the semantics of
    the `chunk_size`, `limit` and `offset` kwargs and of the page `count` field.
    """
    max_per_call = kwargs.pop('chunk_size', 0) or 0
    start_offset = kwargs.get('offset') or 0
    requested_limit = kwargs.get('limit') or 0

    async def get_async_iterator() -> AsyncIterator[T]:
        page = await fetch_first_page
        for element in page.items:
            yield element

        # `count` (items scanned by the API) drives the next offset when available.
        progressed = getattr(page, 'count', len(page.items))

        while page.items:
            # Stop as soon as the caller-requested limit has been satisfied.
            if requested_limit and progressed >= requested_limit:
                break
            next_limit = (
                max_per_call
                if not requested_limit
                else _min_for_limit_param(requested_limit - progressed, max_per_call)
            )
            page = await callback(**{**kwargs, 'offset': start_offset + progressed, 'limit': next_limit})
            for element in page.items:
                yield element
            progressed += getattr(page, 'count', len(page.items))

    return get_async_iterator


def build_get_cursor_iterator(
    callback: Callable[..., HasItems[T]],
    first_page: HasItems[T],
    *,
    cursor_param: str,
    limit: int | None = None,
    chunk_size: int | None = None,
    **kwargs: Any,
) -> Callable[[], Iterator[T]]:
    """Build a factory for an `Iterator` that yields items across cursor-paginated API calls.

    Mirrors `build_get_iterator` but uses cursor-based pagination. The caller is
    responsible for fetching `first_page` (typically via `callback` with the initial
    cursor). After each page, `getattr(page, f'next_{cursor_param}')` supplies the
    next cursor; a `None` cursor ends iteration, as does an empty page or reaching
    the caller-requested `limit`.
    """
    per_call_cap = chunk_size or 0
    requested_limit = limit or 0
    # Attribute holding the next cursor, e.g. `next_exclusive_start_key`.
    cursor_attr = f'next_{cursor_param}'

    def get_iterator() -> Iterator[T]:
        page = first_page
        yield from page.items

        yielded = len(page.items)
        cursor = getattr(page, cursor_attr)

        while page.items and cursor is not None:
            # Stop as soon as the caller-requested limit has been satisfied.
            if requested_limit and yielded >= requested_limit:
                break
            next_limit = (
                per_call_cap
                if not requested_limit
                else _min_for_limit_param(requested_limit - yielded, per_call_cap)
            )
            page = callback(**{**kwargs, cursor_param: cursor, 'limit': next_limit})
            yield from page.items
            yielded += len(page.items)
            cursor = getattr(page, cursor_attr)

    return get_iterator


def build_get_cursor_iterator_async(
    callback: Callable[..., Coroutine[Any, Any, HasItems[T]]],
    fetch_first_page: Awaitable[HasItems[T]],
    *,
    cursor_param: str,
    limit: int | None = None,
    chunk_size: int | None = None,
    **kwargs: Any,
) -> Callable[[], AsyncIterator[T]]:
    """Build a factory for an `AsyncIterator` that yields items across cursor-paginated API calls.

    Async counterpart of `build_get_cursor_iterator`; see that function for the
    cursor, `limit` and `chunk_size` semantics.
    """
    per_call_cap = chunk_size or 0
    requested_limit = limit or 0
    # Attribute holding the next cursor, e.g. `next_exclusive_start_key`.
    cursor_attr = f'next_{cursor_param}'

    async def get_async_iterator() -> AsyncIterator[T]:
        page = await fetch_first_page
        for element in page.items:
            yield element

        yielded = len(page.items)
        cursor = getattr(page, cursor_attr)

        while page.items and cursor is not None:
            # Stop as soon as the caller-requested limit has been satisfied.
            if requested_limit and yielded >= requested_limit:
                break
            next_limit = (
                per_call_cap
                if not requested_limit
                else _min_for_limit_param(requested_limit - yielded, per_call_cap)
            )
            page = await callback(**{**kwargs, cursor_param: cursor, 'limit': next_limit})
            for element in page.items:
                yield element
            yielded += len(page.items)
            cursor = getattr(page, cursor_attr)

    return get_async_iterator
Loading