import asyncio
import concurrent.futures as pool
import subprocess
from functools import partial

from fastapi import HTTPException

from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.exceptions.download_exceptions import SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser


class MasterService:
    """Pull download tasks from a queue and run them in a process pool.

    ``run`` starts the message consumer together with several worker
    coroutines; each worker takes one task at a time from ``self.queue`` and
    executes it in ``self.executor``.
    """

    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.MAX_EXECUTOR_WORKERS = 8
        # Every pool process renames itself in ``executor_initializer`` so
        # that ``run`` can find and kill leftovers from a previous start.
        self.executor = pool.ProcessPoolExecutor(
            max_workers=self.MAX_EXECUTOR_WORKERS,
            initializer=executor_initializer,
        )
        self.queue = AsyncQueue()
        # Called as ``rabbit_consumer(self.queue)`` in ``run``;
        # presumably it fills the queue with incoming tasks — TODO confirm.
        self.rabbit_consumer = get_messages
        self.currently_underway = {}  # contains currently in progress videos

    async def run(self):
        """Kill stale executor processes, then run the consumer and workers.

        Awaits the consumer and ``MAX_EXECUTOR_WORKERS + 1`` worker tasks
        together; an exception raised by any of them propagates to the
        caller.
        """
        # NOTE(review): the pattern also occurs in this shell command's own
        # argv, so the ps/grep pipeline can match itself (and grep);
        # consider ``pkill -f`` instead — confirm on the target platform.
        subprocess.run(
            "for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
            shell=True,
            capture_output=True,
        )
        tasks = [
            self.loop.create_task(self.create_workers())
            for _ in range(self.MAX_EXECUTOR_WORKERS + 1)
        ]
        await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)

    async def create_workers(self):
        """Process queued tasks forever, one at a time.

        For every task taken from ``self.queue``: move it between the Redis
        collections, record it in ``self.currently_underway`` under its
        ``'link'`` key, run the download in the process pool, then mark it
        as done in Redis.

        The in-progress entry is removed and ``queue.task_done()`` is called
        even when a step raises; the exception itself still propagates and
        ends this worker, as before.
        """
        while True:
            video_params = await self.queue.get()
            link = None
            try:
                redis = RedisClient()
                await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
                link = video_params['link']
                self.currently_underway[link] = video_params
                download_task = self.loop.run_in_executor(
                    self.executor,
                    partial(
                        MasterService.video_processing_executor,
                        video_params=video_params,
                    ),
                )
                await download_task  # TODO process result
                await redis.del_task_from_tasks_and_add_to_task_done(task=video_params)
            finally:
                # Without this the dict would keep every video ever seen,
                # contradicting its "in progress" purpose.
                if link is not None:
                    self.currently_underway.pop(link, None)
                self.queue.task_done()

    @staticmethod
    def video_download(video_params: dict):
        """Download the video described by ``video_params``.

        Args:
            video_params: Task parameters; ``video_params["parser"]`` selects
                the parser (see ``get_parser``).

        Returns:
            Whatever the selected parser's ``video_download()`` returns.

        Raises:
            HTTPException: With status 400 when the parser raises
                ``SiteNotImplementedException``.
            KeyError: When the requested parser name is unknown.
        """
        downloader: BaseParser | YappyParser | MyMailParser = MasterService.get_parser(video_params)
        try:
            return downloader.video_download()
        except SiteNotImplementedException as ex:
            raise HTTPException(
                status_code=400,
                detail=ex.message,
            ) from ex

    @staticmethod
    def get_parser(params: dict):
        """Return a parser instance for ``params["parser"]``.

        Only the selected parser class is instantiated.

        Raises:
            KeyError: When ``params`` has no ``"parser"`` key or the name is
                not one of ``"MyMailRu"``, ``"base"``, ``"Yappy"``.
        """
        parser_classes = {
            "MyMailRu": MyMailParser,
            "base": BaseParser,
            "Yappy": YappyParser,
        }
        return parser_classes[params["parser"]](params)

    @staticmethod
    def video_processing_executor(video_params: dict):
        """Entry point executed inside a pool process.

        Runs ``video_download`` and converts the outcome into a ``Result``
        instead of letting an exception escape.

        Returns:
            ``Result`` with ``ResultTypeEnum.DONE`` on success, or with
            ``ResultTypeEnum.EXCEPTION`` and the caught exception as
            ``value`` on failure.
        """
        try:
            MasterService.video_download(video_params=video_params)
            return Result(result_type=ResultTypeEnum.DONE)
        except Exception as ex:
            # Deliberately broad: any failure is reported as a Result value.
            return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex)
        # TODO upload to server


def executor_initializer():
    """Set the process title of a pool worker process.

    The title is the string that ``MasterService.run`` greps for when it
    kills leftover worker processes.
    """
    import setproctitle

    setproctitle.setproctitle('video_downloader_executor_process')
    return True