diff --git a/src/core/master_service.py b/src/core/master_service.py index bebfe56..8ad9e37 100644 --- a/src/core/master_service.py +++ b/src/core/master_service.py @@ -1,6 +1,8 @@ import asyncio import json import concurrent.futures as pool +import multiprocessing +from functools import partial from multiprocessing import freeze_support from collections import deque @@ -13,17 +15,14 @@ from src.core.ydl import VideoDownloader from src.exceptions.download_exceptions import SiteNotImplementedException -as_queue = asyncio.Queue() - - -async def on_message(message: AbstractIncomingMessage) -> None: +async def on_message(message: AbstractIncomingMessage, queue) -> None: async with message.process(): # print(f" [x] Received message {message!r}") - await as_queue.put(json.loads(message.body)) + await queue.put(json.loads(message.body)) print(f" Message body is: {message.body!r}") -async def get_messages() -> None: +async def get_messages(inner_queue) -> None: # Perform connection connection = await connect("amqp://guest:guest@localhost/") @@ -39,7 +38,7 @@ async def get_messages() -> None: ) # Start listening the queue with name 'task_queue' - await queue.consume(on_message) + await queue.consume(partial(on_message, queue=inner_queue)) print(" [*] Waiting for messages. To exit press CTRL+C") await asyncio.Future() @@ -63,18 +62,24 @@ def get_url_for_download_video(data: str): async def create_workers(queue): - loop = asyncio.get_event_loop() - while True: - link = await queue.get() - with pool.ProcessPoolExecutor(max_workers=8) as executor: - tasks = loop.run_in_executor(executor, get_url_for_download_video, link) - queue.task_done() + link = await queue.get() + get_url_for_download_video(link) -async def main(): - await asyncio.gather(get_messages(), create_workers(as_queue)) +class MasterService: + def __init__(self): + self.MAX_EXECUTOR_WORKERS = 8 + self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS) + self.queue = asyncio.Queue() + self.rabbit_consumer = get_messages + + async def run(self): + loop = asyncio.get_event_loop() + tasks = [loop.run_in_executor(self.executor, create_workers, self.queue) for i in range(self.MAX_EXECUTOR_WORKERS)] + await asyncio.gather(self.rabbit_consumer(self.queue), *tasks) if __name__ == '__main__': freeze_support() - asyncio.run(main()) \ No newline at end of file + ms = MasterService() + asyncio.run(ms.run())