"""RabbitMQ-driven video download service.

A consumer coroutine reads JSON messages from a RabbitMQ queue and puts the
decoded payloads on an in-process ``asyncio.Queue``. Worker coroutines take
payloads off that queue and run the blocking download in a process pool.
"""
import asyncio
import concurrent.futures as pool
import json
import multiprocessing
from collections import deque
from functools import partial
from multiprocessing import freeze_support

from aio_pika import connect
from aio_pika.abc import AbstractIncomingMessage
from fastapi import HTTPException

from src.core.send import body
from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import SiteNotImplementedException


async def on_message(message: AbstractIncomingMessage, queue) -> None:
    """Decode one incoming message as JSON and put it on ``queue``.

    Args:
        message: The incoming RabbitMQ message; its body must be valid JSON.
        queue: Awaitable queue (``put`` is awaited) receiving the payload.
    """
    # Everything runs inside ``message.process()`` so that aio-pika's context
    # manager decides the acknowledgement once the payload has been handed over.
    async with message.process():
        await queue.put(json.loads(message.body))
        print(f" Message body is: {message.body!r}")


async def get_messages(
    inner_queue,
    url: str = "amqp://guest:guest@localhost/",
    queue_name: str = "hello",
) -> None:
    """Consume a RabbitMQ queue forever, forwarding payloads to ``inner_queue``.

    Args:
        inner_queue: Queue passed to :func:`on_message` for every message.
        url: AMQP connection URL.
        queue_name: Name of the durable queue to declare and consume.
    """
    connection = await connect(url)

    async with connection:
        channel = await connection.channel()
        # Hand this consumer one unacknowledged message at a time.
        await channel.set_qos(prefetch_count=1)

        queue = await channel.declare_queue(queue_name, durable=True)

        # Start listening on the declared queue.
        await queue.consume(partial(on_message, queue=inner_queue))

        print(" [*] Waiting for messages. To exit press CTRL+C")
        # A future nobody resolves: keeps the connection open until cancelled.
        await asyncio.Future()


def get_url_for_download_video(data: dict):
    """Download the video described by ``data``.

    Args:
        data: Mapping with the keys ``"link"``, ``"format"``,
            ``"merge_output_format"`` and ``"outtmpl"``.

    Returns:
        Whatever ``VideoDownloader.download()`` returns.

    Raises:
        HTTPException: With status 400 when the downloader raises
            ``SiteNotImplementedException``.
        KeyError: When ``data`` lacks one of the required keys.
    """
    # NOTE(review): ``file_name`` is never used below. The call is kept only
    # because its side effects are not visible from this module -- remove it
    # if ``get_unique_video_filename`` turns out to be side-effect free.
    file_name = VideoDownloader.get_unique_video_filename()
    ydl_opts = {
        "format": data["format"],
        "merge_output_format": data["merge_output_format"],
        "outtmpl": data["outtmpl"],
    }
    downloader = VideoDownloader(link=data["link"], ydl_opts=ydl_opts)
    try:
        return downloader.download()
    except SiteNotImplementedException as ex:
        raise HTTPException(status_code=400, detail=ex.message) from ex


def _run_download(data: dict):
    """Executor entry point: run one download and report failure as plain data.

    Returns ``None`` on success or a ``(status_code, detail)`` tuple when the
    download raised ``HTTPException``.
    """
    # NOTE(review): the exception is flattened to a tuple because it has to
    # travel back from the worker process by pickling, and it is not certain
    # that ``HTTPException`` round-trips through pickle -- confirm against the
    # installed Starlette/FastAPI version.
    try:
        get_url_for_download_video(data)
    except HTTPException as ex:
        return ex.status_code, ex.detail
    return None


async def create_workers(queue, executor=None):
    """Take one payload off ``queue`` and download it.

    Args:
        queue: ``asyncio.Queue`` of decoded message payloads.
        executor: Optional ``concurrent.futures`` executor. When given, the
            blocking download runs there so the event loop stays responsive;
            when omitted, the download runs inline on the calling task.

    Raises:
        HTTPException: When the download reports an unsupported site.
    """
    payload = await queue.get()
    if executor is None:
        get_url_for_download_video(payload)
        return

    loop = asyncio.get_running_loop()
    failure = await loop.run_in_executor(executor, _run_download, payload)
    if failure is not None:
        status_code, detail = failure
        raise HTTPException(status_code=status_code, detail=detail)


class MasterService:
    """Wires the RabbitMQ consumer to a pool of download worker processes."""

    def __init__(self, max_workers: int = 8):
        """Create the process pool and the queue shared with the consumer.

        Instantiate this from inside the running event loop: on Python
        versions before 3.10 an ``asyncio.Queue`` binds to the loop that is
        current at construction time.
        """
        self.MAX_EXECUTOR_WORKERS = max_workers
        self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS)
        self.queue = asyncio.Queue()
        self.rabbit_consumer = get_messages

    async def _worker(self) -> None:
        """Process payloads one after another until cancelled."""
        while True:
            try:
                await create_workers(self.queue, self.executor)
            except (asyncio.CancelledError, pool.BrokenExecutor):
                # Shutdown, or a pool that can no longer run anything:
                # neither can be recovered by moving on to the next message.
                raise
            except Exception as ex:
                # One bad message must not take the whole service down.
                print(f" [!] Download failed: {ex!r}")

    async def run(self):
        """Run the consumer and the workers until one fails or is cancelled."""
        # The queue is an asyncio object and must stay on the event loop, so
        # the workers are coroutines here; only the blocking download function
        # and its plain-data payload are sent to the process pool.
        workers = [self._worker() for _ in range(self.MAX_EXECUTOR_WORKERS)]
        await asyncio.gather(self.rabbit_consumer(self.queue), *workers)


async def _main() -> None:
    """Build the service inside the running loop and run it."""
    await MasterService().run()


if __name__ == '__main__':
    freeze_support()
    asyncio.run(_main())