Source code for pipelinex.extras.datasets.httpx.async_api_dataset

import asyncio
import sys
from typing import Any, Dict

import httpx

from ..requests.api_dataset import APIDataSet


[docs]def asyncio_run(aw): if sys.version_info >= (3, 7): return asyncio.run(aw) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: r = loop.run_until_complete(aw) finally: loop.close() asyncio.set_event_loop(None) return r
[docs]async def request_coroutine(session, method, url, request_args): return await session.request(method, url=url, **request_args)
[docs]async def requests_coroutine(session_config, method, url_list, request_args): async with httpx.AsyncClient(**session_config) as session: request_coroutines = [ request_coroutine(session, method, url, request_args) for url in url_list ] request_tasks = [ asyncio.ensure_future(coroutine) for coroutine in request_coroutines ] r = await asyncio.wait(request_tasks) return r
[docs]class AsyncAPIDataSet(APIDataSet): def _configure_session(self, session_config, _): return session_config def _execute_request(self) -> Dict[str, Any]: request_args = self._request_args method = self._method url_dict = self._get_url_dict() session_config = self._session name_url_list = list(url_dict.items()) url_list = [e[1] for e in name_url_list] tasks_done, tasks_pending = asyncio_run( requests_coroutine(session_config, method, url_list, request_args) ) name_list = [e[0] for e in name_url_list] response_dict = {} for name, task in zip(name_list, tasks_done): try: response_dict[name] = task.result() except Exception as exc: response_dict[name] = self._handle_exceptions(exc) return response_dict