import asyncio import concurrent.futures as pool import subprocess from functools import partial import traceback from urllib.parse import urlparse from src.core.async_queue import AsyncQueue from src.core.rabbitmq import get_messages, publish_message_with_task_done from src.core.redis_client import RedisClient from src.core.result import Result, ResultTypeEnum from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException from src.parsers.MyMail.my_mail_parser import MyMailParser from src.parsers.Okru.ok_parser import OkParser from src.parsers.Yappy.yappy_parser import YappyParser from src.parsers.base_parser import BaseParser # TODO: добавить логгер с временными метками в yt-dlp class MasterService: def __init__(self): self.loop = asyncio.get_event_loop() self.MAX_EXECUTOR_WORKERS = 8 self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS, initializer=executor_initializer) self.queue = AsyncQueue() self.rabbit_consumer = get_messages self.currently_underway = {} # contains currently in progress videos async def run(self): subprocess.run( "for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done", shell=True, capture_output=True ) tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)] await asyncio.gather(self.rabbit_consumer(self.queue), *tasks) async def create_workers(self): while True: video_params = await self.queue.get() redis = RedisClient() await redis.del_tasks_queue() await redis.del_task_from_queue_and_add_to_tasks(task=video_params) self.currently_underway[video_params['link']] = video_params download_task = self.loop.run_in_executor(self.executor, partial( MasterService.video_processing_executor, video_params=video_params )) result: Result = await download_task if result.result_type in [ResultTypeEnum.DONE, ResultTypeEnum.EXIST]: await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params) await publish_message_with_task_done(task=result.value) self.queue.task_done() else: error_message = { "link": video_params["link"], "result": result.value, "status": "error" } await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params) await publish_message_with_task_done(task=error_message) if video_params['link'] in self.currently_underway: del self.currently_underway[video_params['link']] @staticmethod def video_download(video_params: dict): downloader: BaseParser | YappyParser | MyMailParser = MasterService.get_parser(video_params) result = downloader.video_download() return result @staticmethod def get_parser(params: dict): try: domain = urlparse(params["link"]).netloc # TODO: похоже нужно переделать на регулярки, т.к. добавлять каждую вариацию домена моветон, вероятно я сделаю parser_mapping = { "my.mail.ru": MyMailParser(params), "www.youtube.com": BaseParser(params), "youtube.com": BaseParser(params), "youtu.be": BaseParser(params), "vk.com": BaseParser(params), "ok.ru": BaseParser(params) if "topic" not in params["link"] else OkParser(params), "likee.video": BaseParser(params), "dzen.ru": BaseParser(params), "yappy.media": YappyParser(params), "yandex.ru": BaseParser(params), } return parser_mapping[domain] except KeyError: raise SiteNotImplementedException @staticmethod def video_processing_executor(video_params: dict): try: result = MasterService.video_download(video_params=video_params) return Result(result_type=ResultTypeEnum.DONE, value={ "link": video_params["link"], "result": result, "status": "done" }) except FileAlreadyExistException as ex: return Result(result_type=ResultTypeEnum.EXIST, value={ "link": video_params["link"], "result": ex.message, "status": "exist" }) except SiteNotImplementedException as ex: return Result(result_type=ResultTypeEnum.EXCEPTION, value={ "link": video_params["link"], "result": ex.default_message, "status": "error" }) except Exception as ex: Result(result_type=ResultTypeEnum.EXCEPTION, value={ "link": video_params["link"], "result": traceback.format_exc(), "status": "error" }) return Result(result_type=ResultTypeEnum.EXCEPTION, value={ "link": video_params["link"], "result": "Unknown exception encountered", "status": "error" }) # TODO upload to server def executor_initializer(): import setproctitle setproctitle.setproctitle(f'video_downloader_executor_process') return True