added s3_client, refactored web and master services

This commit is contained in:
garickbadalov 2023-11-21 22:21:00 +03:00
parent 0b68675504
commit f1fdcf8fb4
12 changed files with 186 additions and 67 deletions

16
poetry.lock generated
View File

@ -1680,6 +1680,20 @@ files = [
[package.dependencies]
six = ">=1.5"
[[package]]
name = "python-dotenv"
version = "1.0.0"
description = "Read key-value pairs from a .env file and set them as environment variables"
optional = false
python-versions = ">=3.8"
files = [
{file = "python-dotenv-1.0.0.tar.gz", hash = "sha256:a8df96034aae6d2d50a4ebe8216326c61c3eb64836776504fcca410e5937a3ba"},
{file = "python_dotenv-1.0.0-py3-none-any.whl", hash = "sha256:f5971a9226b701070a4bf2c38c89e5a3f0d64de8debda981d1db98583009122a"},
]
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "python-multipart"
version = "0.0.6"
@ -2327,4 +2341,4 @@ websockets = "*"
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "272fe31fba150b0b0fcca1b7d60f706dc2a05ea730ef19e34ccb8e5524f47d66"
content-hash = "b7973dc522b312b75a798bc966c5001b24e134cccd332de7b978e5a1ec495b57"

View File

@ -36,6 +36,7 @@ ply = "3.11"
ruamel-yaml = "0.17.21"
flask-login = "0.6.2"
pycryptodome = "3.18.0"
python-dotenv = "^1.0.0"
[build-system]

13
src/core/config.py Normal file
View File

@ -0,0 +1,13 @@
import os
from dotenv import load_dotenv
load_dotenv()
S3_HOST = os.environ.get("S3_HOST", "s3-api.grfc.ru")
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "cl-i-oculus-dev1")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM")
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "clean-internet-oculus-integration-dev")
DEFAULT_DURATION = os.environ.get("DEFAULT_DURATION", 600)

View File

@ -1,6 +1,7 @@
import asyncio
import concurrent.futures as pool
import json
import os
import subprocess
import traceback
@ -12,6 +13,7 @@ from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages, publish_message_with_task_done
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.core.s3_client import S3Client
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Yappy.yappy_parser import YappyParser
@ -50,7 +52,10 @@ class MasterService:
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict):
async def result_processing(self, result: Result | list, redis: RedisClient, s3_client: S3Client, video_params: dict):
file_path = os.path.join(os.getcwd() + "/downloads/")
links_to_download = s3_client.upload(file_name=result.value["result"], file_path=file_path)
result.value["result"] = links_to_download
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"])
await publish_message_with_task_done(task=result.value)
self.queue.task_done()
@ -59,6 +64,7 @@ class MasterService:
while True:
video_params = await self.queue.get()
redis = RedisClient()
s3_client = S3Client()
await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params)
self.currently_underway[video_params['link']] = video_params
@ -67,7 +73,7 @@ class MasterService:
))
result: Result = await download_task
await self.result_processing(result, redis, video_params)
await self.result_processing(result, redis, s3_client, video_params)
if video_params['link'] in self.currently_underway:
del self.currently_underway[video_params['link']]

View File

@ -46,3 +46,7 @@ class RedisClient:
async with self.connection as connection:
res = await connection.delete(self.TASKS_NAME)
return res
async def update_task_in_tasks_done(self, task: dict | list, link: str) -> int:
await self._del_task(self.TASKS_DONE_NAME, link)
return await self._set_task(self.TASKS_DONE_NAME, link, task)

66
src/core/s3_client.py Normal file
View File

