From 913dc7f9aa20f08fce34f3cfb59ca672c47dbeb9 Mon Sep 17 00:00:00 2001 From: nikili0n Date: Tue, 17 Oct 2023 19:07:26 +0300 Subject: [PATCH] minor fixes --- .../telegram_parser.py | 46 +++++++++++++++++-- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/src/parsers/Telegram/telegram_media_downloader/telegram_parser.py b/src/parsers/Telegram/telegram_media_downloader/telegram_parser.py index ef85878..1b18d43 100644 --- a/src/parsers/Telegram/telegram_media_downloader/telegram_parser.py +++ b/src/parsers/Telegram/telegram_media_downloader/telegram_parser.py @@ -1,6 +1,4 @@ -import asyncio import os -from asyncio import gather from urllib.parse import urlparse from loguru import logger @@ -12,6 +10,45 @@ from src.parsers.Telegram.telegram_media_downloader.media_downloader import _che worker from src.parsers.base_parser import BaseParser +import asyncio +import threading +from typing import Awaitable, TypeVar +T = TypeVar("T") + + +def _start_background_loop(loop): + asyncio.set_event_loop(loop) + loop.run_forever() + + +_LOOP = asyncio.new_event_loop() +_LOOP_THREAD = threading.Thread( + target=_start_background_loop, args=(_LOOP,), daemon=True +) +_LOOP_THREAD.start() + + +def asyncio_run(coro: Awaitable[T], timeout=30) -> T: + """ + Runs the coroutine in an event loop running on a background thread, + and blocks the current thread until it returns a result. + This plays well with gevent, since it can yield on the Future result call. + + :param coro: A coroutine, typically an async method + :param timeout: How many seconds we should wait for a result before raising an error + """ + return asyncio.run_coroutine_threadsafe(coro, _LOOP).result(timeout=timeout) + + +def asyncio_gather(*futures, return_exceptions=False) -> list: + """ + A version of asyncio.gather that runs on the internal event loop + """ + async def gather(): + return await asyncio.gather(*futures, return_exceptions=return_exceptions) + + return asyncio.run_coroutine_threadsafe(gather(), loop=_LOOP).result() + class TelegramParser(BaseParser): def video_download(self, client: Client = None): @@ -31,9 +68,8 @@ class TelegramParser(BaseParser): mode="w+", encoding="utf-8") as f: YAML().dump(config, f) if _check_config(): - asyncio.run_coroutine_threadsafe(download_all_chat(client), app.loop) - asyncio.run_coroutine_threadsafe(worker(client), app.loop) - app.loop.run_forever() + a = asyncio_run(download_all_chat(client)) + b = asyncio_run(worker(client)) client.stop() app.is_running = False logger.info("Stopped!")