From 69eb65f32b3e7ac4b017c8f9e664de5adf9b327e Mon Sep 17 00:00:00 2001 From: nikili0n Date: Wed, 18 Oct 2023 00:38:33 +0300 Subject: [PATCH] remove loop from app --- main.py | 4 +- src/core/master_service.py | 19 ++----- .../media_downloader.py | 7 +-- .../telegram_media_downloader/module/app.py | 6 +- .../telegram_media_downloader/module/bot.py | 6 +- .../telegram_parser.py | 56 +++++-------------- 6 files changed, 32 insertions(+), 66 deletions(-) diff --git a/main.py b/main.py index 2dc10b1..1c4f13c 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,10 @@ +import asyncio from multiprocessing import freeze_support from src.core.master_service import MasterService if __name__ == '__main__': freeze_support() - ms = MasterService() + loop = asyncio.get_event_loop() + ms = MasterService(loop) ms.loop.run_until_complete(ms.run()) diff --git a/src/core/master_service.py b/src/core/master_service.py index e5fa67c..fdcc90d 100644 --- a/src/core/master_service.py +++ b/src/core/master_service.py @@ -22,8 +22,8 @@ from src.parsers.Telegram.telegram_media_downloader.telegram_parser import Teleg class MasterService: - def __init__(self): - self.loop = asyncio.get_event_loop() + def __init__(self, loop): + self.loop = loop self.MAX_EXECUTOR_WORKERS = 8 self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS, @@ -71,19 +71,10 @@ class MasterService: match downloader: case TelegramParser(): if _check_config(): - tg_client = pyrogram.Client( - "media_downloader", - api_id=app.api_id, - api_hash=app.api_hash, - proxy=app.proxy, - workdir=app.session_file_path, - ) - app.pre_run() - app.is_running = True - tg_client.start() - loop = asyncio.get_running_loop() + loop = asyncio.get_event_loop() asyncio.set_event_loop(loop) - loop.run_until_complete(downloader.video_download(client=tg_client)) + loop.run_until_complete(downloader.video_download()) + # loop.close() return result case _: result = downloader.video_download() diff --git a/src/parsers/Telegram/telegram_media_downloader/media_downloader.py b/src/parsers/Telegram/telegram_media_downloader/media_downloader.py index 75b69e3..ab55829 100644 --- a/src/parsers/Telegram/telegram_media_downloader/media_downloader.py +++ b/src/parsers/Telegram/telegram_media_downloader/media_downloader.py @@ -478,6 +478,7 @@ def _check_config() -> bool: async def worker(client: pyrogram.client.Client): """Work for download task""" + # TODO: mb replace with asyncio.Event while app.is_running: try: item = await queue.get() @@ -487,11 +488,9 @@ async def worker(client: pyrogram.client.Client): if node.client: await download_task(node.client, message, node) app.is_running = False - app.loop.stop() else: await download_task(client, message, node) app.is_running = False - app.loop.stop() except Exception as e: logger.exception(f"{e}") @@ -568,7 +567,7 @@ async def run_until_all_task_finish(): def _exec_loop(): """Exec loop""" - + # TODO: broken, no loop if app.bot_token: app.loop.run_forever() else: @@ -593,7 +592,7 @@ def main(): client.start() logger.success(_t("Successfully started (Press Ctrl+C to stop)")) - + # TODO: broken app.loop.create_task(download_all_chat(client)) for _ in range(app.max_download_task): task = app.loop.create_task(worker(client)) diff --git a/src/parsers/Telegram/telegram_media_downloader/module/app.py b/src/parsers/Telegram/telegram_media_downloader/module/app.py index 74a17bf..f433dca 100644 --- a/src/parsers/Telegram/telegram_media_downloader/module/app.py +++ b/src/parsers/Telegram/telegram_media_downloader/module/app.py @@ -227,9 +227,6 @@ class Application: self.web_login_secret: str = "" self.debug_web: bool = False - self.loop = asyncio.get_event_loop() - asyncio.set_event_loop(self.loop) - self.executor = ThreadPoolExecutor( min(32, (os.cpu_count() or 0) + 4), thread_name_prefix="multi_task" ) @@ -447,7 +444,8 @@ class Application: self.cloud_drive_config, self.save_path, local_file_path ) elif self.cloud_drive_config.upload_adapter == "aligo": - ret = await self.loop.run_in_executor( + loop = asyncio.get_running_loop() + ret = await loop.run_in_executor( self.executor, CloudDrive.aligo_upload_file( self.cloud_drive_config, self.save_path, local_file_path diff --git a/src/parsers/Telegram/telegram_media_downloader/module/bot.py b/src/parsers/Telegram/telegram_media_downloader/module/bot.py index 5140eff..c45eaa8 100644 --- a/src/parsers/Telegram/telegram_media_downloader/module/bot.py +++ b/src/parsers/Telegram/telegram_media_downloader/module/bot.py @@ -232,8 +232,10 @@ class DownloadBot: pass # TODO: add admin # self.bot.set_my_commands(commands, scope=types.BotCommandScopeChatAdministrators(self.app.)) - - _bot.app.loop.create_task(_bot.update_reply_message()) + # TODO: check for correctness + loop = asyncio.get_running_loop() + loop.create_task(_bot.update_reply_message()) + # _bot.app.loop.create_task(_bot.update_reply_message()) _bot = DownloadBot() diff --git a/src/parsers/Telegram/telegram_media_downloader/telegram_parser.py b/src/parsers/Telegram/telegram_media_downloader/telegram_parser.py index 74bd92e..83327ac 100644 --- a/src/parsers/Telegram/telegram_media_downloader/telegram_parser.py +++ b/src/parsers/Telegram/telegram_media_downloader/telegram_parser.py @@ -3,6 +3,7 @@ from urllib.parse import urlparse from loguru import logger from pyrogram import Client +import pyrogram from ruamel.yaml import YAML from src.exceptions.download_exceptions import FileAlreadyExistException @@ -12,46 +13,9 @@ from src.parsers.base_parser import BaseParser import asyncio import threading -from typing import Awaitable, TypeVar -T = TypeVar("T") - - -def _start_background_loop(loop): - asyncio.set_event_loop(loop) - loop.run_forever() - - -_LOOP = asyncio.new_event_loop() -_LOOP_THREAD = threading.Thread( - target=_start_background_loop, args=(_LOOP,), daemon=True -) -_LOOP_THREAD.start() - - -def asyncio_run(coro: Awaitable[T], timeout=30) -> T: - """ - Runs the coroutine in an event loop running on a background thread, - and blocks the current thread until it returns a result. - This plays well with gevent, since it can yield on the Future result call. - - :param coro: A coroutine, typically an async method - :param timeout: How many seconds we should wait for a result before raising an error - """ - return asyncio.run_coroutine_threadsafe(coro, _LOOP).result(timeout=timeout) - - -def asyncio_gather(*futures, return_exceptions=False) -> list: - """ - A version of asyncio.gather that runs on the internal event loop - """ - async def gather(): - return await asyncio.gather(*futures, return_exceptions=return_exceptions) - - return asyncio.run_coroutine_threadsafe(gather(), loop=_LOOP).result() - class TelegramParser(BaseParser): - async def video_download(self, client: Client = None): + async def video_download(self): url_parse_result = urlparse(self.params["link"]) channel, message_id = url_parse_result.path[1:].split('/') if "/c/" not in url_parse_result.path else \ url_parse_result.path[3:].split('/') @@ -68,9 +32,19 @@ class TelegramParser(BaseParser): mode="w+", encoding="utf-8") as f: YAML().dump(config, f) if _check_config(): - await download_all_chat(client) - await worker(client) - client.stop() + tg_client = pyrogram.Client( + "media_downloader", + api_id=app.api_id, + api_hash=app.api_hash, + proxy=app.proxy, + workdir=app.session_file_path, + ) + app.pre_run() + app.is_running = True + await tg_client.start() + await download_all_chat(tg_client) + await worker(tg_client) + await tg_client.stop() app.is_running = False logger.info("Stopped!") return f"Telegram/{message_id}.mp4"