Source code for aoptk.literature.utils

import asyncio
import time


[docs] class AsyncRequestLimiter: """Asynchronous request limiter to control the rate of API calls.""" def __init__(self, requests_per_second: int):
[docs] self.min_interval = 1.0 / requests_per_second
[docs] self._lock = asyncio.Lock()
[docs] self._next_allowed = 0.0
[docs] async def wait_turn(self) -> None: """Wait until it's the turn for the next request based on the rate limit.""" async with self._lock: now = time.monotonic() if now < self._next_allowed: await asyncio.sleep(self._next_allowed - now) now = time.monotonic() self._next_allowed = now + self.min_interval
[docs] def is_europepmc_id(publication_id: str) -> bool: """Check if the given publication ID is a EuropePMC ID.""" return bool(publication_id.startswith("PMC"))