refactoring master service
This commit is contained in:
parent
e909c80178
commit
338d2c58a1
@ -1,6 +1,8 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import concurrent.futures as pool
|
import concurrent.futures as pool
|
||||||
|
import multiprocessing
|
||||||
|
from functools import partial
|
||||||
from multiprocessing import freeze_support
|
from multiprocessing import freeze_support
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
|
||||||
@ -13,17 +15,14 @@ from src.core.ydl import VideoDownloader
|
|||||||
from src.exceptions.download_exceptions import SiteNotImplementedException
|
from src.exceptions.download_exceptions import SiteNotImplementedException
|
||||||
|
|
||||||
|
|
||||||
as_queue = asyncio.Queue()
|
async def on_message(message: AbstractIncomingMessage, queue) -> None:
|
||||||
|
|
||||||
|
|
||||||
async def on_message(message: AbstractIncomingMessage) -> None:
|
|
||||||
async with message.process():
|
async with message.process():
|
||||||
# print(f" [x] Received message {message!r}")
|
# print(f" [x] Received message {message!r}")
|
||||||
await as_queue.put(json.loads(message.body))
|
await queue.put(json.loads(message.body))
|
||||||
print(f" Message body is: {message.body!r}")
|
print(f" Message body is: {message.body!r}")
|
||||||
|
|
||||||
|
|
||||||
async def get_messages() -> None:
|
async def get_messages(inner_queue) -> None:
|
||||||
# Perform connection
|
# Perform connection
|
||||||
connection = await connect("amqp://guest:guest@localhost/")
|
connection = await connect("amqp://guest:guest@localhost/")
|
||||||
|
|
||||||
@ -39,7 +38,7 @@ async def get_messages() -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Start listening the queue with name 'task_queue'
|
# Start listening the queue with name 'task_queue'
|
||||||
await queue.consume(on_message)
|
await queue.consume(partial(on_message, queue=inner_queue))
|
||||||
|
|
||||||
print(" [*] Waiting for messages. To exit press CTRL+C")
|
print(" [*] Waiting for messages. To exit press CTRL+C")
|
||||||
await asyncio.Future()
|
await asyncio.Future()
|
||||||
@ -63,18 +62,24 @@ def get_url_for_download_video(data: str):
|
|||||||
|
|
||||||
|
|
||||||
async def create_workers(queue):
|
async def create_workers(queue):
|
||||||
loop = asyncio.get_event_loop()
|
link = await queue.get()
|
||||||
while True:
|
get_url_for_download_video(link)
|
||||||
link = await queue.get()
|
|
||||||
with pool.ProcessPoolExecutor(max_workers=8) as executor:
|
|
||||||
tasks = loop.run_in_executor(executor, get_url_for_download_video, link)
|
|
||||||
queue.task_done()
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
class MasterService:
|
||||||
await asyncio.gather(get_messages(), create_workers(as_queue))
|
def __init__(self):
|
||||||
|
self.MAX_EXECUTOR_WORKERS = 8
|
||||||
|
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS)
|
||||||
|
self.queue = asyncio.Queue()
|
||||||
|
self.rabbit_consumer = get_messages
|
||||||
|
|
||||||
|
async def run(self):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
tasks = [loop.run_in_executor(self.executor, create_workers, self.queue) for i in range(self.MAX_EXECUTOR_WORKERS)]
|
||||||
|
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
freeze_support()
|
freeze_support()
|
||||||
asyncio.run(main())
|
ms = MasterService()
|
||||||
|
asyncio.run(ms.run())
|
||||||
|
Loading…
Reference in New Issue
Block a user