@ -0,0 +1,66 @@
from loguru import logger
from minio import Minio
from minio.commonconfig import CopySource
from src.core.config import S3_HOST, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET_NAME
class S3Client:
HOST = S3_HOST
ACCESS_KEY = S3_ACCESS_KEY
SECRET_KEY = S3_SECRET_KEY
BUCKET_NAME = S3_BUCKET_NAME
def __init__(self):
self.client = Minio(
self.HOST,
access_key=self.ACCESS_KEY,
secret_key=self.SECRET_KEY,
secure=True
)
def _make_sure_bucket_exist(self):
found = self.client.bucket_exists(self.BUCKET_NAME)
if not found:
self.client.make_bucket(self.BUCKET_NAME)
else:
logger.info(f"Bucket {self.BUCKET_NAME} already exists")
def upload(self, file_name: str, file_path: str | list[str]):
self._make_sure_bucket_exist()
if isinstance(file_name, str):
file_path = file_path + file_name
self.client.fput_object(self.BUCKET_NAME, file_name, file_path)
logger.info(f"{file_path} is successfully uploaded as object {file_name} to bucket {self.BUCKET_NAME}.")
link_to_download = self.get(file_name)
return link_to_download
else:
result = []
for file_name_part in file_name:
current_file_path = file_path + file_name_part
self.client.fput_object(self.BUCKET_NAME, file_name_part, current_file_path)
logger.info(f"{current_file_path} is successfully uploaded as object {file_name_part} to bucket {self.BUCKET_NAME}.")
link_to_download = self.get(file_name_part)
result.append(link_to_download)
return result
def get(self, file_name: str):
self._make_sure_bucket_exist()
result = self.client.get_presigned_url(
"GET",
self.BUCKET_NAME,
file_name,
)
return result
def delete(self, file_name: str):
result = self.client.remove_object(self.BUCKET_NAME, file_name)
return result
def copy_to_another_bucket(self, file_name: str, bucket_name):
result = self.client.copy_object(
bucket_name,
file_name,
CopySource(self.BUCKET_NAME, file_name),
)
return result

View File

@ -1,32 +0,0 @@
from minio import Minio
from minio.error import S3Error
def main():
client = Minio(
"grfc.ru",
access_key="cl-i-oculus-dev1",
secret_key="Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM",
secure=True
)
found = client.bucket_exists("clean-internet-oculus-integration-dev")
if not found:
client.make_bucket("clean-internet-oculus-integration-dev")
else:
print("Bucket 'clean-internet-oculus-integration-dev' already exists")
client.fput_object(
"clean-internet-oculus-integration-dev", "4uv2GNc_ybc_1080p.mp4", "/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4",
)
print(
"'/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4' is successfully uploaded as "
"object '4uv2GNc_ybc_1080p.mp4' to bucket 'clean-internet-oculus-integration-dev'."
)
if __name__ == "__main__":
try:
main()
except S3Error as exc:
print("error occurred.", exc)

View File

@ -1,8 +1,10 @@
import os
import re
import requests
from bs4 import BeautifulSoup
from lxml import etree
from src.exceptions.download_exceptions import FileAlreadyExistException
from src.parsers.base_parser import BaseParser
@ -16,9 +18,16 @@ class OkParser(BaseParser):
resp = requests.get(self.params["link"])
resp.encoding = self.BASE_ENCODING
soup = BeautifulSoup(resp.text, 'lxml')
required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
links = [video_tag.find('a').get("href") for video_tag in video_tags]
if "topic" in self.params["link"]:
dom = etree.HTML(str(soup))
elements_with_video_id = dom.xpath(
"//div[@class='mlr_cnt']/div[contains(@data-l, 'gA,VIDEO,mB,movie,ti,')]/div[@class='vid-card "
"vid-card__xl']/div[@class='video-card_n-w']/a[contains(@onclick, 'OK.VideoPlayer.openMovie')]")
links = ["https://ok.ru/video/" + re.findall('\d+', elem.get("onclick"))[0] for elem in elements_with_video_id]
else:
required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
links = [video_tag.find('a').get("href") for video_tag in video_tags]
return links
except Exception as ex:
raise

View File

