video_downloader_service/src/web/main.py
nikili0n 801b9f2e52 up
2023-11-21 23:36:58 +03:00

137 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import os
from ast import literal_eval
import uvicorn
from aio_pika import connect, Message, DeliveryMode
from fastapi import FastAPI, Request, Depends
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse, FileResponse, StreamingResponse
from starlette.templating import Jinja2Templates
from src.core.redis_client import RedisClient
from src.web.schemes.submit import SubmitIn
app = FastAPI(
title="video_downloader", openapi_url=f"/api/v1/openapi.json"
)
templates = Jinja2Templates(directory="templates")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
async def is_task_already_done_or_exist(redis: RedisClient, link: str):
messages = await redis.get_task_done_queue()
tasks = [
literal_eval(message.decode('utf-8')) for message in messages
if literal_eval(message.decode('utf-8'))["link"] == link
and literal_eval(message.decode('utf-8'))["status"] in ["done", "exist"]
]
if len(tasks) > 0:
task = tasks[0]
await redis.del_task_from_task_done_queue(task)
return task
@app.get("/")
async def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.post('/submit/')
async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()):
'''
TODO:
Сабмит должен проверить что задача может быть уже выполненой (отдать ссылку в ответе)
или ещё в работе (сообщить об этом в ответе, можно вывести на форму, что такая ссылка уже скачивается, ожидайте)
Если условия выше провалены, то мы делаем новую задачу в очередь с переданными параметрами и сообщаем об этом клиенту с кодом (200 или 201)
Дополнительно, нужен отдельный метод (ури), который позволит получать статус задачи. Опрашиваться примерно раз в 5с,
возможны увелечения тайминга в зависимости от ответа апи (на будущее)
Варианты ответа
1) такой задачи нет (404)
2) такая задача есть и выполняется (200 ли?)
3) такая задача есть и завершена (200 и выдать ссылку на загрузку)
4) такая задача есть и завершена, но с ошибкой (500 и сообщение о том, что можно попробовать выполнить задачу занов,
попутно удалив задачу из выполненых, с очисткой мусора за ней)
'''
red = RedisClient()
task_done = await is_task_already_done_or_exist(red, data.link)
if task_done:
link_to_download_video = str(request.base_url) + "get/?file_path=" + task_done["result"]
return JSONResponse({"result": link_to_download_video})
# TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач
async with await connect("amqp://guest:guest@localhost/") as connection:
# Creating a channel
channel = await connection.channel()
body = [
{
"link": data.link,
"format": f"bestvideo[ext={data.format.value}]+bestaudio[ext={data.format.value}]/best[ext={data.format.value}]/best",
"merge_output_format": data.merge_output_format.value,
"outtmpl": f"downloads/%(extractor_key)s/%(id)s_%(width)sp.%(ext)s",
}, ]
# Sending the message
for link in body:
if "mail" in link["link"]:
link["parser"] = "MyMailRu"
elif "yappy" in link["link"]:
link["parser"] = "Yappy"
message = Message(
json.dumps(link, indent=4).encode('utf-8'), delivery_mode=DeliveryMode.PERSISTENT,
)
await channel.default_exchange.publish(
message,
routing_key='hello',
)
print(f" [x] Sent '{link}'")
while True:
try:
messages = await red.get_task_done_queue()
tasks = [
literal_eval(message.decode('utf-8')) for message in messages
if literal_eval(message.decode('utf-8'))["link"] == link["link"]
]
error_tasks = [tasks.pop(tasks.index(error_task)) for error_task in tasks if error_task["status"] == "error"]
# TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на выполнение с очисткой состояние об ошибке
if len(error_tasks) > 0:
return JSONResponse({"result": f"STATUS: ERROR {error_tasks[-1]['result']}"})
if len(tasks) > 0:
task = tasks[0]
await red.del_task_from_task_done_queue(task)
break
await asyncio.sleep(5)
except (AttributeError, IndexError):
await asyncio.sleep(5)
continue
link_to_download_video = str(request.base_url) + "get/?file_path=" + task["result"]
# TODO: возможно возвращать идентификаторы задач aka куски ссылок
return JSONResponse({"result": link_to_download_video})
@app.get('/get/', response_class=FileResponse, status_code=200)
async def download_video(file_path):
base = os.path.dirname(os.path.dirname(os.path.abspath(file_path)))
base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads")
def iterfile():
with open(base_download_dir + f'/{file_path}', mode="rb") as file_like:
yield from file_like
return StreamingResponse(iterfile(), media_type="video/mp4")
uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")