import json import os from ast import literal_eval import uvicorn from aio_pika import connect, Message, DeliveryMode from fastapi import FastAPI, Request, Depends from loguru import logger from starlette.middleware.cors import CORSMiddleware from starlette.responses import JSONResponse, FileResponse, StreamingResponse from starlette.templating import Jinja2Templates from src.core.redis_client import RedisClient from src.web.schemes.submit import SubmitIn app = FastAPI( title="video_downloader", openapi_url=f"/api/v1/openapi.json" ) templates = Jinja2Templates(directory="templates") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) ''' await self.app(scope, receive, send) File "/home/admin/video_downloader_service/.venv/lib/python3.10/site-packages/starlette/routing.py", line 66, in app response = await func(request) File "/home/admin/video_downloader_service/.venv/lib/python3.10/site-packages/fastapi/routing.py", line 273, in app raw_response = await run_endpoint_function( File "/home/admin/video_downloader_service/.venv/lib/python3.10/site-packages/fastapi/routing.py", line 190, in run_endpoint_function return await dependant.call(**values) File "/home/admin/video_downloader_service/src/web/main.py", line 81, in get_url_for_download_video task_done = await is_task_already_done_or_exist(red, data.link) File "/home/admin/video_downloader_service/src/web/main.py", line 34, in is_task_already_done_or_exist tasks = [ File "/home/admin/video_downloader_service/src/web/main.py", line 36, in if literal_eval(message.decode('utf-8'))["link"] == link TypeError: string indices must be integers ''' async def is_task_already_done_or_exist(redis: RedisClient, link: str): messages = await redis.get_task_done_queue() tasks = [ literal_eval(message.decode('utf-8')) for message in messages if literal_eval(message.decode('utf-8'))["link"] == link and literal_eval(message.decode('utf-8'))["status"] in ["done", "exist"] ] if len(tasks) > 0: task = tasks[0] return task async def is_task_in_process(redis: RedisClient, link: str): messages = await redis.get_tasks() tasks = [ literal_eval(message.decode('utf-8')) for message in messages if literal_eval(message.decode('utf-8'))["link"] == link ] if len(tasks) > 0: task = tasks[0] return task @app.get("/") async def index(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.post('/submit/') async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()): red = RedisClient() task_done = await is_task_already_done_or_exist(red, data.link) task_in_process = await is_task_in_process(red, data.link) if task_in_process: return JSONResponse({"result": "Задача в работе. Ожидайте"}) if task_done: link_to_download_video = str(request.base_url) + "get/?file_path=" + task_done["result"] return JSONResponse({"result": link_to_download_video}) # TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач async with await connect("amqp://guest:guest@localhost/") as connection: # Creating a channel channel = await connection.channel() body = [ { "link": data.link, "format": f"bestvideo[ext={data.video_format.value}]+bestaudio[ext={data.audio_format.value}]/best[ext={data.video_format.value}]/best", "merge_output_format": data.merge_output_format.value, "outtmpl": f"downloads/%(extractor_key)s/%(id)s_%(width)sp.%(ext)s", }, ] # Sending the message for link in body: if "mail" in link["link"]: link["parser"] = "MyMailRu" elif "yappy" in link["link"]: link["parser"] = "Yappy" message = Message( json.dumps(link, indent=4).encode('utf-8'), delivery_mode=DeliveryMode.PERSISTENT, ) await channel.default_exchange.publish( message, routing_key='hello', ) logger.info(f" [x] Sent '{link}'") # TODO: возможно возвращать идентификаторы задач aka куски ссылок return JSONResponse(status_code=200, content={"result": f"Задача поставлена в работу, ссылка: {link['link']}"}) # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на # выполнение с очисткой состояние об ошибке @app.get('/get/', response_class=FileResponse, status_code=200) async def download_video(file_path): base = os.path.dirname(os.path.dirname(os.path.abspath(file_path))) base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads") def iterfile(): with open(base_download_dir + f'/{file_path}', mode="rb") as file_like: yield from file_like return StreamingResponse(iterfile(), media_type="video/mp4") @app.get('/check/', response_class=FileResponse, status_code=200) async def download_video(request: Request, link: str): try: red = RedisClient() messages_task_done = await red.get_task_done_queue() messages_tasks = await red.get_tasks() tasks_done = [ literal_eval(message.decode('utf-8')) for message in messages_task_done if literal_eval(message.decode('utf-8'))["link"] == link ] tasks = [ literal_eval(message.decode('utf-8')) for message in messages_tasks if literal_eval(message.decode('utf-8'))["link"] == link ] error_tasks = [ tasks_done.pop(tasks_done.index(error_task)) for error_task in tasks_done if error_task["status"] == "error" ] if len(tasks) > 0: task = tasks[0] return JSONResponse( status_code=202, content={"result": f"Задача {task['link']} в данный момент в работе, выполняется"} ) # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на выполнение с очисткой состояние об ошибке if len(error_tasks) > 0: error_task = error_tasks[0] await red.del_task_from_task_done_queue(error_task) return JSONResponse(status_code=510, content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"}) if len(tasks_done) > 0: link_to_download_video = str(request.base_url) + "get/?file_path=" + tasks_done[0]["result"] return JSONResponse({"result": link_to_download_video}) except (AttributeError, IndexError): return JSONResponse(status_code=404, content={"result": "Задача не найдена"}) except Exception as ex: print(ex) uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")