refactoring for new arch, minor fixes
This commit is contained in:
		@@ -1,85 +1,77 @@
 | 
			
		||||
import asyncio
 | 
			
		||||
import json
 | 
			
		||||
import concurrent.futures as pool
 | 
			
		||||
import multiprocessing
 | 
			
		||||
from functools import partial
 | 
			
		||||
from multiprocessing import freeze_support
 | 
			
		||||
from collections import deque
 | 
			
		||||
 | 
			
		||||
from fastapi import HTTPException
 | 
			
		||||
from aio_pika import connect
 | 
			
		||||
from aio_pika.abc import AbstractIncomingMessage
 | 
			
		||||
 | 
			
		||||
from src.core.send import body
 | 
			
		||||
from src.core.rabbitmq import get_messages
 | 
			
		||||
from src.core.result import Result, ResultTypeEnum
 | 
			
		||||
from src.core.ydl import VideoDownloader
 | 
			
		||||
from src.exceptions.download_exceptions import SiteNotImplementedException
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def on_message(message: AbstractIncomingMessage, queue) -> None:
 | 
			
		||||
    async with message.process():
 | 
			
		||||
        # print(f" [x] Received message {message!r}")
 | 
			
		||||
        await queue.put(json.loads(message.body))
 | 
			
		||||
        print(f"     Message body is: {message.body!r}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def get_messages(inner_queue) -> None:
 | 
			
		||||
    # Perform connection
 | 
			
		||||
    connection = await connect("amqp://guest:guest@localhost/")
 | 
			
		||||
 | 
			
		||||
    async with connection:
 | 
			
		||||
        # Creating a channel
 | 
			
		||||
        channel = await connection.channel()
 | 
			
		||||
        await channel.set_qos(prefetch_count=1)
 | 
			
		||||
 | 
			
		||||
        # Declaring queue
 | 
			
		||||
        queue = await channel.declare_queue(
 | 
			
		||||
            "hello",
 | 
			
		||||
            durable=True,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # Start listening the queue with name 'task_queue'
 | 
			
		||||
        await queue.consume(partial(on_message, queue=inner_queue))
 | 
			
		||||
 | 
			
		||||
        print(" [*] Waiting for messages. To exit press CTRL+C")
 | 
			
		||||
        await asyncio.Future()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_url_for_download_video(data: str):
 | 
			
		||||
    file_name = VideoDownloader.get_unique_video_filename()
 | 
			
		||||
    ydl_opts = {
 | 
			
		||||
        "format": data["format"],
 | 
			
		||||
        "merge_output_format": data["merge_output_format"],
 | 
			
		||||
        'outtmpl': data["outtmpl"],
 | 
			
		||||
    }
 | 
			
		||||
    downloader = VideoDownloader(link=data["link"], ydl_opts=ydl_opts)
 | 
			
		||||
    try:
 | 
			
		||||
        result = downloader.download()
 | 
			
		||||
    except SiteNotImplementedException as ex:
 | 
			
		||||
        raise HTTPException(
 | 
			
		||||
            status_code=400,
 | 
			
		||||
            detail=ex.message
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def create_workers(queue):
 | 
			
		||||
    link = await queue.get()
 | 
			
		||||
    get_url_for_download_video(link)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MasterService:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.loop = asyncio.get_event_loop()
 | 
			
		||||
        self.MAX_EXECUTOR_WORKERS = 8
 | 
			
		||||
        self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS)
 | 
			
		||||
        self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
 | 
			
		||||
                                                 initializer=executor_initializer)
 | 
			
		||||
        self.queue = asyncio.Queue()
 | 
			
		||||
        self.rabbit_consumer = get_messages
 | 
			
		||||
        self.currently_underway = {}  # contains currently in progress videos
 | 
			
		||||
 | 
			
		||||
    async def run(self):
 | 
			
		||||
        loop = asyncio.get_event_loop()
 | 
			
		||||
        tasks = [loop.run_in_executor(self.executor, create_workers, self.queue) for i in range(self.MAX_EXECUTOR_WORKERS)]
 | 
			
		||||
        # TODO add cleanup procedure for existing exector procs
 | 
			
		||||
        tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)]
 | 
			
		||||
 | 
			
		||||
        await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
 | 
			
		||||
 | 
			
		||||
    async def create_workers(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            video_params = await self.queue.get()
 | 
			
		||||
            self.currently_underway[video_params['link']] = video_params
 | 
			
		||||
            # TODO save current copy of self.currently_underway to DB\file\etc
 | 
			
		||||
            download_task = self.loop.run_in_executor(self.executor, partial(
 | 
			
		||||
                MasterService.video_processing_executor, video_params=video_params
 | 
			
		||||
            )
 | 
			
		||||
                                                      )
 | 
			
		||||
            if download_task.done():
 | 
			
		||||
                result = download_task.result()
 | 
			
		||||
            # process result
 | 
			
		||||
            self.queue.task_done()
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def video_download(video_params: dict):
 | 
			
		||||
        ydl_opts = {
 | 
			
		||||
            "format": video_params["format"],
 | 
			
		||||
            "merge_output_format": video_params["merge_output_format"],
 | 
			
		||||
            'outtmpl': video_params["outtmpl"],
 | 
			
		||||
        }
 | 
			
		||||
        downloader = VideoDownloader(link=video_params["link"], ydl_opts=ydl_opts)
 | 
			
		||||
        try:
 | 
			
		||||
            result = downloader.download()
 | 
			
		||||
            return result
 | 
			
		||||
        except SiteNotImplementedException as ex:
 | 
			
		||||
            raise HTTPException(
 | 
			
		||||
                status_code=400,
 | 
			
		||||
                detail=ex.message
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def video_processing_executor(video_params: dict):
 | 
			
		||||
        # TODO check if video exists on server etc.  Result(type=ResultType.EXISTS, value=False)
 | 
			
		||||
        try:
 | 
			
		||||
            MasterService.video_download(video_params=video_params)
 | 
			
		||||
            return Result(result_type=ResultTypeEnum.DONE)
 | 
			
		||||
        except Exception as ex:
 | 
			
		||||
            return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex)
 | 
			
		||||
        # TODO upload to server
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def executor_initializer():
 | 
			
		||||
    import setproctitle
 | 
			
		||||
    setproctitle.setproctitle(f'video_downloader_executor_process')
 | 
			
		||||
    return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    freeze_support()
 | 
			
		||||
    ms = MasterService()
 | 
			
		||||
    asyncio.run(ms.run())
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										32
									
								
								src/core/rabbitmq.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										32
									
								
								src/core/rabbitmq.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,32 @@
 | 
			
		||||
import asyncio
 | 
			
		||||
import json
 | 
			
		||||
from functools import partial
 | 
			
		||||
 | 
			
		||||
from aio_pika import connect
 | 
			
		||||
from aio_pika.abc import AbstractIncomingMessage
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def on_message(message: AbstractIncomingMessage, queue) -> None:
 | 
			
		||||
    async with message.process():
 | 
			
		||||
        await queue.put(json.loads(message.body))
 | 
			
		||||
        print(f"     Message body is: {message.body!r}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def get_messages(inner_queue) -> None:
 | 
			
		||||
    # Perform connection
 | 
			
		||||
    async with await connect("amqp://guest:guest@localhost/") as connection:
 | 
			
		||||
        # Creating a channel
 | 
			
		||||
        channel = await connection.channel()
 | 
			
		||||
        await channel.set_qos(prefetch_count=1)
 | 
			
		||||
 | 
			
		||||
        # Declaring queue
 | 
			
		||||
        queue = await channel.declare_queue(
 | 
			
		||||
            "hello",
 | 
			
		||||
            durable=True,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # Start listening the queue with name 'task_queue'
 | 
			
		||||
        await queue.consume(partial(on_message, queue=inner_queue))
 | 
			
		||||
 | 
			
		||||
        print(" [*] Waiting for messages. To exit press CTRL+C")
 | 
			
		||||
        await asyncio.Future()
 | 
			
		||||
							
								
								
									
										15
									
								
								src/core/result.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								src/core/result.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,15 @@
 | 
			
		||||
from enum import Enum
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ResultTypeEnum(Enum):
 | 
			
		||||
    EXCEPTION = "Error"
 | 
			
		||||
    DONE = "Done"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Result:
 | 
			
		||||
    def __init__(self, result_type: ResultTypeEnum, value: Exception = None):
 | 
			
		||||
        self.result_type = result_type
 | 
			
		||||
        self.value = value
 | 
			
		||||
 | 
			
		||||
    def __repr__(self):
 | 
			
		||||
        return f'Result: {self.result_type.value}. Traceback: {self.value if self.value else None}'
 | 
			
		||||
		Reference in New Issue
	
	Block a user