From d22f976d2f6a2ca68131984f5e1c8b2adaec858a Mon Sep 17 00:00:00 2001
From: Dantenerosas
Date: Tue, 21 Nov 2023 22:21:00 +0300
Subject: [PATCH] added s3_client, refactored web and master services
---
poetry.lock | 16 ++++++++-
pyproject.toml | 1 +
src/core/config.py | 13 +++++++
src/core/master_service.py | 10 ++++--
src/core/redis_client.py | 4 +++
src/core/s3_client.py | 66 +++++++++++++++++++++++++++++++++++
src/core/uploader.py | 32 -----------------
src/parsers/Okru/ok_parser.py | 15 ++++++--
src/parsers/base_parser.py | 13 ++++---
src/web/main.py | 56 ++++++++++++++++++++---------
src/web/schemes/submit.py | 15 ++++++--
src/web/templates/index.html | 12 +++----
12 files changed, 186 insertions(+), 67 deletions(-)
create mode 100644 src/core/config.py
create mode 100644 src/core/s3_client.py
delete mode 100644 src/core/uploader.py
diff --git a/poetry.lock b/poetry.lock
index bc6e8d3..6af8923 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1680,6 +1680,20 @@ files = [
[package.dependencies]
six = ">=1.5"
+[[package]]
+name = "python-dotenv"
+version = "1.0.0"
+description = "Read key-value pairs from a .env file and set them as environment variables"
+optional = false
+python-versions = ">=3.8"
+files = [
+ {file = "python-dotenv-1.0.0.tar.gz", hash = "sha256:a8df96034aae6d2d50a4ebe8216326c61c3eb64836776504fcca410e5937a3ba"},
+ {file = "python_dotenv-1.0.0-py3-none-any.whl", hash = "sha256:f5971a9226b701070a4bf2c38c89e5a3f0d64de8debda981d1db98583009122a"},
+]
+
+[package.extras]
+cli = ["click (>=5.0)"]
+
[[package]]
name = "python-multipart"
version = "0.0.6"
@@ -2327,4 +2341,4 @@ websockets = "*"
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
-content-hash = "272fe31fba150b0b0fcca1b7d60f706dc2a05ea730ef19e34ccb8e5524f47d66"
+content-hash = "b7973dc522b312b75a798bc966c5001b24e134cccd332de7b978e5a1ec495b57"
diff --git a/pyproject.toml b/pyproject.toml
index a9365e7..64dac5c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,6 +36,7 @@ ply = "3.11"
ruamel-yaml = "0.17.21"
flask-login = "0.6.2"
pycryptodome = "3.18.0"
+python-dotenv = "^1.0.0"
[build-system]
diff --git a/src/core/config.py b/src/core/config.py
new file mode 100644
index 0000000..fb36a12
--- /dev/null
+++ b/src/core/config.py
@@ -0,0 +1,13 @@
+import os
+
+from dotenv import load_dotenv
+
+
+load_dotenv()
+
+
+S3_HOST = os.environ.get("S3_HOST", "s3-api.grfc.ru")
+S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "cl-i-oculus-dev1")
+S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM")
+S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "clean-internet-oculus-integration-dev")
+DEFAULT_DURATION = os.environ.get("DEFAULT_DURATION", 600)
diff --git a/src/core/master_service.py b/src/core/master_service.py
index 6419ac8..ca54d3d 100644
--- a/src/core/master_service.py
+++ b/src/core/master_service.py
@@ -1,6 +1,7 @@
import asyncio
import concurrent.futures as pool
import json
+import os
import subprocess
import traceback
@@ -12,6 +13,7 @@ from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages, publish_message_with_task_done
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
+from src.core.s3_client import S3Client
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Yappy.yappy_parser import YappyParser
@@ -50,7 +52,10 @@ class MasterService:
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
- async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict):
+ async def result_processing(self, result: Result | list, redis: RedisClient, s3_client: S3Client, video_params: dict):
+ file_path = os.path.join(os.getcwd() + "/downloads/")
+ links_to_download = s3_client.upload(file_name=result.value["result"], file_path=file_path)
+ result.value["result"] = links_to_download
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"])
await publish_message_with_task_done(task=result.value)
self.queue.task_done()
@@ -59,6 +64,7 @@ class MasterService:
while True:
video_params = await self.queue.get()
redis = RedisClient()
+ s3_client = S3Client()
await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params)
self.currently_underway[video_params['link']] = video_params
@@ -67,7 +73,7 @@ class MasterService:
))
result: Result = await download_task
- await self.result_processing(result, redis, video_params)
+ await self.result_processing(result, redis, s3_client, video_params)
if video_params['link'] in self.currently_underway:
del self.currently_underway[video_params['link']]
diff --git a/src/core/redis_client.py b/src/core/redis_client.py
index c304de8..adb17c1 100644
--- a/src/core/redis_client.py
+++ b/src/core/redis_client.py
@@ -46,3 +46,7 @@ class RedisClient:
async with self.connection as connection:
res = await connection.delete(self.TASKS_NAME)
return res
+
+ async def update_task_in_tasks_done(self, task: dict | list, link: str) -> int:
+ await self._del_task(self.TASKS_DONE_NAME, link)
+ return await self._set_task(self.TASKS_DONE_NAME, link, task)
diff --git a/src/core/s3_client.py b/src/core/s3_client.py
new file mode 100644
index 0000000..21ffe95
--- /dev/null
+++ b/src/core/s3_client.py
@@ -0,0 +1,66 @@
+from loguru import logger
+from minio import Minio
+from minio.commonconfig import CopySource
+
+from src.core.config import S3_HOST, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET_NAME
+
+
+class S3Client:
+ HOST = S3_HOST
+ ACCESS_KEY = S3_ACCESS_KEY
+ SECRET_KEY = S3_SECRET_KEY
+ BUCKET_NAME = S3_BUCKET_NAME
+
+ def __init__(self):
+ self.client = Minio(
+ self.HOST,
+ access_key=self.ACCESS_KEY,
+ secret_key=self.SECRET_KEY,
+ secure=True
+ )
+
+ def _make_sure_bucket_exist(self):
+ found = self.client.bucket_exists(self.BUCKET_NAME)
+ if not found:
+ self.client.make_bucket(self.BUCKET_NAME)
+ else:
+ logger.info(f"Bucket {self.BUCKET_NAME} already exists")
+
+ def upload(self, file_name: str, file_path: str | list[str]):
+ self._make_sure_bucket_exist()
+ if isinstance(file_name, str):
+ file_path = file_path + file_name
+ self.client.fput_object(self.BUCKET_NAME, file_name, file_path)
+ logger.info(f"{file_path} is successfully uploaded as object {file_name} to bucket {self.BUCKET_NAME}.")
+ link_to_download = self.get(file_name)
+ return link_to_download
+ else:
+ result = []
+ for file_name_part in file_name:
+ current_file_path = file_path + file_name_part
+ self.client.fput_object(self.BUCKET_NAME, file_name_part, current_file_path)
+ logger.info(f"{current_file_path} is successfully uploaded as object {file_name_part} to bucket {self.BUCKET_NAME}.")
+ link_to_download = self.get(file_name_part)
+ result.append(link_to_download)
+ return result
+
+ def get(self, file_name: str):
+ self._make_sure_bucket_exist()
+ result = self.client.get_presigned_url(
+ "GET",
+ self.BUCKET_NAME,
+ file_name,
+ )
+ return result
+
+ def delete(self, file_name: str):
+ result = self.client.remove_object(self.BUCKET_NAME, file_name)
+ return result
+
+ def copy_to_another_bucket(self, file_name: str, bucket_name):
+ result = self.client.copy_object(
+ bucket_name,
+ file_name,
+ CopySource(self.BUCKET_NAME, file_name),
+ )
+ return result
diff --git a/src/core/uploader.py b/src/core/uploader.py
deleted file mode 100644
index fe05fc6..0000000
--- a/src/core/uploader.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from minio import Minio
-from minio.error import S3Error
-
-
-def main():
- client = Minio(
- "grfc.ru",
- access_key="cl-i-oculus-dev1",
- secret_key="Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM",
- secure=True
- )
-
- found = client.bucket_exists("clean-internet-oculus-integration-dev")
- if not found:
- client.make_bucket("clean-internet-oculus-integration-dev")
- else:
- print("Bucket 'clean-internet-oculus-integration-dev' already exists")
-
- client.fput_object(
- "clean-internet-oculus-integration-dev", "4uv2GNc_ybc_1080p.mp4", "/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4",
- )
- print(
- "'/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4' is successfully uploaded as "
- "object '4uv2GNc_ybc_1080p.mp4' to bucket 'clean-internet-oculus-integration-dev'."
- )
-
-
-if __name__ == "__main__":
- try:
- main()
- except S3Error as exc:
- print("error occurred.", exc)
\ No newline at end of file
diff --git a/src/parsers/Okru/ok_parser.py b/src/parsers/Okru/ok_parser.py
index 1697ec6..9973b0f 100644
--- a/src/parsers/Okru/ok_parser.py
+++ b/src/parsers/Okru/ok_parser.py
@@ -1,8 +1,10 @@
import os
+import re
import requests
from bs4 import BeautifulSoup
+from lxml import etree
from src.exceptions.download_exceptions import FileAlreadyExistException
from src.parsers.base_parser import BaseParser
@@ -16,9 +18,16 @@ class OkParser(BaseParser):
resp = requests.get(self.params["link"])
resp.encoding = self.BASE_ENCODING
soup = BeautifulSoup(resp.text, 'lxml')
- required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
- video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
- links = [video_tag.find('a').get("href") for video_tag in video_tags]
+ if "topic" in self.params["link"]:
+ dom = etree.HTML(str(soup))
+ elements_with_video_id = dom.xpath(
+ "//div[@class='mlr_cnt']/div[contains(@data-l, 'gA,VIDEO,mB,movie,ti,')]/div[@class='vid-card "
+ "vid-card__xl']/div[@class='video-card_n-w']/a[contains(@onclick, 'OK.VideoPlayer.openMovie')]")
+ links = ["https://ok.ru/video/" + re.findall('\d+', elem.get("onclick"))[0] for elem in elements_with_video_id]
+ else:
+ required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
+ video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
+ links = [video_tag.find('a').get("href") for video_tag in video_tags]
return links
except Exception as ex:
raise
diff --git a/src/parsers/base_parser.py b/src/parsers/base_parser.py
index 1adca12..0f9417f 100644
--- a/src/parsers/base_parser.py
+++ b/src/parsers/base_parser.py
@@ -2,7 +2,9 @@ import errno
import os
from loguru import logger
+from yt_dlp import download_range_func
+from src.core.config import DEFAULT_DURATION
from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import FileAlreadyExistException
@@ -20,6 +22,7 @@ class BaseParser:
"logger": logger,
"merge_output_format": self.params["merge_output_format"],
'outtmpl': self.params["outtmpl"],
+ 'download_ranges': download_range_func(None, [(0, int(DEFAULT_DURATION))]),
# "quiet": True
}
downloader = VideoDownloader(link=self.params["link"], ydl_opts=ydl_opts)
@@ -29,14 +32,16 @@ class BaseParser:
resolution = downloader.info['resolution']
else:
resolution = "NA"
+
+ base_file_name = f"{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
if "Yahoo" in ydl_opts["outtmpl"]["default"]:
- path_to_video = f"Yahoo/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
+ path_to_video = f"Yahoo/{base_file_name}"
elif "ZenYandex" in ydl_opts["outtmpl"]["default"]:
- path_to_video = f"ZenYandex/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
+ path_to_video = f"ZenYandex/{base_file_name}"
elif "Bing" in ydl_opts["outtmpl"]["default"]:
- path_to_video = f"Bing/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
+ path_to_video = f"Bing/{base_file_name}"
else:
- path_to_video = f"{downloader.info['extractor_key']}/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
+ path_to_video = f"{downloader.info['extractor_key']}/{base_file_name}"
if os.path.exists(os.path.join(os.getcwd() + "/downloads/" + path_to_video)):
raise FileAlreadyExistException(message=path_to_video)
downloader.ydl_opts["quiet"] = False
diff --git a/src/web/main.py b/src/web/main.py
index abb62d5..161266c 100644
--- a/src/web/main.py
+++ b/src/web/main.py
@@ -1,16 +1,16 @@
import json
-import os
import uvicorn
import logging
from aio_pika import connect, Message, DeliveryMode
from fastapi import FastAPI, Request, Depends
from starlette.middleware.cors import CORSMiddleware
-from starlette.responses import JSONResponse, FileResponse, StreamingResponse
+from starlette.responses import JSONResponse, FileResponse, Response
from starlette.templating import Jinja2Templates
from src.core.redis_client import RedisClient
-from src.web.schemes.submit import SubmitIn, CheckIn
+from src.core.s3_client import S3Client
+from src.web.schemes.submit import SubmitIn, CheckIn, DeleteFromS3, CopyToAnotherBucketS3
app = FastAPI(
title="video_downloader", openapi_url=f"/api/v1/openapi.json"
@@ -82,6 +82,7 @@ async def index(request: Request):
@app.post('/submit')
async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()):
red = RedisClient()
+ s3_client = S3Client()
task_done = await is_task_already_done_or_exist(red, data.link)
# TODO: где-то не обновился статус после выполнения\провала задачи
task_in_process = await is_task_in_process(red, data.link)
@@ -89,9 +90,15 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
return JSONResponse(status_code=202, content={"result": "Задача в работе. Ожидайте"})
if task_done:
if isinstance(task_done["result"], str):
- links_to_download_video = [str(request.base_url) + "get/?file_path=" + task_done["result"]]
+ file_name = task_done["result"][task_done["result"].index("dev/") + 4:task_done["result"].index("?")]
+ links_to_download_video = [s3_client.get(file_name)]
+ task_done["result"] = links_to_download_video[0]
else:
- links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in task_done["result"]]
+ file_names = [task_done_part[task_done_part.index("dev/") + 4:task_done_part.index("?")] for
+ task_done_part in task_done["result"]]
+ links_to_download_video = [s3_client.get(file_name) for file_name in file_names]
+ task_done["result"] = links_to_download_video
+ await red.update_task_in_tasks_done(task_done, task_done["link"])
return JSONResponse({"result": links_to_download_video})
# TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач
@@ -104,6 +111,7 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
"format": f"bv[width={data.resolution.value}][ext={data.video_format.value}]+ba[ext={data.audio_format.value}]/"
f"bv[width={data.resolution.value}][ext=mp4]+ba[ext=m4a]/"
f"bv[width={data.resolution.value}][ext=webm]+ba[ext=webm]/"
+ f"best[width={data.resolution.value}]/"
f"best[ext={data.video_format.value}]/"
f"best[ext!=unknown_video]",
"merge_output_format": data.merge_output_format.value,
@@ -132,15 +140,10 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
@app.get('/get/', response_class=FileResponse, status_code=200)
async def download_video(file_path):
- base = os.path.dirname(os.path.dirname(os.path.abspath(file_path)))
- base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads")
-
- def iterfile():
- with open(base_download_dir + f'/{file_path}', mode="rb") as file_like:
- yield from file_like
-
- return StreamingResponse(iterfile(), headers={'Content-Disposition': f'inline; filename="{file_path}"'},
- media_type="video")
+ s3_client = S3Client()
+ file_response = s3_client.get(file_path)
+ return Response(content=file_response.data, headers={'Content-Disposition': f'inline; filename="{file_path}"'},
+ media_type="video")
@app.post('/check/', response_class=FileResponse, status_code=200)
@@ -168,9 +171,9 @@ async def download_video(data: CheckIn, request: Request):
content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"})
if tasks_done and data.link in tasks_done:
if isinstance(tasks_done[data.link]["result"], str):
- links_to_download_video = [str(request.base_url) + "get/?file_path=" + tasks_done[data.link]["result"]]
+ links_to_download_video = [tasks_done[data.link]["result"]]
else:
- links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in
+ links_to_download_video = [link for link in
tasks_done[data.link]["result"]]
return JSONResponse({"result": links_to_download_video})
return JSONResponse(status_code=404, content={"result": "Задача не найдена"})
@@ -180,4 +183,25 @@ async def download_video(data: CheckIn, request: Request):
except Exception as ex:
print(ex)
+
+@app.delete('/from-s3/', status_code=200)
+async def delete_video_from_s3(delete_data: DeleteFromS3):
+ s3_client = S3Client()
+ s3_client.delete(delete_data.file_name)
+ return JSONResponse(
+ status_code=200,
+ content={"result": f"Файл {delete_data.file_name} успешно удален из корзины {s3_client.BUCKET_NAME}"}
+ )
+
+
+@app.post('/copy-to-another-bucket/', status_code=200)
+async def delete_video_from_s3(data: CopyToAnotherBucketS3):
+ s3_client = S3Client()
+ s3_client.copy_to_another_bucket(data.file_name, data.bucket_name)
+ return JSONResponse(
+ status_code=200,
+ content={"result": f"Файл {data.file_name} успешно скопирован в корзину {data.bucket_name}"}
+ )
+
+
uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")
diff --git a/src/web/schemes/submit.py b/src/web/schemes/submit.py
index 629b69f..abc88d0 100644
--- a/src/web/schemes/submit.py
+++ b/src/web/schemes/submit.py
@@ -50,11 +50,20 @@ class MergeOutputFormatEnum(Enum):
@dataclass
class SubmitIn:
link: str = Form(...)
- video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_webm)
- audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_webm)
- resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_1080)
+ video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_mp4)
+ audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_m4a)
+ resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_720)
merge_output_format: MergeOutputFormatEnum = Form(default=MergeOutputFormatEnum.format_mkv)
class CheckIn(BaseModel):
link: str
+
+
+class DeleteFromS3(BaseModel):
+ file_name: str
+
+
+class CopyToAnotherBucketS3(BaseModel):
+ file_name: str
+ bucket_name: str
diff --git a/src/web/templates/index.html b/src/web/templates/index.html
index a965b0a..96ac597 100644
--- a/src/web/templates/index.html
+++ b/src/web/templates/index.html
@@ -77,9 +77,9 @@
-
+
-
+
Формат аудио
@@ -87,9 +87,9 @@
-
+
-
+
@@ -100,8 +100,8 @@
-
-
+
+