refactoring for new arch, added Redis, fixed filename, added video exists check

This commit is contained in:
garickbadalov 2023-08-27 16:27:28 +03:00
parent 05a7d7396a
commit 1a479db726
9 changed files with 147 additions and 21 deletions

19
docker-compose.yml Normal file
View File

@ -0,0 +1,19 @@
version: "2.1"
services:
rabbitmq:
image: rabbitmq:3.10.7-management
hostname: rabbitmq
restart: always
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
- ./rabbitmq:/var/lib/rabbitmq
ports:
- 15672:15672
- 5672:5672
redis:
container_name: redis_video_downloader
image: redis:latest
ports:
- "6379:6379"

31
poetry.lock generated
View File

@ -61,6 +61,17 @@ doc = ["Sphinx", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-
test = ["anyio[trio]", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] test = ["anyio[trio]", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"]
trio = ["trio (<0.22)"] trio = ["trio (<0.22)"]
[[package]]
name = "async-timeout"
version = "4.0.3"
description = "Timeout context manager for asyncio programs"
optional = false
python-versions = ">=3.7"
files = [
{file = "async-timeout-4.0.3.tar.gz", hash = "sha256:4640d96be84d82d02ed59ea2b7105a0f7b33abe8703703cd0ab0bf87c427522f"},
{file = "async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028"},
]
[[package]] [[package]]
name = "click" name = "click"
version = "8.1.6" version = "8.1.6"
@ -463,6 +474,24 @@ files = [
[package.extras] [package.extras]
dev = ["atomicwrites (==1.2.1)", "attrs (==19.2.0)", "coverage (==6.5.0)", "hatch", "invoke (==1.7.3)", "more-itertools (==4.3.0)", "pbr (==4.3.0)", "pluggy (==1.0.0)", "py (==1.11.0)", "pytest (==7.2.0)", "pytest-cov (==4.0.0)", "pytest-timeout (==2.1.0)", "pyyaml (==5.1)"] dev = ["atomicwrites (==1.2.1)", "attrs (==19.2.0)", "coverage (==6.5.0)", "hatch", "invoke (==1.7.3)", "more-itertools (==4.3.0)", "pbr (==4.3.0)", "pluggy (==1.0.0)", "py (==1.11.0)", "pytest (==7.2.0)", "pytest-cov (==4.0.0)", "pytest-timeout (==2.1.0)", "pyyaml (==5.1)"]
[[package]]
name = "redis"
version = "5.0.0"
description = "Python client for Redis database and key-value store"
optional = false
python-versions = ">=3.7"
files = [
{file = "redis-5.0.0-py3-none-any.whl", hash = "sha256:06570d0b2d84d46c21defc550afbaada381af82f5b83e5b3777600e05d8e2ed0"},
{file = "redis-5.0.0.tar.gz", hash = "sha256:5cea6c0d335c9a7332a460ed8729ceabb4d0c489c7285b0a86dbbf8a017bd120"},
]
[package.dependencies]
async-timeout = {version = ">=4.0.2", markers = "python_full_version <= \"3.11.2\""}
[package.extras]
hiredis = ["hiredis (>=1.0.0)"]
ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)"]
[[package]] [[package]]
name = "setproctitle" name = "setproctitle"
version = "1.3.2" version = "1.3.2"
@ -709,4 +738,4 @@ resolved_reference = "86e3cf5e5849aefcc540c19bb5fa5ab7f470d1c1"
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.11" python-versions = "^3.11"
content-hash = "06de05e76cc4835c6fe983c2a8092f0126ddf52c98ca1a498d6e54289adda068" content-hash = "efcb5800a420dd066a767d8a7a2765c047e58a76e3472b6cf11dd2e64abb6c8d"

View File

@ -15,6 +15,7 @@ python-multipart = "^0.0.6"
pika = "^1.3.2" pika = "^1.3.2"
aio-pika = "^9.2.2" aio-pika = "^9.2.2"
setproctitle = "^1.3.2" setproctitle = "^1.3.2"
redis = "^5.0.0"
[build-system] [build-system]

12
src/core/async_queue.py Normal file
View File

@ -0,0 +1,12 @@
import asyncio
from src.core.redis_client import RedisClient
redis = RedisClient()
class AsyncQueue(asyncio.Queue):
async def put(self, item):
await redis.set_task_to_queue(item)
return await super().put(item)

View File

@ -1,10 +1,15 @@
import asyncio import asyncio
import concurrent.futures as pool import concurrent.futures as pool
import os.path
import subprocess
from functools import partial from functools import partial
from fastapi import HTTPException from fastapi import HTTPException
from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages from src.core.rabbitmq import get_messages
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum from src.core.result import Result, ResultTypeEnum
from src.core.ydl import VideoDownloader from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import SiteNotImplementedException from src.exceptions.download_exceptions import SiteNotImplementedException
@ -16,12 +21,16 @@ class MasterService:
self.MAX_EXECUTOR_WORKERS = 8 self.MAX_EXECUTOR_WORKERS = 8
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS, self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
initializer=executor_initializer) initializer=executor_initializer)
self.queue = asyncio.Queue() self.queue = AsyncQueue()
self.rabbit_consumer = get_messages self.rabbit_consumer = get_messages
self.currently_underway = {} # contains currently in progress videos self.currently_underway = {} # contains currently in progress videos
async def run(self): async def run(self):
# TODO add cleanup procedure for existing exector procs subprocess.run(
"for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
shell=True, capture_output=True
)
tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)] tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)]
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks) await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
@ -29,15 +38,17 @@ class MasterService:
async def create_workers(self): async def create_workers(self):
while True: while True:
video_params = await self.queue.get() video_params = await self.queue.get()
redis = RedisClient()
await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
self.currently_underway[video_params['link']] = video_params self.currently_underway[video_params['link']] = video_params
# TODO save current copy of self.currently_underway to DB\file\etc
download_task = self.loop.run_in_executor(self.executor, partial( download_task = self.loop.run_in_executor(self.executor, partial(
MasterService.video_processing_executor, video_params=video_params MasterService.video_processing_executor, video_params=video_params
)
) )
)
if download_task.done(): result = await download_task
result = download_task.result() await redis.del_task_from_tasks_and_add_to_task_done(task=video_params)
# process result # TODO process result
self.queue.task_done() self.queue.task_done()
@staticmethod @staticmethod
@ -46,9 +57,16 @@ class MasterService:
"format": video_params["format"], "format": video_params["format"],
"merge_output_format": video_params["merge_output_format"], "merge_output_format": video_params["merge_output_format"],
'outtmpl': video_params["outtmpl"], 'outtmpl': video_params["outtmpl"],
"quiet": True
} }
downloader = VideoDownloader(link=video_params["link"], ydl_opts=ydl_opts) downloader = VideoDownloader(link=video_params["link"], ydl_opts=ydl_opts)
video_info = downloader.get_info()
if os.path.exists(
os.path.join(os.getcwd() + f"Youtube/{video_info['id']}_{video_info['width']}.{video_info['ext']}")
):
return Result(result_type=ResultTypeEnum.EXIST)
try: try:
downloader.ydl_opts["quiet"] = False
result = downloader.download() result = downloader.download()
return result return result
except SiteNotImplementedException as ex: except SiteNotImplementedException as ex:
@ -59,7 +77,6 @@ class MasterService:
@staticmethod @staticmethod
def video_processing_executor(video_params: dict): def video_processing_executor(video_params: dict):
# TODO check if video exists on server etc. Result(type=ResultType.EXISTS, value=False)
try: try:
MasterService.video_download(video_params=video_params) MasterService.video_download(video_params=video_params)
return Result(result_type=ResultTypeEnum.DONE) return Result(result_type=ResultTypeEnum.DONE)
@ -74,4 +91,3 @@ def executor_initializer():
return True return True

