remove loop from app

This commit is contained in:
Dantenerosas 2023-10-18 00:38:33 +03:00
parent ccb16580be
commit 7646759ead
6 changed files with 32 additions and 66 deletions

View File

@ -1,8 +1,10 @@
import asyncio
from multiprocessing import freeze_support from multiprocessing import freeze_support
from src.core.master_service import MasterService from src.core.master_service import MasterService
if __name__ == '__main__': if __name__ == '__main__':
freeze_support() freeze_support()
ms = MasterService() loop = asyncio.get_event_loop()
ms = MasterService(loop)
ms.loop.run_until_complete(ms.run()) ms.loop.run_until_complete(ms.run())

View File

@ -22,8 +22,8 @@ from src.parsers.Telegram.telegram_media_downloader.telegram_parser import Teleg
class MasterService: class MasterService:
def __init__(self): def __init__(self, loop):
self.loop = asyncio.get_event_loop() self.loop = loop
self.MAX_EXECUTOR_WORKERS = 8 self.MAX_EXECUTOR_WORKERS = 8
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS, self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
@ -71,19 +71,10 @@ class MasterService:
match downloader: match downloader:
case TelegramParser(): case TelegramParser():
if _check_config(): if _check_config():
tg_client = pyrogram.Client( loop = asyncio.get_event_loop()
"media_downloader",
api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
tg_client.start()
loop = asyncio.get_running_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
loop.run_until_complete(downloader.video_download(client=tg_client)) loop.run_until_complete(downloader.video_download())
# loop.close()
return result return result
case _: case _:
result = downloader.video_download() result = downloader.video_download()

View File

@ -478,6 +478,7 @@ def _check_config() -> bool:
async def worker(client: pyrogram.client.Client): async def worker(client: pyrogram.client.Client):
"""Work for download task""" """Work for download task"""
# TODO: mb replace with asyncio.Event
while app.is_running: while app.is_running:
try: try:
item = await queue.get() item = await queue.get()
@ -487,11 +488,9 @@ async def worker(client: pyrogram.client.Client):
if node.client: if node.client:
await download_task(node.client, message, node) await download_task(node.client, message, node)
app.is_running = False app.is_running = False
app.loop.stop()
else: else:
await download_task(client, message, node) await download_task(client, message, node)
app.is_running = False app.is_running = False
app.loop.stop()
except Exception as e: except Exception as e:
logger.exception(f"{e}") logger.exception(f"{e}")
@ -568,7 +567,7 @@ async def run_until_all_task_finish():
def _exec_loop(): def _exec_loop():
"""Exec loop""" """Exec loop"""
# TODO: broken, no loop
if app.bot_token: if app.bot_token:
app.loop.run_forever() app.loop.run_forever()
else: else:
@ -593,7 +592,7 @@ def main():
client.start() client.start()
logger.success(_t("Successfully started (Press Ctrl+C to stop)")) logger.success(_t("Successfully started (Press Ctrl+C to stop)"))
# TODO: broken
app.loop.create_task(download_all_chat(client)) app.loop.create_task(download_all_chat(client))
for _ in range(app.max_download_task): for _ in range(app.max_download_task):
task = app.loop.create_task(worker(client)) task = app.loop.create_task(worker(client))

View File

@ -227,9 +227,6 @@ class Application:
self.web_login_secret: str = "" self.web_login_secret: str = ""
self.debug_web: bool = False self.debug_web: bool = False
self.loop = asyncio.get_event_loop()
asyncio.set_event_loop(self.loop)
self.executor = ThreadPoolExecutor( self.executor = ThreadPoolExecutor(
min(32, (os.cpu_count() or 0) + 4), thread_name_prefix="multi_task" min(32, (os.cpu_count() or 0) + 4), thread_name_prefix="multi_task"
) )
@ -447,7 +444,8 @@ class Application:
self.cloud_drive_config, self.save_path, local_file_path self.cloud_drive_config, self.save_path, local_file_path
) )
elif self.cloud_drive_config.upload_adapter == "aligo": elif self.cloud_drive_config.upload_adapter == "aligo":
ret = await self.loop.run_in_executor( loop = asyncio.get_running_loop()
ret = await loop.run_in_executor(
self.executor, self.executor,
CloudDrive.aligo_upload_file( CloudDrive.aligo_upload_file(
self.cloud_drive_config, self.save_path, local_file_path self.cloud_drive_config, self.save_path, local_file_path

View File

@ -232,8 +232,10 @@ class DownloadBot:
pass pass
# TODO: add admin # TODO: add admin
# self.bot.set_my_commands(commands, scope=types.BotCommandScopeChatAdministrators(self.app.)) # self.bot.set_my_commands(commands, scope=types.BotCommandScopeChatAdministrators(self.app.))
# TODO: check for correctness
_bot.app.loop.create_task(_bot.update_reply_message()) loop = asyncio.get_running_loop()
loop.create_task(_bot.update_reply_message())
# _bot.app.loop.create_task(_bot.update_reply_message())
_bot = DownloadBot() _bot = DownloadBot()

View File

@ -3,6 +3,7 @@ from urllib.parse import urlparse
from loguru import logger from loguru import logger
from pyrogram import Client from pyrogram import Client
import pyrogram
from ruamel.yaml import YAML from ruamel.yaml import YAML
from src.exceptions.download_exceptions import FileAlreadyExistException from src.exceptions.download_exceptions import FileAlreadyExistException
@ -12,46 +13,9 @@ from src.parsers.base_parser import BaseParser
import asyncio import asyncio
import threading import threading
from typing import Awaitable, TypeVar
T = TypeVar("T")
def _start_background_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
_LOOP = asyncio.new_event_loop()
_LOOP_THREAD = threading.Thread(
target=_start_background_loop, args=(_LOOP,), daemon=True
)
_LOOP_THREAD.start()
def asyncio_run(coro: Awaitable[T], timeout=30) -> T:
"""
Runs the coroutine in an event loop running on a background thread,
and blocks the current thread until it returns a result.
This plays well with gevent, since it can yield on the Future result call.
:param coro: A coroutine, typically an async method
:param timeout: How many seconds we should wait for a result before raising an error
"""
return asyncio.run_coroutine_threadsafe(coro, _LOOP).result(timeout=timeout)
def asyncio_gather(*futures, return_exceptions=False) -> list:
"""
A version of asyncio.gather that runs on the internal event loop
"""
async def gather():
return await asyncio.gather(*futures, return_exceptions=return_exceptions)
return asyncio.run_coroutine_threadsafe(gather(), loop=_LOOP).result()
class TelegramParser(BaseParser): class TelegramParser(BaseParser):
async def video_download(self, client: Client = None): async def video_download(self):
url_parse_result = urlparse(self.params["link"]) url_parse_result = urlparse(self.params["link"])
channel, message_id = url_parse_result.path[1:].split('/') if "/c/" not in url_parse_result.path else \ channel, message_id = url_parse_result.path[1:].split('/') if "/c/" not in url_parse_result.path else \
url_parse_result.path[3:].split('/') url_parse_result.path[3:].split('/')
@ -68,9 +32,19 @@ class TelegramParser(BaseParser):
mode="w+", encoding="utf-8") as f: mode="w+", encoding="utf-8") as f:
YAML().dump(config, f) YAML().dump(config, f)
if _check_config(): if _check_config():
await download_all_chat(client) tg_client = pyrogram.Client(
await worker(client) "media_downloader",
client.stop() api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
await tg_client.start()
await download_all_chat(tg_client)
await worker(tg_client)
await tg_client.stop()
app.is_running = False app.is_running = False
logger.info("Stopped!") logger.info("Stopped!")
return f"Telegram/{message_id}.mp4" return f"Telegram/{message_id}.mp4"