From 31928cca48b34e5d138ccc2d38da5dbf0d96da2d Mon Sep 17 00:00:00 2001 From: Dantenerosas Date: Mon, 25 Sep 2023 04:05:42 +0300 Subject: [PATCH] minor fixes, rework web service, add features --- poetry.lock | 34 ++++++++++- pyproject.toml | 1 + src/core/master_service.py | 17 +----- src/core/redis_client.py | 51 +++++++--------- src/core/ydl.py | 6 ++ src/parsers/base_parser.py | 4 ++ src/web/main.py | 111 +++++++++++++++++++++++------------ src/web/schemes/submit.py | 3 + src/web/templates/index.html | 39 +++++++++++- 9 files changed, 182 insertions(+), 84 deletions(-) diff --git a/poetry.lock b/poetry.lock index 173e864..426ec76 100644 --- a/poetry.lock +++ b/poetry.lock @@ -860,6 +860,24 @@ files = [ {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"}, ] +[[package]] +name = "loguru" +version = "0.7.2" +description = "Python logging made (stupidly) simple" +optional = false +python-versions = ">=3.5" +files = [ + {file = "loguru-0.7.2-py3-none-any.whl", hash = "sha256:003d71e3d3ed35f0f8984898359d65b79e5b21943f78af86aa5491210429b8eb"}, + {file = "loguru-0.7.2.tar.gz", hash = "sha256:e671a53522515f34fd406340ee968cb9ecafbc4b36c679da03c18fd8d0bd51ac"}, +] + +[package.dependencies] +colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""} +win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""} + +[package.extras] +dev = ["Sphinx (==7.2.5)", "colorama (==0.4.5)", "colorama (==0.4.6)", "exceptiongroup (==1.1.3)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v1.4.1)", "mypy (==v1.5.1)", "pre-commit (==3.4.0)", "pytest (==6.1.2)", "pytest (==7.4.0)", "pytest-cov (==2.12.1)", "pytest-cov (==4.1.0)", "pytest-mypy-plugins (==1.9.3)", "pytest-mypy-plugins (==3.0.0)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.3.0)", "tox (==3.27.1)", "tox (==4.11.0)"] + [[package]] name = "lxml" version = "4.9.3" @@ -1851,6 +1869,20 @@ files = [ {file = "websockets-11.0.3.tar.gz", hash = "sha256:88fc51d9a26b10fc331be344f1781224a375b78488fc343620184e95a4b27016"}, ] +[[package]] +name = "win32-setctime" +version = "1.1.0" +description = "A small Python utility to set file creation time on Windows" +optional = false +python-versions = ">=3.5" +files = [ + {file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"}, + {file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"}, +] + +[package.extras] +dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] + [[package]] name = "yarl" version = "1.9.2" @@ -1975,4 +2007,4 @@ websockets = "*" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "adb19063d7f961b9bf479d99616e36f0b47d2b1479279fe2f60cef048ed70a64" +content-hash = "1b704294868cb5a6ebb4ce3b9ffb373899bec9ce312ad94810bc1fe0d21ffb7d" diff --git a/pyproject.toml b/pyproject.toml index 6cc5337..4bc532f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ lxml = "^4.9.3" minio = "^7.1.16" aiogram = "3.0" pydantic = "^2.3.0" +loguru = "^0.7.2" [build-system] diff --git a/src/core/master_service.py b/src/core/master_service.py index 20cee2f..0fd0b56 100644 --- a/src/core/master_service.py +++ b/src/core/master_service.py @@ -1,7 +1,6 @@ import asyncio import concurrent.futures as pool import subprocess -import traceback from functools import partial from urllib.parse import urlparse @@ -17,6 +16,7 @@ from src.parsers.base_parser import BaseParser # TODO: добавить логгер с временными метками в yt-dlp + class MasterService: def __init__(self): self.loop = asyncio.get_event_loop() @@ -40,10 +40,7 @@ class MasterService: async def create_workers(self): while True: video_params = await self.queue.get() - # TODO: позднее написать функцию для определения парсера автоматически redis = RedisClient() - # TODO: проверить что в редисе задача либо уже выполнена, т.е. сразу отдать ссылку, либо что она ранее была закончена с ошибкой - # и проверять словарь self.currently_underway, для надёжности await redis.del_task_from_queue_and_add_to_tasks(task=video_params) self.currently_underway[video_params['link']] = video_params @@ -53,7 +50,7 @@ class MasterService: result: Result = await download_task if result.result_type in [ResultTypeEnum.DONE, ResultTypeEnum.EXIST]: - await redis.del_task_from_tasks_and_add_to_task_done(task=result.value) + await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params) await publish_message_with_task_done(task=result.value) self.queue.task_done() else: @@ -62,16 +59,11 @@ class MasterService: "result": result.value, "status": "error" } - await redis.del_task_from_tasks_and_add_to_task_done(task=error_message) + await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params) await publish_message_with_task_done(task=error_message) if video_params['link'] in self.currently_underway: del self.currently_underway[video_params['link']] - # TODO process result - # Result.Done \ Result.Exist - уведомить что задача выполнена, и отослать во вторую очередь сообщений - # RabbitMQ сообщение об этом - # Result.Error - в таблице Редиса для выполненых задач, пометить, что это ошибка и уведомить об этом - # по второй очереди сообщений и потом почистить self.currently_underway @staticmethod def video_download(video_params: dict): @@ -113,9 +105,6 @@ class MasterService: }) except SiteNotImplementedException as ex: return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex.default_message) - - except Exception as ex: - return Result(result_type=ResultTypeEnum.EXCEPTION, value=traceback.format_exc()) # TODO upload to server diff --git a/src/core/redis_client.py b/src/core/redis_client.py index 428a033..fa608b3 100644 --- a/src/core/redis_client.py +++ b/src/core/redis_client.py @@ -11,22 +11,14 @@ class RedisClient: def __init__(self): self.connection = redis.Redis(host="localhost", port=6379, db=0) - async def _set_task(self, task: dict) -> int: + async def _set_task(self, task: dict, queue_name) -> int: async with self.connection as connection: - res = await connection.set(f'{self.TASKS_NAME}:{task["link"]}', json.dumps(task, indent=4).encode('utf-8')) + res = await connection.sadd(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8')) return res - async def _set_task_done(self, task: dict) -> int: + async def _del_task(self, task: dict, queue_name) -> int: async with self.connection as connection: - res = await connection.sadd( - f'{self.TASKS_DONE_NAME}:1', - json.dumps(task, indent=4).encode('utf-8') - ) - return res - - async def _del_task(self, task: dict) -> int: - async with self.connection as connection: - res = await connection.delete(f'{self.TASKS_NAME}:{task["link"]}') + res = await connection.srem(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8')) return res async def set_task_to_queue(self, task: dict) -> int: @@ -36,7 +28,17 @@ class RedisClient: async def get_queue(self) -> set: async with self.connection as connection: - res = await connection.smembers(self.SET_NAME) + res = await connection.smembers(self.SET_NAME + f":1") + return res + + async def get_tasks(self) -> set: + async with self.connection as connection: + res = await connection.smembers(self.TASKS_NAME + f":1") + return res + + async def get_task_done_queue(self) -> set: + async with self.connection as connection: + res = await connection.smembers(self.TASKS_DONE_NAME + f":1") return res async def del_task_from_queue(self, task: dict) -> int: @@ -45,27 +47,14 @@ class RedisClient: return res async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int: + await self._del_task(task, self.SET_NAME) + return await self._set_task(task, self.TASKS_NAME) - await self.del_task_from_queue(task) - return await self._set_task(task) - - async def del_task_from_tasks_and_add_to_task_done(self, task: dict) -> int: - await self._del_task(task) - return await self._set_task_done(task) - - async def get_task_done_queue(self) -> set: - async with self.connection as connection: - res = await connection.smembers(self.TASKS_DONE_NAME + f":1") - return res + async def del_task_from_tasks_and_add_to_task_done(self, task: dict, working_task: dict) -> int: + await self._del_task(working_task, self.TASKS_NAME) + return await self._set_task(task, self.TASKS_DONE_NAME) async def del_task_from_task_done_queue(self, task) -> int: async with self.connection as connection: res = await connection.srem(self.TASKS_DONE_NAME + f":1", json.dumps(task, indent=4).encode('utf-8')) return res - - async def get_tasks_queue(self) -> set: - async with self.connection as connection: - res = await connection.json().get(self.TASKS_NAME) - return res - - diff --git a/src/core/ydl.py b/src/core/ydl.py index 5f083a4..22e4280 100644 --- a/src/core/ydl.py +++ b/src/core/ydl.py @@ -27,6 +27,12 @@ class VideoDownloader: def download(self): # TODO: удалить все файлы связанные с текущим видео, которые сейчас остались + base = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + base_download_dir = os.path.join(base, os.pardir, "downloads", self.info['extractor_key']) + for root, dirs, files in os.walk(base_download_dir): + for file in files: + if file.find(self.info['id']) != -1 and file.find('.part') != -1: + os.remove(base_download_dir + f"/{file}") with YoutubeDL(self.ydl_opts if self.ydl_opts else {}) as ydl: ydl.download([self.link]) return self.info diff --git a/src/parsers/base_parser.py b/src/parsers/base_parser.py index 78bb6b6..8dfab29 100644 --- a/src/parsers/base_parser.py +++ b/src/parsers/base_parser.py @@ -1,6 +1,9 @@ +import datetime import errno import os +from loguru import logger + from src.core.ydl import VideoDownloader from src.exceptions.download_exceptions import FileAlreadyExistException @@ -15,6 +18,7 @@ class BaseParser: def video_download(self): ydl_opts = { "format": self.params["format"], + "logger": logger, "merge_output_format": self.params["merge_output_format"], 'outtmpl': self.params["outtmpl"], "quiet": True diff --git a/src/web/main.py b/src/web/main.py index 45ac821..b4ff51f 100644 --- a/src/web/main.py +++ b/src/web/main.py @@ -1,4 +1,3 @@ -import asyncio import json import os from ast import literal_eval @@ -6,6 +5,7 @@ from ast import literal_eval import uvicorn from aio_pika import connect, Message, DeliveryMode from fastapi import FastAPI, Request, Depends +from loguru import logger from starlette.middleware.cors import CORSMiddleware from starlette.responses import JSONResponse, FileResponse, StreamingResponse from starlette.templating import Jinja2Templates @@ -39,7 +39,19 @@ async def is_task_already_done_or_exist(redis: RedisClient, link: str): if len(tasks) > 0: task = tasks[0] - await redis.del_task_from_task_done_queue(task) + return task + + +async def is_task_in_process(redis: RedisClient, link: str): + messages = await redis.get_tasks() + + tasks = [ + literal_eval(message.decode('utf-8')) for message in messages + if literal_eval(message.decode('utf-8'))["link"] == link + ] + + if len(tasks) > 0: + task = tasks[0] return task @@ -47,28 +59,33 @@ async def is_task_already_done_or_exist(redis: RedisClient, link: str): async def index(request: Request): return templates.TemplateResponse("index.html", {"request": request}) + @app.post('/submit/') async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()): - ''' + """ TODO: - Сабмит должен проверить что задача может быть уже выполненой (отдать ссылку в ответе) + Сабмит должен проверить что задача может быть уже выполненой (отдать ссылку в ответе) или ещё в работе (сообщить об этом в ответе, можно вывести на форму, что такая ссылка уже скачивается, ожидайте) Если условия выше провалены, то мы делаем новую задачу в очередь с переданными параметрами и сообщаем об этом клиенту с кодом (200 или 201) - - Дополнительно, нужен отдельный метод (ури), который позволит получать статус задачи. Опрашиваться примерно раз в 5с, + + Дополнительно, нужен отдельный метод (ури), который позволит получать статус задачи. Опрашиваться примерно раз в 5с, возможны увелечения тайминга в зависимости от ответа апи (на будущее) Варианты ответа 1) такой задачи нет (404) 2) такая задача есть и выполняется (200 ли?) 3) такая задача есть и завершена (200 и выдать ссылку на загрузку) - 4) такая задача есть и завершена, но с ошибкой (500 и сообщение о том, что можно попробовать выполнить задачу занов, - попутно удалив задачу из выполненых, с очисткой мусора за ней) - ''' + 4) такая задача есть и завершена, но с ошибкой (500 и сообщение о том, что можно попробовать выполнить задачу заново, + попутно удалив задачу из выполненных, с очисткой мусора за ней) + """ red = RedisClient() task_done = await is_task_already_done_or_exist(red, data.link) + task_in_process = await is_task_in_process(red, data.link) + if task_in_process: + return JSONResponse({"result": "Задача в работе. Ожидайте"}) if task_done: link_to_download_video = str(request.base_url) + "get/?file_path=" + task_done["result"] return JSONResponse({"result": link_to_download_video}) + # TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач async with await connect("amqp://guest:guest@localhost/") as connection: # Creating a channel @@ -76,7 +93,7 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends( body = [ { "link": data.link, - "format": f"bestvideo[ext={data.format.value}]+bestaudio[ext={data.format.value}]/best[ext={data.format.value}]/best", + "format": f"bestvideo[ext={data.video_format.value}]+bestaudio[ext={data.audio_format.value}]/best[ext={data.video_format.value}]/best", "merge_output_format": data.merge_output_format.value, "outtmpl": f"downloads/%(extractor_key)s/%(id)s_%(width)sp.%(ext)s", }, ] @@ -94,31 +111,11 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends( routing_key='hello', ) - print(f" [x] Sent '{link}'") - - while True: - try: - messages = await red.get_task_done_queue() - tasks = [ - literal_eval(message.decode('utf-8')) for message in messages - if literal_eval(message.decode('utf-8'))["link"] == link["link"] - ] - error_tasks = [tasks.pop(tasks.index(error_task)) for error_task in tasks if error_task["status"] == "error"] - # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на выполнение с очисткой состояние об ошибке - if len(error_tasks) > 0: - return JSONResponse({"result": f"STATUS: ERROR {error_tasks[-1]['result']}"}) - if len(tasks) > 0: - task = tasks[0] - await red.del_task_from_task_done_queue(task) - break - await asyncio.sleep(5) - except (AttributeError, IndexError): - await asyncio.sleep(5) - continue - link_to_download_video = str(request.base_url) + "get/?file_path=" + task["result"] - - # TODO: возможно возвращать идентификаторы задач aka куски ссылок - return JSONResponse({"result": link_to_download_video}) + logger.info(f" [x] Sent '{link}'") + # TODO: возможно возвращать идентификаторы задач aka куски ссылок + return JSONResponse(status_code=200, content={"result": f"Задача поставлена в работу, ссылка: {link['link']}"}) + # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на + # выполнение с очисткой состояние об ошибке @app.get('/get/', response_class=FileResponse, status_code=200) @@ -133,4 +130,46 @@ async def download_video(file_path): return StreamingResponse(iterfile(), media_type="video/mp4") -uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info") +@app.get('/check/', response_class=FileResponse, status_code=200) +async def download_video(request: Request, link: str): + try: + red = RedisClient() + messages_task_done = await red.get_task_done_queue() + messages_tasks = await red.get_tasks() + + tasks_done = [ + literal_eval(message.decode('utf-8')) for message in messages_task_done + if literal_eval(message.decode('utf-8'))["link"] == link + ] + tasks = [ + literal_eval(message.decode('utf-8')) for message in messages_tasks + if literal_eval(message.decode('utf-8'))["link"] == link + ] + + error_tasks = [ + tasks_done.pop(tasks_done.index(error_task)) for error_task in tasks_done if error_task["status"] == "error" + ] + if len(tasks) > 0: + task = tasks[0] + return JSONResponse( + status_code=202, + content={"result": f"Задача {task['link']} в данный момент в работе, выполняется"} + ) + # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на выполнение с очисткой состояние об ошибке + if len(error_tasks) > 0: + error_task = error_tasks[0] + await red.del_task_from_task_done_queue(error_task) + return JSONResponse(status_code=510, + content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"}) + if len(tasks_done) > 0: + link_to_download_video = str(request.base_url) + "get/?file_path=" + tasks_done[0]["result"] + return JSONResponse({"result": link_to_download_video}) + + except (AttributeError, IndexError): + return JSONResponse(status_code=404, content={"result": "Задача не найдена"}) + except Exception as ex: + print(ex) + + +if __name__ == '__main__': + uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info") diff --git a/src/web/schemes/submit.py b/src/web/schemes/submit.py index 0915935..1ee470b 100644 --- a/src/web/schemes/submit.py +++ b/src/web/schemes/submit.py @@ -9,6 +9,7 @@ vext: Video Extension (mp4 > mov > webm > flv > other). If --prefer-free-formats aext: Audio Extension (m4a > aac > mp3 > ogg > opus > webm > other). If --prefer-free-formats is used, the order changes to ogg > opus > webm > mp3 > m4a > aac ''' + class VideoFormatEnum(Enum): format_3gp = "3gp" format_flv = "flv" @@ -16,6 +17,7 @@ class VideoFormatEnum(Enum): format_mov = "mov" format_webm = "webm" + class AudioFormatEnum(Enum): format_aac = "aac" format_m4a = "m4a" @@ -34,6 +36,7 @@ class MergeOutputFormatEnum(Enum): format_mp4 = "mp4" format_webm = "webm" + @dataclass class SubmitIn: link: str = Form(...) diff --git a/src/web/templates/index.html b/src/web/templates/index.html index fdc8d7a..32a7ef3 100644 --- a/src/web/templates/index.html +++ b/src/web/templates/index.html @@ -71,7 +71,7 @@
- + @@ -87,10 +87,39 @@ - \ No newline at end of file + + +