refactoring master service
This commit is contained in:
		@@ -1,6 +1,8 @@
 | 
				
			|||||||
import asyncio
 | 
					import asyncio
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
import concurrent.futures as pool
 | 
					import concurrent.futures as pool
 | 
				
			||||||
 | 
					import multiprocessing
 | 
				
			||||||
 | 
					from functools import partial
 | 
				
			||||||
from multiprocessing import freeze_support
 | 
					from multiprocessing import freeze_support
 | 
				
			||||||
from collections import deque
 | 
					from collections import deque
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -13,17 +15,14 @@ from src.core.ydl import VideoDownloader
 | 
				
			|||||||
from src.exceptions.download_exceptions import SiteNotImplementedException
 | 
					from src.exceptions.download_exceptions import SiteNotImplementedException
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
as_queue = asyncio.Queue()
 | 
					async def on_message(message: AbstractIncomingMessage, queue) -> None:
 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
async def on_message(message: AbstractIncomingMessage) -> None:
 | 
					 | 
				
			||||||
    async with message.process():
 | 
					    async with message.process():
 | 
				
			||||||
        # print(f" [x] Received message {message!r}")
 | 
					        # print(f" [x] Received message {message!r}")
 | 
				
			||||||
        await as_queue.put(json.loads(message.body))
 | 
					        await queue.put(json.loads(message.body))
 | 
				
			||||||
        print(f"     Message body is: {message.body!r}")
 | 
					        print(f"     Message body is: {message.body!r}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def get_messages() -> None:
 | 
					async def get_messages(inner_queue) -> None:
 | 
				
			||||||
    # Perform connection
 | 
					    # Perform connection
 | 
				
			||||||
    connection = await connect("amqp://guest:guest@localhost/")
 | 
					    connection = await connect("amqp://guest:guest@localhost/")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -39,7 +38,7 @@ async def get_messages() -> None:
 | 
				
			|||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Start listening the queue with name 'task_queue'
 | 
					        # Start listening the queue with name 'task_queue'
 | 
				
			||||||
        await queue.consume(on_message)
 | 
					        await queue.consume(partial(on_message, queue=inner_queue))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        print(" [*] Waiting for messages. To exit press CTRL+C")
 | 
					        print(" [*] Waiting for messages. To exit press CTRL+C")
 | 
				
			||||||
        await asyncio.Future()
 | 
					        await asyncio.Future()
 | 
				
			||||||
@@ -63,18 +62,24 @@ def get_url_for_download_video(data: str):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def create_workers(queue):
 | 
					async def create_workers(queue):
 | 
				
			||||||
    loop = asyncio.get_event_loop()
 | 
					    link = await queue.get()
 | 
				
			||||||
    while True:
 | 
					    get_url_for_download_video(link)
 | 
				
			||||||
        link = await queue.get()
 | 
					 | 
				
			||||||
        with pool.ProcessPoolExecutor(max_workers=8) as executor:
 | 
					 | 
				
			||||||
            tasks = loop.run_in_executor(executor, get_url_for_download_video, link)
 | 
					 | 
				
			||||||
        queue.task_done()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def main():
 | 
					class MasterService:
 | 
				
			||||||
    await asyncio.gather(get_messages(), create_workers(as_queue))
 | 
					    def __init__(self):
 | 
				
			||||||
 | 
					        self.MAX_EXECUTOR_WORKERS = 8
 | 
				
			||||||
 | 
					        self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS)
 | 
				
			||||||
 | 
					        self.queue = asyncio.Queue()
 | 
				
			||||||
 | 
					        self.rabbit_consumer = get_messages
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def run(self):
 | 
				
			||||||
 | 
					        loop = asyncio.get_event_loop()
 | 
				
			||||||
 | 
					        tasks = [loop.run_in_executor(self.executor, create_workers, self.queue) for i in range(self.MAX_EXECUTOR_WORKERS)]
 | 
				
			||||||
 | 
					        await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == '__main__':
 | 
					if __name__ == '__main__':
 | 
				
			||||||
    freeze_support()
 | 
					    freeze_support()
 | 
				
			||||||
    asyncio.run(main())
 | 
					    ms = MasterService()
 | 
				
			||||||
 | 
					    asyncio.run(ms.run())
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user