@ -2,7 +2,9 @@ import errno
import os
from loguru import logger
from yt_dlp import download_range_func
from src.core.config import DEFAULT_DURATION
from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import FileAlreadyExistException
@ -20,6 +22,7 @@ class BaseParser:
"logger": logger,
"merge_output_format": self.params["merge_output_format"],
'outtmpl': self.params["outtmpl"],
'download_ranges': download_range_func(None, [(0, int(DEFAULT_DURATION))]),
# "quiet": True
}
downloader = VideoDownloader(link=self.params["link"], ydl_opts=ydl_opts)
@ -29,14 +32,16 @@ class BaseParser:
resolution = downloader.info['resolution']
else:
resolution = "NA"
base_file_name = f"{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
if "Yahoo" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"Yahoo/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
path_to_video = f"Yahoo/{base_file_name}"
elif "ZenYandex" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"ZenYandex/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
path_to_video = f"ZenYandex/{base_file_name}"
elif "Bing" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"Bing/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
path_to_video = f"Bing/{base_file_name}"
else:
path_to_video = f"{downloader.info['extractor_key']}/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
path_to_video = f"{downloader.info['extractor_key']}/{base_file_name}"
if os.path.exists(os.path.join(os.getcwd() + "/downloads/" + path_to_video)):
raise FileAlreadyExistException(message=path_to_video)
downloader.ydl_opts["quiet"] = False

View File

@ -1,16 +1,16 @@
import json
import os
import uvicorn
import logging
from aio_pika import connect, Message, DeliveryMode
from fastapi import FastAPI, Request, Depends
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse, FileResponse, StreamingResponse
from starlette.responses import JSONResponse, FileResponse, Response
from starlette.templating import Jinja2Templates
from src.core.redis_client import RedisClient
from src.web.schemes.submit import SubmitIn, CheckIn
from src.core.s3_client import S3Client
from src.web.schemes.submit import SubmitIn, CheckIn, DeleteFromS3, CopyToAnotherBucketS3
app = FastAPI(
title="video_downloader", openapi_url=f"/api/v1/openapi.json"
@ -82,6 +82,7 @@ async def index(request: Request):
@app.post('/submit')
async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()):
red = RedisClient()
s3_client = S3Client()
task_done = await is_task_already_done_or_exist(red, data.link)
# TODO: где-то не обновился статус после выполнения\провала задачи
task_in_process = await is_task_in_process(red, data.link)
@ -89,9 +90,15 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
return JSONResponse(status_code=202, content={"result": "Задача в работе. Ожидайте"})
if task_done:
if isinstance(task_done["result"], str):
links_to_download_video = [str(request.base_url) + "get/?file_path=" + task_done["result"]]
file_name = task_done["result"][task_done["result"].index("dev/") + 4:task_done["result"].index("?")]
links_to_download_video = [s3_client.get(file_name)]
task_done["result"] = links_to_download_video[0]
else:
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in task_done["result"]]
file_names = [task_done_part[task_done_part.index("dev/") + 4:task_done_part.index("?")] for
task_done_part in task_done["result"]]
links_to_download_video = [s3_client.get(file_name) for file_name in file_names]
task_done["result"] = links_to_download_video
await red.update_task_in_tasks_done(task_done, task_done["link"])
return JSONResponse({"result": links_to_download_video})
# TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач
@ -104,6 +111,7 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
"format": f"bv[width={data.resolution.value}][ext={data.video_format.value}]+ba[ext={data.audio_format.value}]/"
f"bv[width={data.resolution.value}][ext=mp4]+ba[ext=m4a]/"
f"bv[width={data.resolution.value}][ext=webm]+ba[ext=webm]/"
f"best[width={data.resolution.value}]/"
f"best[ext={data.video_format.value}]/"
f"best[ext!=unknown_video]",
"merge_output_format": data.merge_output_format.value,
@ -132,15 +140,10 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
@app.get('/get/', response_class=FileResponse, status_code=200)
async def download_video(file_path):
base = os.path.dirname(os.path.dirname(os.path.abspath(file_path)))
base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads")
def iterfile():
with open(base_download_dir + f'/{file_path}', mode="rb") as file_like:
yield from file_like
return StreamingResponse(iterfile(), headers={'Content-Disposition': f'inline; filename="{file_path}"'},
media_type="video")
s3_client = S3Client()
file_response = s3_client.get(file_path)
return Response(content=file_response.data, headers={'Content-Disposition': f'inline; filename="{file_path}"'},
media_type="video")
@app.post('/check/', response_class=FileResponse, status_code=200)
@ -168,9 +171,9 @@ async def download_video(data: CheckIn, request: Request):
content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"})
if tasks_done and data.link in tasks_done:
if isinstance(tasks_done[data.link]["result"], str):
links_to_download_video = [str(request.base_url) + "get/?file_path=" + tasks_done[data.link]["result"]]
links_to_download_video = [tasks_done[data.link]["result"]]
else:
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in
links_to_download_video = [link for link in
tasks_done[data.link]["result"]]
return JSONResponse({"result": links_to_download_video})
return JSONResponse(status_code=404, content={"result": "Задача не найдена"})
@ -180,4 +183,25 @@ async def download_video(data: CheckIn, request: Request):
except Exception as ex:
print(ex)
@app.delete('/from-s3/', status_code=200)
async def delete_video_from_s3(delete_data: DeleteFromS3):
s3_client = S3Client()
s3_client.delete(delete_data.file_name)
return JSONResponse(
status_code=200,
content={"result": f"Файл {delete_data.file_name} успешно удален из корзины {s3_client.BUCKET_NAME}"}
)
@app.post('/copy-to-another-bucket/', status_code=200)
async def delete_video_from_s3(data: CopyToAnotherBucketS3):
s3_client = S3Client()
s3_client.copy_to_another_bucket(data.file_name, data.bucket_name)
return JSONResponse(
status_code=200,
content={"result": f"Файл {data.file_name} успешно скопирован в корзину {data.bucket_name}"}
)
uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")

