From 8b6fef80a0eabe4ae6d606502fd3d24aaf716cc2 Mon Sep 17 00:00:00 2001 From: Dantenerosas Date: Sun, 27 Aug 2023 16:27:28 +0300 Subject: [PATCH] refactoring for new arch, added Redis, fixed filename, added video exists check --- docker-compose.yml | 19 +++++++++++++ poetry.lock | 31 ++++++++++++++++++++- pyproject.toml | 1 + src/core/async_queue.py | 12 +++++++++ src/core/master_service.py | 34 ++++++++++++++++------- src/core/rabbitmq.py | 4 --- src/core/redis_client.py | 55 ++++++++++++++++++++++++++++++++++++++ src/core/result.py | 3 ++- src/core/ydl.py | 9 +++---- 9 files changed, 147 insertions(+), 21 deletions(-) create mode 100644 docker-compose.yml create mode 100644 src/core/async_queue.py create mode 100644 src/core/redis_client.py diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..ff16034 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,19 @@ +version: "2.1" +services: + rabbitmq: + image: rabbitmq:3.10.7-management + hostname: rabbitmq + restart: always + environment: + - RABBITMQ_DEFAULT_USER=guest + - RABBITMQ_DEFAULT_PASS=guest + volumes: + - ./rabbitmq:/var/lib/rabbitmq + ports: + - 15672:15672 + - 5672:5672 + redis: + container_name: redis_video_downloader + image: redis:latest + ports: + - "6379:6379" diff --git a/poetry.lock b/poetry.lock index 52c048a..2f7101c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -61,6 +61,17 @@ doc = ["Sphinx", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd- test = ["anyio[trio]", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] trio = ["trio (<0.22)"] +[[package]] +name = "async-timeout" +version = "4.0.3" +description = "Timeout context manager for asyncio programs" +optional = false +python-versions = ">=3.7" +files = [ + {file = "async-timeout-4.0.3.tar.gz", hash = "sha256:4640d96be84d82d02ed59ea2b7105a0f7b33abe8703703cd0ab0bf87c427522f"}, + {file = "async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028"}, +] + [[package]] name = "click" version = "8.1.6" @@ -463,6 +474,24 @@ files = [ [package.extras] dev = ["atomicwrites (==1.2.1)", "attrs (==19.2.0)", "coverage (==6.5.0)", "hatch", "invoke (==1.7.3)", "more-itertools (==4.3.0)", "pbr (==4.3.0)", "pluggy (==1.0.0)", "py (==1.11.0)", "pytest (==7.2.0)", "pytest-cov (==4.0.0)", "pytest-timeout (==2.1.0)", "pyyaml (==5.1)"] +[[package]] +name = "redis" +version = "5.0.0" +description = "Python client for Redis database and key-value store" +optional = false +python-versions = ">=3.7" +files = [ + {file = "redis-5.0.0-py3-none-any.whl", hash = "sha256:06570d0b2d84d46c21defc550afbaada381af82f5b83e5b3777600e05d8e2ed0"}, + {file = "redis-5.0.0.tar.gz", hash = "sha256:5cea6c0d335c9a7332a460ed8729ceabb4d0c489c7285b0a86dbbf8a017bd120"}, +] + +[package.dependencies] +async-timeout = {version = ">=4.0.2", markers = "python_full_version <= \"3.11.2\""} + +[package.extras] +hiredis = ["hiredis (>=1.0.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)"] + [[package]] name = "setproctitle" version = "1.3.2" @@ -709,4 +738,4 @@ resolved_reference = "86e3cf5e5849aefcc540c19bb5fa5ab7f470d1c1" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "06de05e76cc4835c6fe983c2a8092f0126ddf52c98ca1a498d6e54289adda068" +content-hash = "efcb5800a420dd066a767d8a7a2765c047e58a76e3472b6cf11dd2e64abb6c8d" diff --git a/pyproject.toml b/pyproject.toml index 74d541a..157e224 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ python-multipart = "^0.0.6" pika = "^1.3.2" aio-pika = "^9.2.2" setproctitle = "^1.3.2" +redis = "^5.0.0" [build-system] diff --git a/src/core/async_queue.py b/src/core/async_queue.py new file mode 100644 index 0000000..e90f47b --- /dev/null +++ b/src/core/async_queue.py @@ -0,0 +1,12 @@ +import asyncio + +from src.core.redis_client import RedisClient + +redis = RedisClient() + + +class AsyncQueue(asyncio.Queue): + + async def put(self, item): + await redis.set_task_to_queue(item) + return await super().put(item) diff --git a/src/core/master_service.py b/src/core/master_service.py index 2eb4c5e..379cb07 100644 --- a/src/core/master_service.py +++ b/src/core/master_service.py @@ -1,10 +1,15 @@ import asyncio import concurrent.futures as pool +import os.path +import subprocess + from functools import partial from fastapi import HTTPException +from src.core.async_queue import AsyncQueue from src.core.rabbitmq import get_messages +from src.core.redis_client import RedisClient from src.core.result import Result, ResultTypeEnum from src.core.ydl import VideoDownloader from src.exceptions.download_exceptions import SiteNotImplementedException @@ -16,12 +21,16 @@ class MasterService: self.MAX_EXECUTOR_WORKERS = 8 self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS, initializer=executor_initializer) - self.queue = asyncio.Queue() + self.queue = AsyncQueue() self.rabbit_consumer = get_messages self.currently_underway = {} # contains currently in progress videos async def run(self): - # TODO add cleanup procedure for existing exector procs + subprocess.run( + "for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done", + shell=True, capture_output=True + ) + tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)] await asyncio.gather(self.rabbit_consumer(self.queue), *tasks) @@ -29,15 +38,17 @@ class MasterService: async def create_workers(self): while True: video_params = await self.queue.get() + redis = RedisClient() + await redis.del_task_from_queue_and_add_to_tasks(task=video_params) self.currently_underway[video_params['link']] = video_params - # TODO save current copy of self.currently_underway to DB\file\etc download_task = self.loop.run_in_executor(self.executor, partial( MasterService.video_processing_executor, video_params=video_params + ) ) - ) - if download_task.done(): - result = download_task.result() - # process result + + result = await download_task + await redis.del_task_from_tasks_and_add_to_task_done(task=video_params) + # TODO process result self.queue.task_done() @staticmethod @@ -46,9 +57,16 @@ class MasterService: "format": video_params["format"], "merge_output_format": video_params["merge_output_format"], 'outtmpl': video_params["outtmpl"], + "quiet": True } downloader = VideoDownloader(link=video_params["link"], ydl_opts=ydl_opts) + video_info = downloader.get_info() + if os.path.exists( + os.path.join(os.getcwd() + f"Youtube/{video_info['id']}_{video_info['width']}.{video_info['ext']}") + ): + return Result(result_type=ResultTypeEnum.EXIST) try: + downloader.ydl_opts["quiet"] = False result = downloader.download() return result except SiteNotImplementedException as ex: @@ -59,7 +77,6 @@ class MasterService: @staticmethod def video_processing_executor(video_params: dict): - # TODO check if video exists on server etc. Result(type=ResultType.EXISTS, value=False) try: MasterService.video_download(video_params=video_params) return Result(result_type=ResultTypeEnum.DONE) @@ -74,4 +91,3 @@ def executor_initializer(): return True - diff --git a/src/core/rabbitmq.py b/src/core/rabbitmq.py index 93ee577..8a1a0bc 100644 --- a/src/core/rabbitmq.py +++ b/src/core/rabbitmq.py @@ -13,19 +13,15 @@ async def on_message(message: AbstractIncomingMessage, queue) -> None: async def get_messages(inner_queue) -> None: - # Perform connection async with await connect("amqp://guest:guest@localhost/") as connection: - # Creating a channel channel = await connection.channel() await channel.set_qos(prefetch_count=1) - # Declaring queue queue = await channel.declare_queue( "hello", durable=True, ) - # Start listening the queue with name 'task_queue' await queue.consume(partial(on_message, queue=inner_queue)) print(" [*] Waiting for messages. To exit press CTRL+C") diff --git a/src/core/redis_client.py b/src/core/redis_client.py new file mode 100644 index 0000000..7666286 --- /dev/null +++ b/src/core/redis_client.py @@ -0,0 +1,55 @@ +import json + +import redis.asyncio as redis + + +class RedisClient: + SET_NAME = "queue" + TASKS_NAME = "tasks_working" + TASKS_DONE_NAME = "tasks_done" + + def __init__(self): + self.connection = redis.Redis(host="localhost", port=6379, db=0) + + async def _set_task(self, task: dict) -> int: + async with self.connection as connection: + res = await connection.set(f'{self.TASKS_NAME}:{task["link"]}', json.dumps(task, indent=4).encode('utf-8')) + return res + + async def _set_task_done(self, task: dict) -> int: + async with self.connection as connection: + res = await connection.set( + f'{self.TASKS_DONE_NAME}:1:{task["link"]}', + json.dumps(task, indent=4).encode('utf-8') + ) + return res + + async def _del_task(self, task: dict) -> int: + async with self.connection as connection: + res = await connection.delete(f'{self.TASKS_NAME}:{task["link"]}') + return res + + async def set_task_to_queue(self, task: dict) -> int: + async with self.connection as connection: + res = await connection.sadd(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8')) + return res + + async def get_queue(self) -> set: + async with self.connection as connection: + res = await connection.smembers(self.SET_NAME) + return res + + async def del_task_from_queue(self, task: dict) -> int: + async with self.connection as connection: + res = await connection.srem(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8')) + return res + + async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int: + + await self.del_task_from_queue(task) + return await self._set_task(task) + + async def del_task_from_tasks_and_add_to_task_done(self, task: dict) -> int: + await self._del_task(task) + return await self._set_task_done(task) + diff --git a/src/core/result.py b/src/core/result.py index afc7f33..4a80747 100644 --- a/src/core/result.py +++ b/src/core/result.py @@ -4,10 +4,11 @@ from enum import Enum class ResultTypeEnum(Enum): EXCEPTION = "Error" DONE = "Done" + EXIST = "Exist" class Result: - def __init__(self, result_type: ResultTypeEnum, value: Exception = None): + def __init__(self, result_type: ResultTypeEnum, value: Exception | bool = None): self.result_type = result_type self.value = value diff --git a/src/core/ydl.py b/src/core/ydl.py index 776c402..cc53688 100644 --- a/src/core/ydl.py +++ b/src/core/ydl.py @@ -24,12 +24,9 @@ class VideoDownloader: self.username = username self.password = password - @staticmethod - def get_unique_video_filename(): - prefix = f'vid_{datetime.now().strftime("%Y%m%d%H%M%S")}' - random_uuid4 = uuid.uuid4().hex[:8] - filename = f"{prefix}_{random_uuid4}" - return filename + def get_info(self): + with youtube_dl.YoutubeDL(self.ydl_opts if self.ydl_opts else {}) as ydl: + return ydl.extract_info(self.link, download=False) def download(self): domain = urlparse(self.link).netloc