added s3_client, refactored web and master services
This commit is contained in:
parent
146ed6792f
commit
d22f976d2f
16
poetry.lock
generated
16
poetry.lock
generated
@ -1680,6 +1680,20 @@ files = [
|
||||
[package.dependencies]
|
||||
six = ">=1.5"
|
||||
|
||||
[[package]]
|
||||
name = "python-dotenv"
|
||||
version = "1.0.0"
|
||||
description = "Read key-value pairs from a .env file and set them as environment variables"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "python-dotenv-1.0.0.tar.gz", hash = "sha256:a8df96034aae6d2d50a4ebe8216326c61c3eb64836776504fcca410e5937a3ba"},
|
||||
{file = "python_dotenv-1.0.0-py3-none-any.whl", hash = "sha256:f5971a9226b701070a4bf2c38c89e5a3f0d64de8debda981d1db98583009122a"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
cli = ["click (>=5.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "python-multipart"
|
||||
version = "0.0.6"
|
||||
@ -2327,4 +2341,4 @@ websockets = "*"
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "272fe31fba150b0b0fcca1b7d60f706dc2a05ea730ef19e34ccb8e5524f47d66"
|
||||
content-hash = "b7973dc522b312b75a798bc966c5001b24e134cccd332de7b978e5a1ec495b57"
|
||||
|
@ -36,6 +36,7 @@ ply = "3.11"
|
||||
ruamel-yaml = "0.17.21"
|
||||
flask-login = "0.6.2"
|
||||
pycryptodome = "3.18.0"
|
||||
python-dotenv = "^1.0.0"
|
||||
|
||||
|
||||
[build-system]
|
||||
|
13
src/core/config.py
Normal file
13
src/core/config.py
Normal file
@ -0,0 +1,13 @@
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
S3_HOST = os.environ.get("S3_HOST", "s3-api.grfc.ru")
|
||||
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "cl-i-oculus-dev1")
|
||||
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM")
|
||||
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "clean-internet-oculus-integration-dev")
|
||||
DEFAULT_DURATION = os.environ.get("DEFAULT_DURATION", 600)
|
@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import concurrent.futures as pool
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import traceback
|
||||
|
||||
@ -12,6 +13,7 @@ from src.core.async_queue import AsyncQueue
|
||||
from src.core.rabbitmq import get_messages, publish_message_with_task_done
|
||||
from src.core.redis_client import RedisClient
|
||||
from src.core.result import Result, ResultTypeEnum
|
||||
from src.core.s3_client import S3Client
|
||||
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
|
||||
from src.parsers.MyMail.my_mail_parser import MyMailParser
|
||||
from src.parsers.Yappy.yappy_parser import YappyParser
|
||||
@ -50,7 +52,10 @@ class MasterService:
|
||||
|
||||
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
|
||||
|
||||
async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict):
|
||||
async def result_processing(self, result: Result | list, redis: RedisClient, s3_client: S3Client, video_params: dict):
|
||||
file_path = os.path.join(os.getcwd() + "/downloads/")
|
||||
links_to_download = s3_client.upload(file_name=result.value["result"], file_path=file_path)
|
||||
result.value["result"] = links_to_download
|
||||
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"])
|
||||
await publish_message_with_task_done(task=result.value)
|
||||
self.queue.task_done()
|
||||
@ -59,6 +64,7 @@ class MasterService:
|
||||
while True:
|
||||
video_params = await self.queue.get()
|
||||
redis = RedisClient()
|
||||
s3_client = S3Client()
|
||||
await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params)
|
||||
self.currently_underway[video_params['link']] = video_params
|
||||
|
||||
@ -67,7 +73,7 @@ class MasterService:
|
||||
))
|
||||
|
||||
result: Result = await download_task
|
||||
await self.result_processing(result, redis, video_params)
|
||||
await self.result_processing(result, redis, s3_client, video_params)
|
||||
|
||||
if video_params['link'] in self.currently_underway:
|
||||
del self.currently_underway[video_params['link']]
|
||||
|
@ -46,3 +46,7 @@ class RedisClient:
|
||||
async with self.connection as connection:
|
||||
res = await connection.delete(self.TASKS_NAME)
|
||||
return res
|
||||
|
||||
async def update_task_in_tasks_done(self, task: dict | list, link: str) -> int:
|
||||
await self._del_task(self.TASKS_DONE_NAME, link)
|
||||
return await self._set_task(self.TASKS_DONE_NAME, link, task)
|
||||
|
66
src/core/s3_client.py
Normal file
66
src/core/s3_client.py
Normal file
@ -0,0 +1,66 @@
|
||||
from loguru import logger
|
||||
from minio import Minio
|
||||
from minio.commonconfig import CopySource
|
||||
|
||||
from src.core.config import S3_HOST, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET_NAME
|
||||
|
||||
|
||||
class S3Client:
|
||||
HOST = S3_HOST
|
||||
ACCESS_KEY = S3_ACCESS_KEY
|
||||
SECRET_KEY = S3_SECRET_KEY
|
||||
BUCKET_NAME = S3_BUCKET_NAME
|
||||
|
||||
def __init__(self):
|
||||
self.client = Minio(
|
||||
self.HOST,
|
||||
access_key=self.ACCESS_KEY,
|
||||
secret_key=self.SECRET_KEY,
|
||||
secure=True
|
||||
)
|
||||
|
||||
def _make_sure_bucket_exist(self):
|
||||
found = self.client.bucket_exists(self.BUCKET_NAME)
|
||||
if not found:
|
||||
self.client.make_bucket(self.BUCKET_NAME)
|
||||
else:
|
||||
logger.info(f"Bucket {self.BUCKET_NAME} already exists")
|
||||
|
||||
def upload(self, file_name: str, file_path: str | list[str]):
|
||||
self._make_sure_bucket_exist()
|
||||
if isinstance(file_name, str):
|
||||
file_path = file_path + file_name
|
||||
self.client.fput_object(self.BUCKET_NAME, file_name, file_path)
|
||||
logger.info(f"{file_path} is successfully uploaded as object {file_name} to bucket {self.BUCKET_NAME}.")
|
||||
link_to_download = self.get(file_name)
|
||||
return link_to_download
|
||||
else:
|
||||
result = []
|
||||
for file_name_part in file_name:
|
||||
current_file_path = file_path + file_name_part
|
||||
self.client.fput_object(self.BUCKET_NAME, file_name_part, current_file_path)
|
||||
logger.info(f"{current_file_path} is successfully uploaded as object {file_name_part} to bucket {self.BUCKET_NAME}.")
|
||||
link_to_download = self.get(file_name_part)
|
||||
result.append(link_to_download)
|
||||
return result
|
||||
|
||||
def get(self, file_name: str):
|
||||
self._make_sure_bucket_exist()
|
||||
result = self.client.get_presigned_url(
|
||||
"GET",
|
||||
self.BUCKET_NAME,
|
||||
file_name,
|
||||
)
|
||||
return result
|
||||
|
||||
def delete(self, file_name: str):
|
||||
result = self.client.remove_object(self.BUCKET_NAME, file_name)
|
||||
return result
|
||||
|
||||
def copy_to_another_bucket(self, file_name: str, bucket_name):
|
||||
result = self.client.copy_object(
|
||||
bucket_name,
|
||||
file_name,
|
||||
CopySource(self.BUCKET_NAME, file_name),
|
||||
)
|
||||
return result
|
@ -1,32 +0,0 @@
|
||||
from minio import Minio
|
||||
from minio.error import S3Error
|
||||
|
||||
|
||||
def main():
|
||||
client = Minio(
|
||||
"grfc.ru",
|
||||
access_key="cl-i-oculus-dev1",
|
||||
secret_key="Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM",
|
||||
secure=True
|
||||
)
|
||||
|
||||
found = client.bucket_exists("clean-internet-oculus-integration-dev")
|
||||
if not found:
|
||||
client.make_bucket("clean-internet-oculus-integration-dev")
|
||||
else:
|
||||
print("Bucket 'clean-internet-oculus-integration-dev' already exists")
|
||||
|
||||
client.fput_object(
|
||||
"clean-internet-oculus-integration-dev", "4uv2GNc_ybc_1080p.mp4", "/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4",
|
||||
)
|
||||
print(
|
||||
"'/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4' is successfully uploaded as "
|
||||
"object '4uv2GNc_ybc_1080p.mp4' to bucket 'clean-internet-oculus-integration-dev'."
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
main()
|
||||
except S3Error as exc:
|
||||
print("error occurred.", exc)
|
@ -1,8 +1,10 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
import requests
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from lxml import etree
|
||||
|
||||
from src.exceptions.download_exceptions import FileAlreadyExistException
|
||||
from src.parsers.base_parser import BaseParser
|
||||
@ -16,9 +18,16 @@ class OkParser(BaseParser):
|
||||
resp = requests.get(self.params["link"])
|
||||
resp.encoding = self.BASE_ENCODING
|
||||
soup = BeautifulSoup(resp.text, 'lxml')
|
||||
required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
|
||||
video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
|
||||
links = [video_tag.find('a').get("href") for video_tag in video_tags]
|
||||
if "topic" in self.params["link"]:
|
||||
dom = etree.HTML(str(soup))
|
||||
elements_with_video_id = dom.xpath(
|
||||
"//div[@class='mlr_cnt']/div[contains(@data-l, 'gA,VIDEO,mB,movie,ti,')]/div[@class='vid-card "
|
||||
"vid-card__xl']/div[@class='video-card_n-w']/a[contains(@onclick, 'OK.VideoPlayer.openMovie')]")
|
||||
links = ["https://ok.ru/video/" + re.findall('\d+', elem.get("onclick"))[0] for elem in elements_with_video_id]
|
||||
else:
|
||||
required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
|
||||
video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
|
||||
links = [video_tag.find('a').get("href") for video_tag in video_tags]
|
||||
return links
|
||||
except Exception as ex:
|
||||
raise
|
||||
|
@ -2,7 +2,9 @@ import errno
|
||||
import os
|
||||
|
||||
from loguru import logger
|
||||
from yt_dlp import download_range_func
|
||||
|
||||
from src.core.config import DEFAULT_DURATION
|
||||
from src.core.ydl import VideoDownloader
|
||||
from src.exceptions.download_exceptions import FileAlreadyExistException
|
||||
|
||||
@ -20,6 +22,7 @@ class BaseParser:
|
||||
"logger": logger,
|
||||
"merge_output_format": self.params["merge_output_format"],
|
||||
'outtmpl': self.params["outtmpl"],
|
||||
'download_ranges': download_range_func(None, [(0, int(DEFAULT_DURATION))]),
|
||||
# "quiet": True
|
||||
}
|
||||
downloader = VideoDownloader(link=self.params["link"], ydl_opts=ydl_opts)
|
||||
@ -29,14 +32,16 @@ class BaseParser:
|
||||
resolution = downloader.info['resolution']
|
||||
else:
|
||||
resolution = "NA"
|
||||
|
||||
base_file_name = f"{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
|
||||
if "Yahoo" in ydl_opts["outtmpl"]["default"]:
|
||||
path_to_video = f"Yahoo/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
|
||||
path_to_video = f"Yahoo/{base_file_name}"
|
||||
elif "ZenYandex" in ydl_opts["outtmpl"]["default"]:
|
||||
path_to_video = f"ZenYandex/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
|
||||
path_to_video = f"ZenYandex/{base_file_name}"
|
||||
elif "Bing" in ydl_opts["outtmpl"]["default"]:
|
||||
path_to_video = f"Bing/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
|
||||
path_to_video = f"Bing/{base_file_name}"
|
||||
else:
|
||||
path_to_video = f"{downloader.info['extractor_key']}/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
|
||||
path_to_video = f"{downloader.info['extractor_key']}/{base_file_name}"
|
||||
if os.path.exists(os.path.join(os.getcwd() + "/downloads/" + path_to_video)):
|
||||
raise FileAlreadyExistException(message=path_to_video)
|
||||
downloader.ydl_opts["quiet"] = False
|
||||
|
@ -1,16 +1,16 @@
|
||||
import json
|
||||
import os
|
||||
|
||||
import uvicorn
|
||||
import logging
|
||||
from aio_pika import connect, Message, DeliveryMode
|
||||
from fastapi import FastAPI, Request, Depends
|
||||
from starlette.middleware.cors import CORSMiddleware
|
||||
from starlette.responses import JSONResponse, FileResponse, StreamingResponse
|
||||
from starlette.responses import JSONResponse, FileResponse, Response
|
||||
from starlette.templating import Jinja2Templates
|
||||
|
||||
from src.core.redis_client import RedisClient
|
||||
from src.web.schemes.submit import SubmitIn, CheckIn
|
||||
from src.core.s3_client import S3Client
|
||||
from src.web.schemes.submit import SubmitIn, CheckIn, DeleteFromS3, CopyToAnotherBucketS3
|
||||
|
||||
app = FastAPI(
|
||||
title="video_downloader", openapi_url=f"/api/v1/openapi.json"
|
||||
@ -82,6 +82,7 @@ async def index(request: Request):
|
||||
@app.post('/submit')
|
||||
async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()):
|
||||
red = RedisClient()
|
||||
s3_client = S3Client()
|
||||
task_done = await is_task_already_done_or_exist(red, data.link)
|
||||
# TODO: где-то не обновился статус после выполнения\провала задачи
|
||||
task_in_process = await is_task_in_process(red, data.link)
|
||||
@ -89,9 +90,15 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
|
||||
return JSONResponse(status_code=202, content={"result": "Задача в работе. Ожидайте"})
|
||||
if task_done:
|
||||
if isinstance(task_done["result"], str):
|
||||
links_to_download_video = [str(request.base_url) + "get/?file_path=" + task_done["result"]]
|
||||
file_name = task_done["result"][task_done["result"].index("dev/") + 4:task_done["result"].index("?")]
|
||||
links_to_download_video = [s3_client.get(file_name)]
|
||||
task_done["result"] = links_to_download_video[0]
|
||||
else:
|
||||
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in task_done["result"]]
|
||||
file_names = [task_done_part[task_done_part.index("dev/") + 4:task_done_part.index("?")] for
|
||||
task_done_part in task_done["result"]]
|
||||
links_to_download_video = [s3_client.get(file_name) for file_name in file_names]
|
||||
task_done["result"] = links_to_download_video
|
||||
await red.update_task_in_tasks_done(task_done, task_done["link"])
|
||||
return JSONResponse({"result": links_to_download_video})
|
||||
|
||||
# TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач
|
||||
@ -104,6 +111,7 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
|
||||
"format": f"bv[width={data.resolution.value}][ext={data.video_format.value}]+ba[ext={data.audio_format.value}]/"
|
||||
f"bv[width={data.resolution.value}][ext=mp4]+ba[ext=m4a]/"
|
||||
f"bv[width={data.resolution.value}][ext=webm]+ba[ext=webm]/"
|
||||
f"best[width={data.resolution.value}]/"
|
||||
f"best[ext={data.video_format.value}]/"
|
||||
f"best[ext!=unknown_video]",
|
||||
"merge_output_format": data.merge_output_format.value,
|
||||
@ -132,15 +140,10 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
|
||||
|
||||
@app.get('/get/', response_class=FileResponse, status_code=200)
|
||||
async def download_video(file_path):
|
||||
base = os.path.dirname(os.path.dirname(os.path.abspath(file_path)))
|
||||
base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads")
|
||||
|
||||
def iterfile():
|
||||
with open(base_download_dir + f'/{file_path}', mode="rb") as file_like:
|
||||
yield from file_like
|
||||
|
||||
return StreamingResponse(iterfile(), headers={'Content-Disposition': f'inline; filename="{file_path}"'},
|
||||
media_type="video")
|
||||
s3_client = S3Client()
|
||||
file_response = s3_client.get(file_path)
|
||||
return Response(content=file_response.data, headers={'Content-Disposition': f'inline; filename="{file_path}"'},
|
||||
media_type="video")
|
||||
|
||||
|
||||
@app.post('/check/', response_class=FileResponse, status_code=200)
|
||||
@ -168,9 +171,9 @@ async def download_video(data: CheckIn, request: Request):
|
||||
content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"})
|
||||
if tasks_done and data.link in tasks_done:
|
||||
if isinstance(tasks_done[data.link]["result"], str):
|
||||
links_to_download_video = [str(request.base_url) + "get/?file_path=" + tasks_done[data.link]["result"]]
|
||||
links_to_download_video = [tasks_done[data.link]["result"]]
|
||||
else:
|
||||
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in
|
||||
links_to_download_video = [link for link in
|
||||
tasks_done[data.link]["result"]]
|
||||
return JSONResponse({"result": links_to_download_video})
|
||||
return JSONResponse(status_code=404, content={"result": "Задача не найдена"})
|
||||
@ -180,4 +183,25 @@ async def download_video(data: CheckIn, request: Request):
|
||||
except Exception as ex:
|
||||
print(ex)
|
||||
|
||||
|
||||
@app.delete('/from-s3/', status_code=200)
|
||||
async def delete_video_from_s3(delete_data: DeleteFromS3):
|
||||
s3_client = S3Client()
|
||||
s3_client.delete(delete_data.file_name)
|
||||
return JSONResponse(
|
||||
status_code=200,
|
||||
content={"result": f"Файл {delete_data.file_name} успешно удален из корзины {s3_client.BUCKET_NAME}"}
|
||||
)
|
||||
|
||||
|
||||
@app.post('/copy-to-another-bucket/', status_code=200)
|
||||
async def delete_video_from_s3(data: CopyToAnotherBucketS3):
|
||||
s3_client = S3Client()
|
||||
s3_client.copy_to_another_bucket(data.file_name, data.bucket_name)
|
||||
return JSONResponse(
|
||||
status_code=200,
|
||||
content={"result": f"Файл {data.file_name} успешно скопирован в корзину {data.bucket_name}"}
|
||||
)
|
||||
|
||||
|
||||
uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")
|
||||
|
@ -50,11 +50,20 @@ class MergeOutputFormatEnum(Enum):
|
||||
@dataclass
|
||||
class SubmitIn:
|
||||
link: str = Form(...)
|
||||
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_webm)
|
||||
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_webm)
|
||||
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_1080)
|
||||
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_mp4)
|
||||
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_m4a)
|
||||
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_720)
|
||||
merge_output_format: MergeOutputFormatEnum = Form(default=MergeOutputFormatEnum.format_mkv)
|
||||
|
||||
|
||||
class CheckIn(BaseModel):
|
||||
link: str
|
||||
|
||||
|
||||
class DeleteFromS3(BaseModel):
|
||||
file_name: str
|
||||
|
||||
|
||||
class CopyToAnotherBucketS3(BaseModel):
|
||||
file_name: str
|
||||
bucket_name: str
|
||||
|
@ -77,9 +77,9 @@
|
||||
<option hidden>Выбор формата видео</option>
|
||||
<option value="3gp">3gp</option>
|
||||
<option value="flv">flv</option>
|
||||
<option value="mp4">mp4</option>
|
||||
<option value="mp4" selected>mp4</option>
|
||||
<option value="mov">mov</option>
|
||||
<option value="webm" selected>webm</option>
|
||||
<option value="webm">webm</option>
|
||||
</select >
|
||||
</p>
|
||||
<p>Формат аудио
|
||||
@ -87,9 +87,9 @@
|
||||
<option hidden>Выбор формата аудио</option>
|
||||
<option value="mp3">mp3</option>
|
||||
<option value="ogg">ogg</option>
|
||||
<option value="m4a">m4a</option>
|
||||
<option value="m4a" selected>m4a</option>
|
||||
<option value="opus">opus</option>
|
||||
<option value="webm" selected>webm</option>
|
||||
<option value="webm">webm</option>
|
||||
<option value="wav">wav</option>
|
||||
<option value="aac">aac</option>
|
||||
</select >
|
||||
@ -100,8 +100,8 @@
|
||||
<option value="240">240</option>
|
||||
<option value="360">360</option>
|
||||
<option value="480" >480</option>
|
||||
<option value="720">720</option>
|
||||
<option value="1080" selected>1080</option>
|
||||
<option value="720" selected>720</option>
|
||||
<option value="1080">1080</option>
|
||||
<option value="2048">2048</option>
|
||||
<option value="3840">3840</option>
|
||||
</select >
|
||||
|
Loading…
x
Reference in New Issue
Block a user