# JTFTP - Python/AsyncIO TFTP Server
# Copyright (C) 2022 Jeffrey C. Ollie
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import asyncio
import itertools
import logging
from typing import Awaitable
from typing import Callable
from typing import Iterable
from typing import TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")


def iterlast(iterable: Iterable[T]) -> Iterable[tuple[bool, T]]:
    """Generate C{(is_last, item)} tuples from C{iterable}.

    Each item is held back until the following one has been requested from
    the underlying iterator, which is how C{is_last} is determined. The
    source is therefore always read exactly one item ahead of what has been
    yielded; infinite iterables are supported and never produce a tuple with
    C{is_last} set.

    @param iterable: any iterable; it is consumed lazily.
    @return: a generator of C{(is_last, item)} tuples. Nothing is generated
        when C{iterable} is empty.
    """
    iterator = iter(iterable)
    try:
        current = next(iterator)
    except StopIteration:
        # the iterable is zero length
        return
    for upcoming in iterator:
        # a successor exists, so the held item cannot be the last one
        yield False, current
        current = upcoming
    yield True, current


async def timed_caller(
    timings: Iterable[float],
    call: Callable[[], Awaitable[None]],
    last: Callable[[], Awaitable[None]],
) -> None:
    """Call C{call} or C{last} according to C{timings}.

    The given C{timings} is an iterable of numbers. Each is a delay in
    seconds that will be taken before making the next call to C{call} or
    C{last}. The call to C{last} will happen after the last delay. If
    C{timings} is an infinite iterable then C{last} will never be called.

    Timings are validated lazily, one at a time, so any calls scheduled
    ahead of an invalid timing will already have been made by the time the
    error is raised.

    @param timings: delays, in seconds, to wait before each call.
    @param call: zero-argument coroutine function awaited after every delay
        except the final one.
    @param last: zero-argument coroutine function awaited after the final
        delay.

    @raise ValueError: if no timings are specified; there must be at least
        one, even if it specifies a zero seconds delay.
    @raise ValueError: if a negative timing is specified.
    """
    no_timings = True
    for is_last, timing in iterlast(timings):
        no_timings = False
        if timing < 0:
            raise ValueError("negative timing")
        await asyncio.sleep(timing)
        if is_last:
            await last()
        else:
            await call()
    # The loop body never ran, so the iterable was empty.
    if no_timings:
        raise ValueError("no timings specified")