diff --git a/src/core/async_queue.py b/src/core/async_queue.py index e90f47b..67284e1 100644 --- a/src/core/async_queue.py +++ b/src/core/async_queue.py @@ -8,5 +8,5 @@ redis = RedisClient() class AsyncQueue(asyncio.Queue): async def put(self, item): - await redis.set_task_to_queue(item) + await redis.set_task_to_queue(item["link"], item) return await super().put(item) diff --git a/src/core/master_service.py b/src/core/master_service.py index c0f6f13..162785b 100644 --- a/src/core/master_service.py +++ b/src/core/master_service.py @@ -12,12 +12,10 @@ from src.core.redis_client import RedisClient from src.core.result import Result, ResultTypeEnum from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException from src.parsers.MyMail.my_mail_parser import MyMailParser -from src.parsers.Okru.ok_parser import OkParser from src.parsers.Yappy.yappy_parser import YappyParser from src.parsers.base_parser import BaseParser from loguru import logger from src.parsers.parser_mapping import get_parser -# TODO: добавить логгер с временными метками в yt-dlp class MasterService: @@ -40,12 +38,17 @@ class MasterService: await asyncio.gather(self.rabbit_consumer(self.queue), *tasks) + async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict): + await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"]) + await publish_message_with_task_done(task=result.value) + self.queue.task_done() + async def create_workers(self): while True: video_params = await self.queue.get() redis = RedisClient() await redis.del_tasks_queue() - await redis.del_task_from_queue_and_add_to_tasks(task=video_params) + await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params) self.currently_underway[video_params['link']] = video_params download_task = self.loop.run_in_executor(self.executor, partial( @@ -53,18 +56,7 @@ class MasterService: )) result: Result = await download_task - if result.result_type in [ResultTypeEnum.DONE, ResultTypeEnum.EXIST]: - await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params) - await publish_message_with_task_done(task=result.value) - self.queue.task_done() - else: - error_message = { - "link": video_params["link"], - "result": result.value, - "status": "error" - } - await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params) - await publish_message_with_task_done(task=error_message) + await self.result_processing(result, redis, video_params) if video_params['link'] in self.currently_underway: del self.currently_underway[video_params['link']] @@ -79,7 +71,7 @@ class MasterService: def get_parser(params: dict): try: url_parse_result = urlparse(params["link"]) - uri = f"{url_parse_result.netloc}{url_parse_result.path}" + uri = f"{url_parse_result.netloc}{url_parse_result.path}" logger.info(uri) # # TODO: похоже нужно переделать на регулярки, т.к. добавлять каждую вариацию домена моветон, вероятно я сделаюне- # parser_mapping = { @@ -106,7 +98,13 @@ class MasterService: "link": video_params["link"], "result": result, "status": "done" - }) + }) if not isinstance(result, list)\ + else Result(result_type=ResultTypeEnum.DONE, value={ + "link": video_params["link"], + "result": [result_part for result_part in result], + "status": "done" + }) + except FileAlreadyExistException as ex: return Result(result_type=ResultTypeEnum.EXIST, value={ "link": video_params["link"], @@ -120,16 +118,11 @@ class MasterService: "status": "error" }) except Exception as ex: - Result(result_type=ResultTypeEnum.EXCEPTION, value={ + return Result(result_type=ResultTypeEnum.EXCEPTION, value={ "link": video_params["link"], "result": traceback.format_exc(), "status": "error" }) - return Result(result_type=ResultTypeEnum.EXCEPTION, value={ - "link": video_params["link"], - "result": "Unknown exception encountered", - "status": "error" - }) # TODO upload to server diff --git a/src/core/rabbitmq.py b/src/core/rabbitmq.py index d169ae7..d940e99 100644 --- a/src/core/rabbitmq.py +++ b/src/core/rabbitmq.py @@ -28,7 +28,7 @@ async def get_messages(inner_queue) -> None: await asyncio.Future() -async def publish_message_with_task_done(task: dict) -> None: +async def publish_message_with_task_done(task: dict | list) -> None: queue_name = "tasks_done" async with await connect("amqp://guest:guest@localhost/") as connection: # Creating channel diff --git a/src/core/redis_client.py b/src/core/redis_client.py index b1285ed..5a42fcc 100644 --- a/src/core/redis_client.py +++ b/src/core/redis_client.py @@ -7,61 +7,57 @@ class RedisClient: SET_NAME = "queue" TASKS_NAME = "tasks_working" TASKS_DONE_NAME = "tasks_done" - - # TODO: переписать всё вазимодействие редиса обратно на ключ-значение def __init__(self): self.connection = redis.Redis(host="localhost", port=6379, db=0) - async def _set_task(self, task: dict, queue_name) -> int: + async def _set_task(self, queue_name: str, link: str, task: dict | list, ) -> int: async with self.connection as connection: - res = await connection.sadd(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8')) + res = await connection.hset(queue_name, link, json.dumps(task, indent=4).encode('utf-8')) return res - async def _del_task(self, task: dict, queue_name) -> int: + async def _del_task(self, queue_name: str, link: str,) -> int: async with self.connection as connection: - res = await connection.srem(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8')) + res = await connection.hdel(queue_name, link) return res - async def set_task_to_queue(self, task: dict) -> int: - async with self.connection as connection: - res = await connection.sadd(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8')) + async def set_task_to_queue(self, link: str, task: dict | list) -> int: + res = await self._set_task(queue_name=self.SET_NAME, link=link, task=task) return res - async def get_queue(self) -> set: + async def get_all_tasks_from_queue(self, queue_name: str) -> dict: async with self.connection as connection: - res = await connection.smembers(self.SET_NAME + f":1") + res = await connection.hgetall(queue_name) return res - async def get_tasks(self) -> set: - async with self.connection as connection: - res = await connection.smembers(self.TASKS_NAME + f":1") - return res + # async def get_tasks(self) -> set: + # async with self.connection as connection: + # res = await connection.smembers(self.TASKS_NAME + f":1") + # return res - async def get_task_done_queue(self) -> set: - async with self.connection as connection: - res = await connection.smembers(self.TASKS_DONE_NAME + f":1") - return res + # async def get_task_done_queue(self) -> set: + # async with self.connection as connection: + # res = await connection.smembers(self.TASKS_DONE_NAME + f":1") + # return res - async def del_task_from_queue(self, task: dict) -> int: - async with self.connection as connection: - res = await connection.srem(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8')) - return res + # async def del_task_from_queue(self, link, task: dict) -> int: + # async with self.connection as connection: + # res = await self._del_task(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8')) + # return res - async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int: - await self._del_task(task, self.SET_NAME) - return await self._set_task(task, self.TASKS_NAME) + async def del_task_from_queue_and_add_to_tasks(self, link: str, task: dict | list) -> int: + await self._del_task(self.SET_NAME, link) + return await self._set_task(self.TASKS_NAME, link, task) - async def del_task_from_tasks_and_add_to_task_done(self, task: dict, working_task: dict) -> int: - await self._del_task(working_task, self.TASKS_NAME) - return await self._set_task(task, self.TASKS_DONE_NAME) + async def del_task_from_tasks_and_add_to_task_done(self, task: dict | list, link: str) -> int: + await self._del_task(self.TASKS_NAME, link) + return await self._set_task(self.TASKS_DONE_NAME, link, task) async def del_task_from_task_done_queue(self, task) -> int: - async with self.connection as connection: - res = await connection.srem(self.TASKS_DONE_NAME + f":1", json.dumps(task, indent=4).encode('utf-8')) + res = await self._del_task(self.TASKS_DONE_NAME, task["link"]) return res async def del_tasks_queue(self) -> int: async with self.connection as connection: - res = await connection.delete(self.TASKS_NAME + f":1") + res = await connection.delete(self.TASKS_NAME) return res diff --git a/src/core/result.py b/src/core/result.py index 31f72e3..2ff9517 100644 --- a/src/core/result.py +++ b/src/core/result.py @@ -10,7 +10,7 @@ class ResultTypeEnum(Enum): class Result: - def __init__(self, result_type: ResultTypeEnum, value: str | dict = None): + def __init__(self, result_type: ResultTypeEnum, value: str | dict | list = None): self.result_type = result_type self.value = value diff --git a/src/parsers/Okru/ok_parser.py b/src/parsers/Okru/ok_parser.py index fd04576..1697ec6 100644 --- a/src/parsers/Okru/ok_parser.py +++ b/src/parsers/Okru/ok_parser.py @@ -4,6 +4,7 @@ import requests from bs4 import BeautifulSoup +from src.exceptions.download_exceptions import FileAlreadyExistException from src.parsers.base_parser import BaseParser @@ -16,12 +17,24 @@ class OkParser(BaseParser): resp.encoding = self.BASE_ENCODING soup = BeautifulSoup(resp.text, 'lxml') required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0] - link = required_div.find('span').find('span').find('a').get("href") - self.params["link"] = link - return link + video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"}) + links = [video_tag.find('a').get("href") for video_tag in video_tags] + return links except Exception as ex: raise def video_download(self): - self.get_video_link() - super().video_download() + base_link = self.params["link"] + links = self.get_video_link() + file_paths = [] + for link in links: + try: + self.params["link"] = link + file_path = super().video_download() + file_paths.append(file_path) + except FileAlreadyExistException as ex: + file_paths.append(ex.message) + continue + self.params["link"] = base_link + return file_paths + diff --git a/src/parsers/base_parser.py b/src/parsers/base_parser.py index fecd537..3c3755a 100644 --- a/src/parsers/base_parser.py +++ b/src/parsers/base_parser.py @@ -20,11 +20,11 @@ class BaseParser: "logger": logger, "merge_output_format": self.params["merge_output_format"], 'outtmpl': self.params["outtmpl"], - "quiet": True + # "quiet": True } downloader = VideoDownloader(link=self.params["link"], ydl_opts=ydl_opts) downloader.get_info() - path_to_video = f"{downloader.info['extractor_key']}/{downloader.info['id']}_{downloader.info['width']}p.{downloader.info['ext']}" + path_to_video = f"{downloader.info['extractor_key']}/{downloader.info['id']}_{downloader.info['resolution']}.{downloader.info['ext']}" if os.path.exists(os.path.join(os.getcwd() + "/downloads/" + path_to_video)): raise FileAlreadyExistException(message=path_to_video) downloader.ydl_opts["quiet"] = False diff --git a/src/parsers/parser_mapping.py b/src/parsers/parser_mapping.py index aa1a4b5..5178c69 100644 --- a/src/parsers/parser_mapping.py +++ b/src/parsers/parser_mapping.py @@ -6,12 +6,14 @@ from src.parsers.Okru.ok_parser import OkParser from src.parsers.Yappy.yappy_parser import YappyParser from src.parsers.base_parser import BaseParser + def compile_regex(regex): return re.compile(regex, re.IGNORECASE | re.DOTALL | re.MULTILINE) + parser_mapping = OrderedDict( { - compile_regex(r"^my.mail.ru/") : MyMailParser, + compile_regex(r"^my.mail.ru/"): MyMailParser, compile_regex(r"^(?:www.)?(?:youtube.com|youtu.be)/"): BaseParser, compile_regex(r"^vk.com/"): BaseParser, compile_regex(r"^ok.ru/okvideo/topic"): OkParser, @@ -24,7 +26,8 @@ parser_mapping = OrderedDict( } ) + def get_parser(uri): for regex in parser_mapping: if regex.match(uri): - return parser_mapping[regex] \ No newline at end of file + return parser_mapping[regex] diff --git a/src/web/main.py b/src/web/main.py index fa57ce6..6bef8c0 100644 --- a/src/web/main.py +++ b/src/web/main.py @@ -27,7 +27,6 @@ app.add_middleware( allow_headers=["*"], ) - ''' await self.app(scope, receive, send) File "/home/admin/video_downloader_service/.venv/lib/python3.10/site-packages/starlette/routing.py", line 66, in app @@ -61,32 +60,19 @@ queue_name -> { async def is_task_already_done_or_exist(redis: RedisClient, link: str): - messages = await redis.get_task_done_queue() - temp = [json.loads(msg) for msg in messages] - tasks = [ - msg for msg in temp - if msg["link"] == link - and msg["status"] in ["done", "exist"] - ] + messages = await redis.get_all_tasks_from_queue(redis.TASKS_DONE_NAME) + tasks = {k.decode("utf-8"): json.loads(v.decode("utf-8")) for k, v in messages.items()} if messages else None - if len(tasks) > 0: - task = tasks[0] - if os.path.exists(os.path.join(os.getcwd(), os.pardir, os.pardir + "/downloads/" + task["result"])): - return task - await redis.del_task_from_task_done_queue(task) + if tasks and link in tasks and tasks[link]["status"] in ["done", "exist"]: + return tasks[link] async def is_task_in_process(redis: RedisClient, link: str): - messages = await redis.get_tasks() + messages = await redis.get_all_tasks_from_queue(redis.TASKS_NAME) + tasks = {k.decode("utf-8"): json.loads(v.decode("utf-8")) for k, v in messages.items()} if messages else None - tasks = [ - literal_eval(message.decode('utf-8')) for message in messages - if literal_eval(message.decode('utf-8'))["link"] == link - ] - - if len(tasks) > 0: - task = tasks[0] - return task + if tasks and link in tasks: + return tasks[link] @app.get("/") @@ -101,10 +87,13 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends( # TODO: где-то не обновился статус после выполнения\провала задачи task_in_process = await is_task_in_process(red, data.link) if task_in_process: - return JSONResponse({"result": "Задача в работе. Ожидайте"}) + return JSONResponse(status_code=202, content={"result": "Задача в работе. Ожидайте"}) if task_done: - link_to_download_video = str(request.base_url) + "get/?file_path=" + task_done["result"] - return JSONResponse({"result": link_to_download_video}) + if isinstance(task_done["result"], str): + links_to_download_video = [str(request.base_url) + "get/?file_path=" + task_done["result"]] + else: + links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in task_done["result"]] + return JSONResponse({"result": links_to_download_video}) # TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач async with await connect("amqp://guest:guest@localhost/") as connection: @@ -115,7 +104,7 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends( "link": data.link, "format": f"bestvideo[ext={data.video_format.value}]+bestaudio[ext={data.audio_format.value}]/best[ext={data.video_format.value}]/best", "merge_output_format": data.merge_output_format.value, - "outtmpl": f"downloads/%(extractor_key)s/%(id)s_%(width)sp.%(ext)s", + "outtmpl": f"downloads/%(extractor_key)s/%(id)s_%(resolution)s.%(ext)s", }, ] # Sending the message for link in body: @@ -147,44 +136,41 @@ async def download_video(file_path): with open(base_download_dir + f'/{file_path}', mode="rb") as file_like: yield from file_like - return StreamingResponse(iterfile(), headers={'Content-Disposition': f'inline; filename="{file_path}"'}, media_type="video") + return StreamingResponse(iterfile(), headers={'Content-Disposition': f'inline; filename="{file_path}"'}, + media_type="video") @app.post('/check/', response_class=FileResponse, status_code=200) async def download_video(data: CheckIn, request: Request): try: red = RedisClient() + messages_task_done = await red.get_all_tasks_from_queue(red.TASKS_DONE_NAME) + messages_tasks = await red.get_all_tasks_from_queue(red.TASKS_NAME) - messages_task_done = await red.get_task_done_queue() - messages_tasks = await red.get_tasks() + tasks_done = {k.decode("utf-8"): json.loads(v.decode("utf-8")) for k, v in + messages_task_done.items()} if messages_task_done else None - tasks_done = [ - literal_eval(message.decode('utf-8')) for message in messages_task_done - if literal_eval(message.decode('utf-8'))["link"] == data.link - ] - tasks = [ - literal_eval(message.decode('utf-8')) for message in messages_tasks - if literal_eval(message.decode('utf-8'))["link"] == data.link - ] + tasks = {k.decode("utf-8"): json.loads(v.decode("utf-8")) for k, v in + messages_tasks.items()} if messages_tasks else None - error_tasks = [ - tasks_done.pop(tasks_done.index(error_task)) for error_task in tasks_done if error_task["status"] == "error" - ] if tasks_done else None - if tasks and len(tasks) > 0: - task = tasks[0] + if tasks and data.link in tasks: return JSONResponse( status_code=202, - content={"result": f"Задача {task['link']} в данный момент в работе, выполняется"} + content={"result": f"Задача {data.link} в данный момент в работе, выполняется"} ) # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на выполнение с очисткой состояние об ошибке - if error_tasks and len(error_tasks) > 0: - error_task = error_tasks[0] - await red.del_task_from_task_done_queue(error_task) + if data.link in tasks_done and tasks_done[data.link]["status"] == "error": + await red.del_task_from_task_done_queue(tasks_done[data.link]) return JSONResponse(status_code=510, content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"}) - if len(tasks_done) > 0: - link_to_download_video = str(request.base_url) + "get/?file_path=" + tasks_done[0]["result"] - return JSONResponse({"result": link_to_download_video}) + if tasks_done and data.link in tasks_done: + if isinstance(tasks_done[data.link]["result"], str): + links_to_download_video = [str(request.base_url) + "get/?file_path=" + tasks_done[data.link]["result"]] + else: + links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in + tasks_done[data.link]["result"]] + return JSONResponse({"result": links_to_download_video}) + return JSONResponse(status_code=404, content={"result": "Задача не найдена"}) except (AttributeError, IndexError): return JSONResponse(status_code=404, content={"result": "Задача не найдена"}) diff --git a/src/web/templates/index.html b/src/web/templates/index.html index 2c36999..9bd7eb0 100644 --- a/src/web/templates/index.html +++ b/src/web/templates/index.html @@ -77,7 +77,7 @@ -
+