import asyncio
import concurrent.futures as pool
import os.path
import subprocess
from functools import partial

from fastapi import HTTPException

from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import SiteNotImplementedException

# Process title given to every executor worker (see ``executor_initializer``).
# ``MasterService.run`` greps for the same string to find stale workers, so
# both places must use this one constant.
_EXECUTOR_PROCESS_TITLE = "video_downloader_executor_process"


def _expected_output_path(video_info: dict) -> str:
    """Return the path checked to decide whether a video already exists.

    The path is ``<cwd>/Youtube/<id>_<width>.<ext>``, built from the ``id``,
    ``width`` and ``ext`` keys of *video_info*.
    """
    file_name = f"{video_info['id']}_{video_info['width']}.{video_info['ext']}"
    # Join real path components; concatenating the cwd and "Youtube/..." with
    # ``+`` dropped the separator between them.
    return os.path.join(os.getcwd(), "Youtube", file_name)


class MasterService:
    """Pull download tasks from a queue and run them in a process pool."""

    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.MAX_EXECUTOR_WORKERS = 8
        self.executor = pool.ProcessPoolExecutor(
            max_workers=self.MAX_EXECUTOR_WORKERS,
            initializer=executor_initializer,
        )
        self.queue = AsyncQueue()
        # Called with ``self.queue`` in ``run``; presumably fills the queue
        # with messages from RabbitMQ -- TODO confirm against get_messages.
        self.rabbit_consumer = get_messages
        # Maps video link -> params of the videos currently in progress.
        self.currently_underway = {}

    async def run(self) -> None:
        """Kill stale executor processes, then run the consumer and workers.

        Awaits the consumer and all worker coroutines together with
        ``asyncio.gather``.
        """
        # SIGKILL every process whose ``ps -ef`` line matches the executor
        # process title (the title is set in ``executor_initializer``).
        # NOTE: this is a synchronous call made from inside a coroutine.
        kill_stale_executors = (
            f"for pid in $(ps -ef | grep {_EXECUTOR_PROCESS_TITLE} | "
            "awk '{print $2}'); do kill -9 $pid; done"
        )
        subprocess.run(kill_stale_executors, shell=True, capture_output=True)

        # NOTE(review): one more worker coroutine than executor processes is
        # started; the reason is not visible here -- confirm it is intended.
        tasks = [
            self.loop.create_task(self.create_workers())
            for _ in range(self.MAX_EXECUTOR_WORKERS + 1)
        ]
        await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)

    async def create_workers(self) -> None:
        """Process queued tasks forever, one at a time.

        For each task: record it in Redis as started, run the download in the
        process pool, then record it in Redis as done.
        """
        while True:
            video_params = await self.queue.get()
            try:
                redis = RedisClient()
                await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
                self.currently_underway[video_params['link']] = video_params

                download_task = self.loop.run_in_executor(
                    self.executor,
                    partial(
                        MasterService.video_processing_executor,
                        video_params=video_params,
                    ),
                )
                result = await download_task  # TODO process result
                await redis.del_task_from_tasks_and_add_to_task_done(task=video_params)
            finally:
                # Drop the entry so the dict only holds videos that really are
                # in progress. The identity check avoids removing an entry
                # that another worker registered for the same link.
                link = video_params.get('link')
                if self.currently_underway.get(link) is video_params:
                    del self.currently_underway[link]
                # Always balance the ``get`` above, even when a step failed.
                self.queue.task_done()

    @staticmethod
    def video_download(video_params: dict):
        """Download the video described by *video_params* unless it exists.

        Reads the ``format``, ``merge_output_format``, ``outtmpl`` and
        ``link`` keys of *video_params*.

        Returns:
            ``Result(result_type=ResultTypeEnum.EXIST)`` when the expected
            output file is already on disk, otherwise whatever
            ``VideoDownloader.download`` returns.

        Raises:
            HTTPException: with status 400 when the downloader raises
                ``SiteNotImplementedException``.
        """
        ydl_opts = {
            "format": video_params["format"],
            "merge_output_format": video_params["merge_output_format"],
            'outtmpl': video_params["outtmpl"],
            "quiet": True,
        }
        downloader = VideoDownloader(link=video_params["link"], ydl_opts=ydl_opts)
        video_info = downloader.get_info()

        if os.path.exists(_expected_output_path(video_info)):
            return Result(result_type=ResultTypeEnum.EXIST)

        try:
            # The info lookup ran with "quiet" on; turn it off for the download.
            downloader.ydl_opts["quiet"] = False
            return downloader.download()
        except SiteNotImplementedException as ex:
            raise HTTPException(
                status_code=400,
                detail=ex.message
            ) from ex

    @staticmethod
    def video_processing_executor(video_params: dict):
        """Run ``video_download`` and report the outcome as a ``Result``.

        This is the callable submitted to the process pool. It never raises:
        any exception is returned inside a ``Result`` of type ``EXCEPTION``.
        """
        try:
            # NOTE(review): the return value is discarded, so an EXIST result
            # from video_download is also reported as DONE -- confirm intended.
            MasterService.video_download(video_params=video_params)
            return Result(result_type=ResultTypeEnum.DONE)
        except Exception as ex:
            return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex)
        # TODO upload to server


def executor_initializer():
    """Set the process title of an executor worker process.

    Passed as ``initializer`` to the ``ProcessPoolExecutor``. The title is
    what ``MasterService.run`` greps for when killing stale workers.
    """
    import setproctitle

    setproctitle.setproctitle(_EXECUTOR_PROCESS_TITLE)
    return True