"""RabbitMQ consumer that hands incoming download jobs to a process pool.

Messages are read from the ``hello`` queue, decoded from JSON, placed on an
in-process ``asyncio.Queue`` and executed by ``get_url_for_download_video``
in worker processes.
"""
import asyncio
import json
import concurrent.futures as pool
from multiprocessing import freeze_support
from collections import deque

from fastapi import HTTPException
from aio_pika import connect
from aio_pika.abc import AbstractIncomingMessage

from src.core.send import body
from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import SiteNotImplementedException

# Hand-off buffer between the AMQP consumer callback and the worker dispatcher.
# NOTE(review): on Python < 3.10 an asyncio.Queue binds to the event loop that
# is current at construction time, which is not the loop asyncio.run() creates
# later - confirm the minimum supported Python version.
as_queue = asyncio.Queue()


async def on_message(message: AbstractIncomingMessage) -> None:
    """Decode one incoming message as JSON and enqueue it for the workers.

    The work happens inside ``message.process()``; per the aio-pika docs that
    context acknowledges the message when the body exits without raising.

    Args:
        message: The AMQP message delivered by the consumer.

    Raises:
        json.JSONDecodeError: If the message body is not valid JSON.
    """
    async with message.process():
        await as_queue.put(json.loads(message.body))
        print(f" Message body is: {message.body!r}")


async def get_messages() -> None:
    """Connect to the local broker and consume the ``hello`` queue forever."""
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        channel = await connection.channel()
        # Ask the broker for at most one unacknowledged message at a time.
        await channel.set_qos(prefetch_count=1)

        queue = await channel.declare_queue(
            "hello",
            durable=True,
        )

        # Start listening on the queue named 'hello'.
        await queue.consume(on_message)

        print(" [*] Waiting for messages. To exit press CTRL+C")
        # Never resolved: keeps the connection context open until cancelled.
        await asyncio.Future()


def get_url_for_download_video(data: dict):
    """Download the video described by ``data`` using ``VideoDownloader``.

    Args:
        data: Job description. The keys ``"link"``, ``"format"``,
            ``"merge_output_format"`` and ``"outtmpl"`` are read.

    Raises:
        KeyError: If one of the keys listed above is missing.
        HTTPException: With status 400 when the downloader raises
            ``SiteNotImplementedException``.
    """
    # NOTE(review): the returned file name was never used; the call is kept
    # only in case it has side effects - confirm and remove if it is pure.
    VideoDownloader.get_unique_video_filename()

    ydl_opts = {
        "format": data["format"],
        "merge_output_format": data["merge_output_format"],
        "outtmpl": data["outtmpl"],
    }
    downloader = VideoDownloader(link=data["link"], ydl_opts=ydl_opts)
    try:
        downloader.download()
    except SiteNotImplementedException as ex:
        raise HTTPException(
            status_code=400,
            detail=ex.message,
        ) from ex


async def create_workers(queue):
    """Dispatch every item taken from ``queue`` to a shared process pool.

    A single ``ProcessPoolExecutor`` lives for the whole lifetime of the
    coroutine, and jobs are submitted without blocking the event loop, so the
    AMQP consumer keeps running while downloads are in progress.
    ``queue.task_done()`` is called only once a job has actually finished.

    Args:
        queue: An ``asyncio.Queue`` whose items are passed unchanged to
            ``get_url_for_download_video``.
    """
    loop = asyncio.get_running_loop()
    in_flight = set()

    def _on_finished(fut):
        # Runs on the event loop when a worker job completes or fails.
        in_flight.discard(fut)
        queue.task_done()
        if fut.cancelled():
            return
        error = fut.exception()
        if error is not None:
            # Retrieve and report the failure so it is not silently dropped.
            print(f" [!] Download failed: {error!r}")

    with pool.ProcessPoolExecutor(max_workers=8) as executor:
        while True:
            job = await queue.get()
            fut = loop.run_in_executor(
                executor, get_url_for_download_video, job
            )
            # Hold a strong reference until the job is done.
            in_flight.add(fut)
            fut.add_done_callback(_on_finished)


async def main():
    """Run the message consumer and the worker dispatcher concurrently."""
    await asyncio.gather(get_messages(), create_workers(as_queue))


if __name__ == '__main__':
    freeze_support()
    asyncio.run(main())