View File

@ -13,19 +13,15 @@ async def on_message(message: AbstractIncomingMessage, queue) -> None:
async def get_messages(inner_queue) -> None: async def get_messages(inner_queue) -> None:
# Perform connection
async with await connect("amqp://guest:guest@localhost/") as connection: async with await connect("amqp://guest:guest@localhost/") as connection:
# Creating a channel
channel = await connection.channel() channel = await connection.channel()
await channel.set_qos(prefetch_count=1) await channel.set_qos(prefetch_count=1)
# Declaring queue
queue = await channel.declare_queue( queue = await channel.declare_queue(
"hello", "hello",
durable=True, durable=True,
) )
# Start listening the queue with name 'task_queue'
await queue.consume(partial(on_message, queue=inner_queue)) await queue.consume(partial(on_message, queue=inner_queue))
print(" [*] Waiting for messages. To exit press CTRL+C") print(" [*] Waiting for messages. To exit press CTRL+C")

55
src/core/redis_client.py Normal file
View File

@ -0,0 +1,55 @@
import json
import redis.asyncio as redis
class RedisClient:
SET_NAME = "queue"
TASKS_NAME = "tasks_working"
TASKS_DONE_NAME = "tasks_done"
def __init__(self):
self.connection = redis.Redis(host="localhost", port=6379, db=0)
async def _set_task(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.set(f'{self.TASKS_NAME}:{task["link"]}', json.dumps(task, indent=4).encode('utf-8'))
return res
async def _set_task_done(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.set(
f'{self.TASKS_DONE_NAME}:1:{task["link"]}',
json.dumps(task, indent=4).encode('utf-8')
)
return res
async def _del_task(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.delete(f'{self.TASKS_NAME}:{task["link"]}')
return res
async def set_task_to_queue(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.sadd(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8'))
return res
async def get_queue(self) -> set:
async with self.connection as connection:
res = await connection.smembers(self.SET_NAME)
return res
async def del_task_from_queue(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.srem(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8'))
return res
async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int:
await self.del_task_from_queue(task)
return await self._set_task(task)
async def del_task_from_tasks_and_add_to_task_done(self, task: dict) -> int:
await self._del_task(task)
return await self._set_task_done(task)

View File

@ -4,10 +4,11 @@ from enum import Enum
class ResultTypeEnum(Enum): class ResultTypeEnum(Enum):
EXCEPTION = "Error" EXCEPTION = "Error"
DONE = "Done" DONE = "Done"
EXIST = "Exist"
class Result: class Result:
def __init__(self, result_type: ResultTypeEnum, value: Exception = None): def __init__(self, result_type: ResultTypeEnum, value: Exception | bool = None):
self.result_type = result_type self.result_type = result_type
self.value = value self.value = value

View File

@ -24,12 +24,9 @@ class VideoDownloader:
self.username = username self.username = username
self.password = password self.password = password
@staticmethod def get_info(self):
def get_unique_video_filename(): with youtube_dl.YoutubeDL(self.ydl_opts if self.ydl_opts else {}) as ydl:
prefix = f'vid_{datetime.now().strftime("%Y%m%d%H%M%S")}' return ydl.extract_info(self.link, download=False)
random_uuid4 = uuid.uuid4().hex[:8]
filename = f"{prefix}_{random_uuid4}"
return filename
def download(self): def download(self):
domain = urlparse(self.link).netloc domain = urlparse(self.link).netloc