diff --git a/poetry.lock b/poetry.lock index bc6e8d3..6af8923 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1680,6 +1680,20 @@ files = [ [package.dependencies] six = ">=1.5" +[[package]] +name = "python-dotenv" +version = "1.0.0" +description = "Read key-value pairs from a .env file and set them as environment variables" +optional = false +python-versions = ">=3.8" +files = [ + {file = "python-dotenv-1.0.0.tar.gz", hash = "sha256:a8df96034aae6d2d50a4ebe8216326c61c3eb64836776504fcca410e5937a3ba"}, + {file = "python_dotenv-1.0.0-py3-none-any.whl", hash = "sha256:f5971a9226b701070a4bf2c38c89e5a3f0d64de8debda981d1db98583009122a"}, +] + +[package.extras] +cli = ["click (>=5.0)"] + [[package]] name = "python-multipart" version = "0.0.6" @@ -2327,4 +2341,4 @@ websockets = "*" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "272fe31fba150b0b0fcca1b7d60f706dc2a05ea730ef19e34ccb8e5524f47d66" +content-hash = "b7973dc522b312b75a798bc966c5001b24e134cccd332de7b978e5a1ec495b57" diff --git a/pyproject.toml b/pyproject.toml index a9365e7..64dac5c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ ply = "3.11" ruamel-yaml = "0.17.21" flask-login = "0.6.2" pycryptodome = "3.18.0" +python-dotenv = "^1.0.0" [build-system] diff --git a/src/core/config.py b/src/core/config.py new file mode 100644 index 0000000..fb36a12 --- /dev/null +++ b/src/core/config.py @@ -0,0 +1,13 @@ +import os + +from dotenv import load_dotenv + + +load_dotenv() + + +S3_HOST = os.environ.get("S3_HOST", "s3-api.grfc.ru") +S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY") +S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY") +S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "clean-internet-oculus-integration-dev") +DEFAULT_DURATION = int(os.environ.get("DEFAULT_DURATION", 600)) diff --git a/src/core/master_service.py b/src/core/master_service.py index 6419ac8..ca54d3d 100644 --- a/src/core/master_service.py +++ 
b/src/core/master_service.py @@ -1,6 +1,7 @@ import asyncio import concurrent.futures as pool import json +import os import subprocess import traceback @@ -12,6 +13,7 @@ from src.core.async_queue import AsyncQueue from src.core.rabbitmq import get_messages, publish_message_with_task_done from src.core.redis_client import RedisClient from src.core.result import Result, ResultTypeEnum +from src.core.s3_client import S3Client from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException from src.parsers.MyMail.my_mail_parser import MyMailParser from src.parsers.Yappy.yappy_parser import YappyParser @@ -50,7 +52,10 @@ class MasterService: await asyncio.gather(self.rabbit_consumer(self.queue), *tasks) - async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict): + async def result_processing(self, result: Result | list, redis: RedisClient, s3_client: S3Client, video_params: dict): + file_path = os.path.join(os.getcwd() + "/downloads/") + links_to_download = s3_client.upload(file_name=result.value["result"], file_path=file_path) + result.value["result"] = links_to_download await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"]) await publish_message_with_task_done(task=result.value) self.queue.task_done() @@ -59,6 +64,7 @@ class MasterService: while True: video_params = await self.queue.get() redis = RedisClient() + s3_client = S3Client() await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params) self.currently_underway[video_params['link']] = video_params @@ -67,7 +73,7 @@ class MasterService: )) result: Result = await download_task - await self.result_processing(result, redis, video_params) + await self.result_processing(result, redis, s3_client, video_params) if video_params['link'] in self.currently_underway: del self.currently_underway[video_params['link']] diff --git a/src/core/redis_client.py 
b/src/core/redis_client.py index c304de8..adb17c1 100644 --- a/src/core/redis_client.py +++ b/src/core/redis_client.py @@ -46,3 +46,7 @@ class RedisClient: async with self.connection as connection: res = await connection.delete(self.TASKS_NAME) return res + + async def update_task_in_tasks_done(self, task: dict | list, link: str) -> int: + await self._del_task(self.TASKS_DONE_NAME, link) + return await self._set_task(self.TASKS_DONE_NAME, link, task) diff --git a/src/core/s3_client.py b/src/core/s3_client.py new file mode 100644 index 0000000..21ffe95 --- /dev/null +++ b/src/core/s3_client.py @@ -0,0 +1,66 @@ +from loguru import logger +from minio import Minio +from minio.commonconfig import CopySource + +from src.core.config import S3_HOST, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET_NAME + + +class S3Client: + HOST = S3_HOST + ACCESS_KEY = S3_ACCESS_KEY + SECRET_KEY = S3_SECRET_KEY + BUCKET_NAME = S3_BUCKET_NAME + + def __init__(self): + self.client = Minio( + self.HOST, + access_key=self.ACCESS_KEY, + secret_key=self.SECRET_KEY, + secure=True + ) + + def _make_sure_bucket_exist(self): + found = self.client.bucket_exists(self.BUCKET_NAME) + if not found: + self.client.make_bucket(self.BUCKET_NAME) + else: + logger.info(f"Bucket {self.BUCKET_NAME} already exists") + + def upload(self, file_name: str | list[str], file_path: str): + self._make_sure_bucket_exist() + if isinstance(file_name, str): + file_path = file_path + file_name + self.client.fput_object(self.BUCKET_NAME, file_name, file_path) + logger.info(f"{file_path} is successfully uploaded as object {file_name} to bucket {self.BUCKET_NAME}.") + link_to_download = self.get(file_name) + return link_to_download + else: + result = [] + for file_name_part in file_name: + current_file_path = file_path + file_name_part + self.client.fput_object(self.BUCKET_NAME, file_name_part, current_file_path) + logger.info(f"{current_file_path} is successfully uploaded as object {file_name_part} to bucket {self.BUCKET_NAME}.") + 
link_to_download = self.get(file_name_part) + result.append(link_to_download) + return result + + def get(self, file_name: str): + self._make_sure_bucket_exist() + result = self.client.get_presigned_url( + "GET", + self.BUCKET_NAME, + file_name, + ) + return result + + def delete(self, file_name: str): + result = self.client.remove_object(self.BUCKET_NAME, file_name) + return result + + def copy_to_another_bucket(self, file_name: str, bucket_name): + result = self.client.copy_object( + bucket_name, + file_name, + CopySource(self.BUCKET_NAME, file_name), + ) + return result diff --git a/src/core/uploader.py b/src/core/uploader.py deleted file mode 100644 index fe05fc6..0000000 --- a/src/core/uploader.py +++ /dev/null @@ -1,32 +0,0 @@ -from minio import Minio -from minio.error import S3Error - - -def main(): - client = Minio( - "grfc.ru", - access_key="cl-i-oculus-dev1", - secret_key="Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM", - secure=True - ) - - found = client.bucket_exists("clean-internet-oculus-integration-dev") - if not found: - client.make_bucket("clean-internet-oculus-integration-dev") - else: - print("Bucket 'clean-internet-oculus-integration-dev' already exists") - - client.fput_object( - "clean-internet-oculus-integration-dev", "4uv2GNc_ybc_1080p.mp4", "/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4", - ) - print( - "'/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4' is successfully uploaded as " - "object '4uv2GNc_ybc_1080p.mp4' to bucket 'clean-internet-oculus-integration-dev'." 
- ) - - -if __name__ == "__main__": - try: - main() - except S3Error as exc: - print("error occurred.", exc) \ No newline at end of file diff --git a/src/parsers/Okru/ok_parser.py b/src/parsers/Okru/ok_parser.py index 1697ec6..9973b0f 100644 --- a/src/parsers/Okru/ok_parser.py +++ b/src/parsers/Okru/ok_parser.py @@ -1,8 +1,10 @@ import os +import re import requests from bs4 import BeautifulSoup +from lxml import etree from src.exceptions.download_exceptions import FileAlreadyExistException from src.parsers.base_parser import BaseParser @@ -16,9 +18,16 @@ class OkParser(BaseParser): resp = requests.get(self.params["link"]) resp.encoding = self.BASE_ENCODING soup = BeautifulSoup(resp.text, 'lxml') - required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0] - video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"}) - links = [video_tag.find('a').get("href") for video_tag in video_tags] + if "topic" in self.params["link"]: + dom = etree.HTML(str(soup)) + elements_with_video_id = dom.xpath( + "//div[@class='mlr_cnt']/div[contains(@data-l, 'gA,VIDEO,mB,movie,ti,')]/div[@class='vid-card " + "vid-card__xl']/div[@class='video-card_n-w']/a[contains(@onclick, 'OK.VideoPlayer.openMovie')]") + links = ["https://ok.ru/video/" + re.findall(r'\d+', elem.get("onclick"))[0] for elem in elements_with_video_id] + else: + required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0] + video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"}) + links = [video_tag.find('a').get("href") for video_tag in video_tags] return links except Exception as ex: raise diff --git a/src/parsers/base_parser.py b/src/parsers/base_parser.py index 1adca12..0f9417f 100644 --- a/src/parsers/base_parser.py +++ b/src/parsers/base_parser.py @@ -2,7 +2,9 @@ import errno import os from loguru import logger +from yt_dlp import download_range_func +from src.core.config 
import DEFAULT_DURATION from src.core.ydl import VideoDownloader from src.exceptions.download_exceptions import FileAlreadyExistException @@ -20,6 +22,7 @@ class BaseParser: "logger": logger, "merge_output_format": self.params["merge_output_format"], 'outtmpl': self.params["outtmpl"], + 'download_ranges': download_range_func(None, [(0, int(DEFAULT_DURATION))]), # "quiet": True } downloader = VideoDownloader(link=self.params["link"], ydl_opts=ydl_opts) @@ -29,14 +32,16 @@ class BaseParser: resolution = downloader.info['resolution'] else: resolution = "NA" + + base_file_name = f"{downloader.info['id']}_{resolution}.{downloader.info['ext']}" if "Yahoo" in ydl_opts["outtmpl"]["default"]: - path_to_video = f"Yahoo/{downloader.info['id']}_{resolution}.{downloader.info['ext']}" + path_to_video = f"Yahoo/{base_file_name}" elif "ZenYandex" in ydl_opts["outtmpl"]["default"]: - path_to_video = f"ZenYandex/{downloader.info['id']}_{resolution}.{downloader.info['ext']}" + path_to_video = f"ZenYandex/{base_file_name}" elif "Bing" in ydl_opts["outtmpl"]["default"]: - path_to_video = f"Bing/{downloader.info['id']}_{resolution}.{downloader.info['ext']}" + path_to_video = f"Bing/{base_file_name}" else: - path_to_video = f"{downloader.info['extractor_key']}/{downloader.info['id']}_{resolution}.{downloader.info['ext']}" + path_to_video = f"{downloader.info['extractor_key']}/{base_file_name}" if os.path.exists(os.path.join(os.getcwd() + "/downloads/" + path_to_video)): raise FileAlreadyExistException(message=path_to_video) downloader.ydl_opts["quiet"] = False diff --git a/src/web/main.py b/src/web/main.py index abb62d5..161266c 100644 --- a/src/web/main.py +++ b/src/web/main.py @@ -1,16 +1,16 @@ import json -import os import uvicorn import logging from aio_pika import connect, Message, DeliveryMode from fastapi import FastAPI, Request, Depends from starlette.middleware.cors import CORSMiddleware -from starlette.responses import JSONResponse, FileResponse, StreamingResponse +from 
starlette.responses import JSONResponse, FileResponse, Response from starlette.templating import Jinja2Templates from src.core.redis_client import RedisClient -from src.web.schemes.submit import SubmitIn, CheckIn +from src.core.s3_client import S3Client +from src.web.schemes.submit import SubmitIn, CheckIn, DeleteFromS3, CopyToAnotherBucketS3 app = FastAPI( title="video_downloader", openapi_url=f"/api/v1/openapi.json" @@ -82,6 +82,7 @@ async def index(request: Request): @app.post('/submit') async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()): red = RedisClient() + s3_client = S3Client() task_done = await is_task_already_done_or_exist(red, data.link) # TODO: где-то не обновился статус после выполнения\провала задачи task_in_process = await is_task_in_process(red, data.link) @@ -89,9 +90,15 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends( return JSONResponse(status_code=202, content={"result": "Задача в работе. Ожидайте"}) if task_done: if isinstance(task_done["result"], str): - links_to_download_video = [str(request.base_url) + "get/?file_path=" + task_done["result"]] + file_name = task_done["result"][task_done["result"].index("dev/") + 4:task_done["result"].index("?")] + links_to_download_video = [s3_client.get(file_name)] + task_done["result"] = links_to_download_video[0] else: - links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in task_done["result"]] + file_names = [task_done_part[task_done_part.index("dev/") + 4:task_done_part.index("?")] for + task_done_part in task_done["result"]] + links_to_download_video = [s3_client.get(file_name) for file_name in file_names] + task_done["result"] = links_to_download_video + await red.update_task_in_tasks_done(task_done, task_done["link"]) return JSONResponse({"result": links_to_download_video}) # TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач 
@@ -104,6 +111,7 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends( "format": f"bv[width={data.resolution.value}][ext={data.video_format.value}]+ba[ext={data.audio_format.value}]/" f"bv[width={data.resolution.value}][ext=mp4]+ba[ext=m4a]/" f"bv[width={data.resolution.value}][ext=webm]+ba[ext=webm]/" + f"best[width={data.resolution.value}]/" f"best[ext={data.video_format.value}]/" f"best[ext!=unknown_video]", "merge_output_format": data.merge_output_format.value, @@ -132,15 +140,10 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends( @app.get('/get/', response_class=FileResponse, status_code=200) async def download_video(file_path): - base = os.path.dirname(os.path.dirname(os.path.abspath(file_path))) - base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads") - - def iterfile(): - with open(base_download_dir + f'/{file_path}', mode="rb") as file_like: - yield from file_like - - return StreamingResponse(iterfile(), headers={'Content-Disposition': f'inline; filename="{file_path}"'}, - media_type="video") + s3_client = S3Client() + file_response = s3_client.get(file_path) + return Response(status_code=307, + headers={'Location': file_response}) @app.post('/check/', response_class=FileResponse, status_code=200) @@ -168,9 +171,9 @@ async def download_video(data: CheckIn, request: Request): if isinstance(tasks_done[data.link]["result"], str): - links_to_download_video = [str(request.base_url) + "get/?file_path=" + tasks_done[data.link]["result"]] + links_to_download_video = [tasks_done[data.link]["result"]] else: - links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in + links_to_download_video = [link for link in tasks_done[data.link]["result"]] return JSONResponse({"result": 
links_to_download_video}) return JSONResponse(status_code=404, content={"result": "Задача не найдена"}) @@ -180,4 +183,25 @@ async def download_video(data: CheckIn, request: Request): except Exception as ex: print(ex) + +@app.delete('/from-s3/', status_code=200) +async def delete_video_from_s3(delete_data: DeleteFromS3): + s3_client = S3Client() + s3_client.delete(delete_data.file_name) + return JSONResponse( + status_code=200, + content={"result": f"Файл {delete_data.file_name} успешно удален из корзины {s3_client.BUCKET_NAME}"} + ) + + +@app.post('/copy-to-another-bucket/', status_code=200) +async def copy_video_to_another_bucket(data: CopyToAnotherBucketS3): + s3_client = S3Client() + s3_client.copy_to_another_bucket(data.file_name, data.bucket_name) + return JSONResponse( + status_code=200, + content={"result": f"Файл {data.file_name} успешно скопирован в корзину {data.bucket_name}"} + ) + + uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info") diff --git a/src/web/schemes/submit.py b/src/web/schemes/submit.py index 629b69f..abc88d0 100644 --- a/src/web/schemes/submit.py +++ b/src/web/schemes/submit.py @@ -50,11 +50,20 @@ class MergeOutputFormatEnum(Enum): @dataclass class SubmitIn: link: str = Form(...) 
- video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_webm) - audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_webm) - resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_1080) + video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_mp4) + audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_m4a) + resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_720) merge_output_format: MergeOutputFormatEnum = Form(default=MergeOutputFormatEnum.format_mkv) class CheckIn(BaseModel): link: str + + +class DeleteFromS3(BaseModel): + file_name: str + + +class CopyToAnotherBucketS3(BaseModel): + file_name: str + bucket_name: str diff --git a/src/web/templates/index.html b/src/web/templates/index.html index a965b0a..96ac597 100644 --- a/src/web/templates/index.html +++ b/src/web/templates/index.html @@ -77,9 +77,9 @@ - + - +
Формат аудио @@ -87,9 +87,9 @@ - + - + @@ -100,8 +100,8 @@ - - + +