View File

@ -50,11 +50,20 @@ class MergeOutputFormatEnum(Enum):
@dataclass
class SubmitIn:
link: str = Form(...)
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_webm)
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_webm)
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_1080)
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_mp4)
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_m4a)
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_720)
merge_output_format: MergeOutputFormatEnum = Form(default=MergeOutputFormatEnum.format_mkv)
class CheckIn(BaseModel):
link: str
class DeleteFromS3(BaseModel):
file_name: str
class CopyToAnotherBucketS3(BaseModel):
file_name: str
bucket_name: str

View File

@ -77,9 +77,9 @@
<option hidden>Выбор формата видео</option>
<option value="3gp">3gp</option>
<option value="flv">flv</option>
<option value="mp4">mp4</option>
<option value="mp4" selected>mp4</option>
<option value="mov">mov</option>
<option value="webm" selected>webm</option>
<option value="webm">webm</option>
</select >
</p>
<p>Формат аудио
@ -87,9 +87,9 @@
<option hidden>Выбор формата аудио</option>
<option value="mp3">mp3</option>
<option value="ogg">ogg</option>
<option value="m4a">m4a</option>
<option value="m4a" selected>m4a</option>
<option value="opus">opus</option>
<option value="webm" selected>webm</option>
<option value="webm">webm</option>
<option value="wav">wav</option>
<option value="aac">aac</option>
</select >
@ -100,8 +100,8 @@
<option value="240">240</option>
<option value="360">360</option>
<option value="480" >480</option>
<option value="720">720</option>
<option value="1080" selected>1080</option>
<option value="720" selected>720</option>
<option value="1080">1080</option>
<option value="2048">2048</option>
<option value="3840">3840</option>
</select >