import json import os from ast import literal_eval import uvicorn from aio_pika import connect, Message, DeliveryMode from fastapi import FastAPI, Request, Depends import logging from starlette.middleware.cors import CORSMiddleware from starlette.responses import JSONResponse, FileResponse, StreamingResponse from starlette.templating import Jinja2Templates from src.core.redis_client import RedisClient from src.web.schemes.submit import SubmitIn app = FastAPI( title="video_downloader", openapi_url=f"/api/v1/openapi.json" ) templates = Jinja2Templates(directory="templates") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) ''' await self.app(scope, receive, send) File "/home/admin/video_downloader_service/.venv/lib/python3.10/site-packages/starlette/routing.py", line 66, in app response = await func(request) File "/home/admin/video_downloader_service/.venv/lib/python3.10/site-packages/fastapi/routing.py", line 273, in app raw_response = await run_endpoint_function( File "/home/admin/video_downloader_service/.venv/lib/python3.10/site-packages/fastapi/routing.py", line 190, in run_endpoint_function return await dependant.call(**values) File "/home/admin/video_downloader_service/src/web/main.py", line 81, in get_url_for_download_video task_done = await is_task_already_done_or_exist(red, data.link) File "/home/admin/video_downloader_service/src/web/main.py", line 34, in is_task_already_done_or_exist tasks = [ File "/home/admin/video_downloader_service/src/web/main.py", line 36, in if literal_eval(message.decode('utf-8'))["link"] == link TypeError: string indices must be integers ''' ''' queue_name -> [ "{link...}", "{link2...}", "{link3..}" ] queue_name -> { "link1" -> {vars}, "link2" -> {vars}, "link3" -> {vars}, } ''' async def is_task_already_done_or_exist(redis: RedisClient, link: str): messages = await redis.get_task_done_queue() temp = [json.loads(msg) for msg in messages] tasks = [ msg for msg in temp if msg["link"] == link and msg["status"] in ["done", "exist"] ] if len(tasks) > 0: task = tasks[0] return task async def is_task_in_process(redis: RedisClient, link: str): messages = await redis.get_tasks() tasks = [ literal_eval(message.decode('utf-8')) for message in messages if literal_eval(message.decode('utf-8'))["link"] == link ] if len(tasks) > 0: task = tasks[0] return task @app.get("/") async def index(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.post('/submit/') async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()): red = RedisClient() task_done = await is_task_already_done_or_exist(red, data.link) # TODO: где-то не обновился статус после выполнения\провала задачи task_in_process = await is_task_in_process(red, data.link) if task_in_process: return JSONResponse({"result": "Задача в работе. Ожидайте"}) if task_done: link_to_download_video = str(request.base_url) + "get/?file_path=" + task_done["result"] return JSONResponse({"result": link_to_download_video}) # TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач async with await connect("amqp://guest:guest@localhost/") as connection: # Creating a channel channel = await connection.channel() body = [ { "link": data.link, "format": f"bestvideo[ext={data.video_format.value}]+bestaudio[ext={data.audio_format.value}]/best[ext={data.video_format.value}]/best", "merge_output_format": data.merge_output_format.value, "outtmpl": f"downloads/%(extractor_key)s/%(id)s_%(width)sp.%(ext)s", }, ] # Sending the message for link in body: if "mail" in link["link"]: link["parser"] = "MyMailRu" elif "yappy" in link["link"]: link["parser"] = "Yappy" message = Message( json.dumps(link, indent=4).encode('utf-8'), delivery_mode=DeliveryMode.PERSISTENT, ) await channel.default_exchange.publish( message, routing_key='hello', ) logging.info(f" [x] Sent '{link}'") # TODO: возможно возвращать идентификаторы задач aka куски ссылок return JSONResponse(status_code=201, content={"result": f"Задача поставлена в работу, ссылка: {link['link']}"}) # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на # выполнение с очисткой состояние об ошибке @app.get('/get/', response_class=FileResponse, status_code=200) async def download_video(file_path): base = os.path.dirname(os.path.dirname(os.path.abspath(file_path))) base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads") def iterfile(): with open(base_download_dir + f'/{file_path}', mode="rb") as file_like: yield from file_like return StreamingResponse(iterfile(), media_type="video/mp4") @app.post('/check/', response_class=FileResponse, status_code=200) async def download_video(request: Request, link: str): try: red = RedisClient() messages_task_done = await red.get_task_done_queue() messages_tasks = await red.get_tasks() tasks_done = [ literal_eval(message.decode('utf-8')) for message in messages_task_done if literal_eval(message.decode('utf-8'))["link"] == link ] tasks = [ literal_eval(message.decode('utf-8')) for message in messages_tasks if literal_eval(message.decode('utf-8'))["link"] == link ] error_tasks = [ tasks_done.pop(tasks_done.index(error_task)) for error_task in tasks_done if error_task["status"] == "error" ] if tasks_done else None if tasks and len(tasks) > 0: task = tasks[0] return JSONResponse( status_code=202, content={"result": f"Задача {task['link']} в данный момент в работе, выполняется"} ) # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на выполнение с очисткой состояние об ошибке if error_tasks and len(error_tasks) > 0: error_task = error_tasks[0] await red.del_task_from_task_done_queue(error_task) return JSONResponse(status_code=510, content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"}) if len(tasks_done) > 0: link_to_download_video = str(request.base_url) + "get/?file_path=" + tasks_done[0]["result"] return JSONResponse({"result": link_to_download_video}) except (AttributeError, IndexError): return JSONResponse(status_code=404, content={"result": "Задача не найдена"}) except Exception as ex: print(ex) uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")