Refactor for the new architecture: add Redis-backed task tracking, fix filename handling, and add a check for already-downloaded videos
This commit is contained in:
		
							
								
								
									
										19
									
								
								docker-compose.yml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								docker-compose.yml
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,19 @@
 | 
			
		||||
# Local development stack for the video downloader:
# RabbitMQ (task broker) + Redis (task-state tracking).
version: "2.1"

services:
  rabbitmq:
    image: rabbitmq:3.10.7-management
    hostname: rabbitmq
    restart: always
    environment:
      # Default guest/guest credentials — fine for local dev only;
      # NOTE(review): change before exposing beyond localhost.
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    volumes:
      # Persist broker state across container restarts.
      - ./rabbitmq:/var/lib/rabbitmq
    ports:
      # 15672: management web UI; 5672: AMQP protocol port.
      - 15672:15672
      - 5672:5672
  redis:
    container_name: redis_video_downloader
    # NOTE(review): "latest" is not reproducible — consider pinning a
    # specific Redis version to match the redis-py 5.x client.
    image: redis:latest
    ports:
      # Default Redis port, matched by RedisClient(host="localhost", port=6379).
      - "6379:6379"
 | 
			
		||||
							
								
								
									
										31
									
								
								poetry.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										31
									
								
								poetry.lock
									
									
									
										generated
									
									
									
								
							@@ -61,6 +61,17 @@ doc = ["Sphinx", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-
 | 
			
		||||
test = ["anyio[trio]", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"]
 | 
			
		||||
trio = ["trio (<0.22)"]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "async-timeout"
 | 
			
		||||
version = "4.0.3"
 | 
			
		||||
description = "Timeout context manager for asyncio programs"
 | 
			
		||||
optional = false
 | 
			
		||||
python-versions = ">=3.7"
 | 
			
		||||
files = [
 | 
			
		||||
    {file = "async-timeout-4.0.3.tar.gz", hash = "sha256:4640d96be84d82d02ed59ea2b7105a0f7b33abe8703703cd0ab0bf87c427522f"},
 | 
			
		||||
    {file = "async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028"},
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "click"
 | 
			
		||||
version = "8.1.6"
 | 
			
		||||
@@ -463,6 +474,24 @@ files = [
 | 
			
		||||
[package.extras]
 | 
			
		||||
dev = ["atomicwrites (==1.2.1)", "attrs (==19.2.0)", "coverage (==6.5.0)", "hatch", "invoke (==1.7.3)", "more-itertools (==4.3.0)", "pbr (==4.3.0)", "pluggy (==1.0.0)", "py (==1.11.0)", "pytest (==7.2.0)", "pytest-cov (==4.0.0)", "pytest-timeout (==2.1.0)", "pyyaml (==5.1)"]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "redis"
 | 
			
		||||
version = "5.0.0"
 | 
			
		||||
description = "Python client for Redis database and key-value store"
 | 
			
		||||
optional = false
 | 
			
		||||
python-versions = ">=3.7"
 | 
			
		||||
files = [
 | 
			
		||||
    {file = "redis-5.0.0-py3-none-any.whl", hash = "sha256:06570d0b2d84d46c21defc550afbaada381af82f5b83e5b3777600e05d8e2ed0"},
 | 
			
		||||
    {file = "redis-5.0.0.tar.gz", hash = "sha256:5cea6c0d335c9a7332a460ed8729ceabb4d0c489c7285b0a86dbbf8a017bd120"},
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[package.dependencies]
 | 
			
		||||
async-timeout = {version = ">=4.0.2", markers = "python_full_version <= \"3.11.2\""}
 | 
			
		||||
 | 
			
		||||
[package.extras]
 | 
			
		||||
hiredis = ["hiredis (>=1.0.0)"]
 | 
			
		||||
ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)"]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "setproctitle"
 | 
			
		||||
version = "1.3.2"
 | 
			
		||||
@@ -709,4 +738,4 @@ resolved_reference = "86e3cf5e5849aefcc540c19bb5fa5ab7f470d1c1"
 | 
			
		||||
[metadata]
 | 
			
		||||
lock-version = "2.0"
 | 
			
		||||
python-versions = "^3.11"
 | 
			
		||||
content-hash = "06de05e76cc4835c6fe983c2a8092f0126ddf52c98ca1a498d6e54289adda068"
 | 
			
		||||
content-hash = "efcb5800a420dd066a767d8a7a2765c047e58a76e3472b6cf11dd2e64abb6c8d"
 | 
			
		||||
 
 | 
			
		||||
@@ -15,6 +15,7 @@ python-multipart = "^0.0.6"
 | 
			
		||||
pika = "^1.3.2"
 | 
			
		||||
aio-pika = "^9.2.2"
 | 
			
		||||
setproctitle = "^1.3.2"
 | 
			
		||||
redis = "^5.0.0"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
[build-system]
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										12
									
								
								src/core/async_queue.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								src/core/async_queue.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,12 @@
 | 
			
		||||
import asyncio

from src.core.redis_client import RedisClient

# Single client shared by every AsyncQueue in this process; RedisClient
# is cheap to construct (no connection is opened until a command runs).
redis = RedisClient()


class AsyncQueue(asyncio.Queue):
    """An asyncio.Queue whose ``put`` also records the task in Redis.

    Each item is first written to the Redis "queue" set via
    ``RedisClient.set_task_to_queue`` and only then enqueued in memory,
    so the pending task is visible externally before a worker can take it.
    """

    async def put(self, item):
        """Persist *item* to Redis, then enqueue it in memory."""
        await redis.set_task_to_queue(item)
        return await super().put(item)
 | 
			
		||||
@@ -1,10 +1,15 @@
 | 
			
		||||
import asyncio
 | 
			
		||||
import concurrent.futures as pool
 | 
			
		||||
import os.path
 | 
			
		||||
import subprocess
 | 
			
		||||
 | 
			
		||||
from functools import partial
 | 
			
		||||
 | 
			
		||||
from fastapi import HTTPException
 | 
			
		||||
 | 
			
		||||
from src.core.async_queue import AsyncQueue
 | 
			
		||||
from src.core.rabbitmq import get_messages
 | 
			
		||||
from src.core.redis_client import RedisClient
 | 
			
		||||
from src.core.result import Result, ResultTypeEnum
 | 
			
		||||
from src.core.ydl import VideoDownloader
 | 
			
		||||
from src.exceptions.download_exceptions import SiteNotImplementedException
 | 
			
		||||
@@ -16,12 +21,16 @@ class MasterService:
 | 
			
		||||
        self.MAX_EXECUTOR_WORKERS = 8
 | 
			
		||||
        self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
 | 
			
		||||
                                                 initializer=executor_initializer)
 | 
			
		||||
        self.queue = asyncio.Queue()
 | 
			
		||||
        self.queue = AsyncQueue()
 | 
			
		||||
        self.rabbit_consumer = get_messages
 | 
			
		||||
        self.currently_underway = {}  # contains currently in progress videos
 | 
			
		||||
 | 
			
		||||
    async def run(self):
 | 
			
		||||
        # TODO add cleanup procedure for existing exector procs
 | 
			
		||||
        subprocess.run(
 | 
			
		||||
            "for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
 | 
			
		||||
            shell=True, capture_output=True
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)]
 | 
			
		||||
 | 
			
		||||
        await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
 | 
			
		||||
@@ -29,15 +38,17 @@ class MasterService:
 | 
			
		||||
    async def create_workers(self):
 | 
			
		||||
        while True:
 | 
			
		||||
            video_params = await self.queue.get()
 | 
			
		||||
            redis = RedisClient()
 | 
			
		||||
            await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
 | 
			
		||||
            self.currently_underway[video_params['link']] = video_params
 | 
			
		||||
            # TODO save current copy of self.currently_underway to DB\file\etc
 | 
			
		||||
            download_task = self.loop.run_in_executor(self.executor, partial(
 | 
			
		||||
                MasterService.video_processing_executor, video_params=video_params
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
                                                      )
 | 
			
		||||
            if download_task.done():
 | 
			
		||||
                result = download_task.result()
 | 
			
		||||
            # process result
 | 
			
		||||
 | 
			
		||||
            result = await download_task
 | 
			
		||||
            await redis.del_task_from_tasks_and_add_to_task_done(task=video_params)
 | 
			
		||||
            # TODO process result
 | 
			
		||||
            self.queue.task_done()
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
@@ -46,9 +57,16 @@ class MasterService:
 | 
			
		||||
            "format": video_params["format"],
 | 
			
		||||
            "merge_output_format": video_params["merge_output_format"],
 | 
			
		||||
            'outtmpl': video_params["outtmpl"],
 | 
			
		||||
            "quiet": True
 | 
			
		||||
        }
 | 
			
		||||
        downloader = VideoDownloader(link=video_params["link"], ydl_opts=ydl_opts)
 | 
			
		||||
        video_info = downloader.get_info()
 | 
			
		||||
        if os.path.exists(
 | 
			
		||||
                os.path.join(os.getcwd() + f"Youtube/{video_info['id']}_{video_info['width']}.{video_info['ext']}")
 | 
			
		||||
        ):
 | 
			
		||||
            return Result(result_type=ResultTypeEnum.EXIST)
 | 
			
		||||
        try:
 | 
			
		||||
            downloader.ydl_opts["quiet"] = False
 | 
			
		||||
            result = downloader.download()
 | 
			
		||||
            return result
 | 
			
		||||
        except SiteNotImplementedException as ex:
 | 
			
		||||
@@ -59,7 +77,6 @@ class MasterService:
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def video_processing_executor(video_params: dict):
 | 
			
		||||
        # TODO check if video exists on server etc.  Result(type=ResultType.EXISTS, value=False)
 | 
			
		||||
        try:
 | 
			
		||||
            MasterService.video_download(video_params=video_params)
 | 
			
		||||
            return Result(result_type=ResultTypeEnum.DONE)
 | 
			
		||||
@@ -74,4 +91,3 @@ def executor_initializer():
 | 
			
		||||
    return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -13,19 +13,15 @@ async def on_message(message: AbstractIncomingMessage, queue) -> None:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def get_messages(inner_queue) -> None:
 | 
			
		||||
    # Perform connection
 | 
			
		||||
    async with await connect("amqp://guest:guest@localhost/") as connection:
 | 
			
		||||
        # Creating a channel
 | 
			
		||||
        channel = await connection.channel()
 | 
			
		||||
        await channel.set_qos(prefetch_count=1)
 | 
			
		||||
 | 
			
		||||
        # Declaring queue
 | 
			
		||||
        queue = await channel.declare_queue(
 | 
			
		||||
            "hello",
 | 
			
		||||
            durable=True,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # Start listening the queue with name 'task_queue'
 | 
			
		||||
        await queue.consume(partial(on_message, queue=inner_queue))
 | 
			
		||||
 | 
			
		||||
        print(" [*] Waiting for messages. To exit press CTRL+C")
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										55
									
								
								src/core/redis_client.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										55
									
								
								src/core/redis_client.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,55 @@
 | 
			
		||||
import json

import redis.asyncio as redis


class RedisClient:
    """Async Redis wrapper tracking download tasks through their lifecycle.

    Tasks are JSON-encoded dicts (must contain a ``"link"`` key) that move
    through three places:

    * ``queue``           -- a Redis set of tasks waiting to run
    * ``tasks_working:*`` -- one key per task currently in progress
    * ``tasks_done:1:*``  -- one key per finished task
    """

    SET_NAME = "queue"
    TASKS_NAME = "tasks_working"
    TASKS_DONE_NAME = "tasks_done"

    def __init__(self):
        # NOTE(review): host/port are hard-coded; consider reading them from
        # configuration so the docker-compose service name can be used too.
        self.connection = redis.Redis(host="localhost", port=6379, db=0)

    @staticmethod
    def _encode(task: dict) -> bytes:
        """Serialize a task exactly as stored in Redis.

        Kept in one place so ``sadd`` and ``srem`` always produce the same
        bytes — a formatting mismatch would silently break removal.
        """
        return json.dumps(task, indent=4).encode('utf-8')

    async def _set_task(self, task: dict) -> int:
        """Mark *task* as in progress (keyed by its link)."""
        # Do NOT wrap the shared client in ``async with``: the client's
        # context manager closes the connection pool on exit, which forced
        # a reconnect for every subsequent command in the original code.
        return await self.connection.set(
            f'{self.TASKS_NAME}:{task["link"]}', self._encode(task)
        )

    async def _set_task_done(self, task: dict) -> int:
        """Mark *task* as finished."""
        return await self.connection.set(
            f'{self.TASKS_DONE_NAME}:1:{task["link"]}', self._encode(task)
        )

    async def _del_task(self, task: dict) -> int:
        """Remove *task* from the in-progress keys."""
        return await self.connection.delete(f'{self.TASKS_NAME}:{task["link"]}')

    async def set_task_to_queue(self, task: dict) -> int:
        """Add *task* to the pending-queue set; returns number of members added."""
        return await self.connection.sadd(self.SET_NAME, self._encode(task))

    async def get_queue(self) -> set:
        """Return every pending task as raw JSON-encoded set members."""
        return await self.connection.smembers(self.SET_NAME)

    async def del_task_from_queue(self, task: dict) -> int:
        """Remove *task* from the pending-queue set; returns number removed."""
        return await self.connection.srem(self.SET_NAME, self._encode(task))

    async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int:
        """Transition *task* from pending to in-progress (not atomic)."""
        await self.del_task_from_queue(task)
        return await self._set_task(task)

    async def del_task_from_tasks_and_add_to_task_done(self, task: dict) -> int:
        """Transition *task* from in-progress to done (not atomic)."""
        await self._del_task(task)
        return await self._set_task_done(task)
 | 
			
		||||
 | 
			
		||||
@@ -4,10 +4,11 @@ from enum import Enum
 | 
			
		||||
class ResultTypeEnum(Enum):
 | 
			
		||||
    EXCEPTION = "Error"
 | 
			
		||||
    DONE = "Done"
 | 
			
		||||
    EXIST = "Exist"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Result:
 | 
			
		||||
    def __init__(self, result_type: ResultTypeEnum, value: Exception = None):
 | 
			
		||||
    def __init__(self, result_type: ResultTypeEnum, value: Exception | bool = None):
 | 
			
		||||
        self.result_type = result_type
 | 
			
		||||
        self.value = value
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -24,12 +24,9 @@ class VideoDownloader:
 | 
			
		||||
        self.username = username
 | 
			
		||||
        self.password = password
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def get_unique_video_filename():
 | 
			
		||||
        prefix = f'vid_{datetime.now().strftime("%Y%m%d%H%M%S")}'
 | 
			
		||||
        random_uuid4 = uuid.uuid4().hex[:8]
 | 
			
		||||
        filename = f"{prefix}_{random_uuid4}"
 | 
			
		||||
        return filename
 | 
			
		||||
    def get_info(self):
        """Return the video's metadata (no download) via youtube_dl extract_info."""
        options = self.ydl_opts or {}
        with youtube_dl.YoutubeDL(options) as ydl:
            return ydl.extract_info(self.link, download=False)
 | 
			
		||||
 | 
			
		||||
    def download(self):
 | 
			
		||||
        domain = urlparse(self.link).netloc
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user