minor fixes, rework web service, add features
This commit is contained in:
		
							
								
								
									
										34
									
								
								poetry.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										34
									
								
								poetry.lock
									
									
									
										generated
									
									
									
								
							@@ -860,6 +860,24 @@ files = [
 | 
				
			|||||||
    {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"},
 | 
					    {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"},
 | 
				
			||||||
]
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[[package]]
 | 
				
			||||||
 | 
					name = "loguru"
 | 
				
			||||||
 | 
					version = "0.7.2"
 | 
				
			||||||
 | 
					description = "Python logging made (stupidly) simple"
 | 
				
			||||||
 | 
					optional = false
 | 
				
			||||||
 | 
					python-versions = ">=3.5"
 | 
				
			||||||
 | 
					files = [
 | 
				
			||||||
 | 
					    {file = "loguru-0.7.2-py3-none-any.whl", hash = "sha256:003d71e3d3ed35f0f8984898359d65b79e5b21943f78af86aa5491210429b8eb"},
 | 
				
			||||||
 | 
					    {file = "loguru-0.7.2.tar.gz", hash = "sha256:e671a53522515f34fd406340ee968cb9ecafbc4b36c679da03c18fd8d0bd51ac"},
 | 
				
			||||||
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[package.dependencies]
 | 
				
			||||||
 | 
					colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""}
 | 
				
			||||||
 | 
					win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[package.extras]
 | 
				
			||||||
 | 
					dev = ["Sphinx (==7.2.5)", "colorama (==0.4.5)", "colorama (==0.4.6)", "exceptiongroup (==1.1.3)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v1.4.1)", "mypy (==v1.5.1)", "pre-commit (==3.4.0)", "pytest (==6.1.2)", "pytest (==7.4.0)", "pytest-cov (==2.12.1)", "pytest-cov (==4.1.0)", "pytest-mypy-plugins (==1.9.3)", "pytest-mypy-plugins (==3.0.0)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.3.0)", "tox (==3.27.1)", "tox (==4.11.0)"]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[[package]]
 | 
					[[package]]
 | 
				
			||||||
name = "lxml"
 | 
					name = "lxml"
 | 
				
			||||||
version = "4.9.3"
 | 
					version = "4.9.3"
 | 
				
			||||||
@@ -1851,6 +1869,20 @@ files = [
 | 
				
			|||||||
    {file = "websockets-11.0.3.tar.gz", hash = "sha256:88fc51d9a26b10fc331be344f1781224a375b78488fc343620184e95a4b27016"},
 | 
					    {file = "websockets-11.0.3.tar.gz", hash = "sha256:88fc51d9a26b10fc331be344f1781224a375b78488fc343620184e95a4b27016"},
 | 
				
			||||||
]
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[[package]]
 | 
				
			||||||
 | 
					name = "win32-setctime"
 | 
				
			||||||
 | 
					version = "1.1.0"
 | 
				
			||||||
 | 
					description = "A small Python utility to set file creation time on Windows"
 | 
				
			||||||
 | 
					optional = false
 | 
				
			||||||
 | 
					python-versions = ">=3.5"
 | 
				
			||||||
 | 
					files = [
 | 
				
			||||||
 | 
					    {file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"},
 | 
				
			||||||
 | 
					    {file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"},
 | 
				
			||||||
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					[package.extras]
 | 
				
			||||||
 | 
					dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[[package]]
 | 
					[[package]]
 | 
				
			||||||
name = "yarl"
 | 
					name = "yarl"
 | 
				
			||||||
version = "1.9.2"
 | 
					version = "1.9.2"
 | 
				
			||||||
@@ -1975,4 +2007,4 @@ websockets = "*"
 | 
				
			|||||||
[metadata]
 | 
					[metadata]
 | 
				
			||||||
lock-version = "2.0"
 | 
					lock-version = "2.0"
 | 
				
			||||||
python-versions = "^3.11"
 | 
					python-versions = "^3.11"
 | 
				
			||||||
content-hash = "adb19063d7f961b9bf479d99616e36f0b47d2b1479279fe2f60cef048ed70a64"
 | 
					content-hash = "1b704294868cb5a6ebb4ce3b9ffb373899bec9ce312ad94810bc1fe0d21ffb7d"
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -24,6 +24,7 @@ lxml = "^4.9.3"
 | 
				
			|||||||
minio = "^7.1.16"
 | 
					minio = "^7.1.16"
 | 
				
			||||||
aiogram = "3.0"
 | 
					aiogram = "3.0"
 | 
				
			||||||
pydantic = "^2.3.0"
 | 
					pydantic = "^2.3.0"
 | 
				
			||||||
 | 
					loguru = "^0.7.2"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
[build-system]
 | 
					[build-system]
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,7 +1,6 @@
 | 
				
			|||||||
import asyncio
 | 
					import asyncio
 | 
				
			||||||
import concurrent.futures as pool
 | 
					import concurrent.futures as pool
 | 
				
			||||||
import subprocess
 | 
					import subprocess
 | 
				
			||||||
import traceback
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
from functools import partial
 | 
					from functools import partial
 | 
				
			||||||
from urllib.parse import urlparse
 | 
					from urllib.parse import urlparse
 | 
				
			||||||
@@ -17,6 +16,7 @@ from src.parsers.base_parser import BaseParser
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
# TODO: добавить логгер с временными метками в yt-dlp
 | 
					# TODO: добавить логгер с временными метками в yt-dlp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class MasterService:
 | 
					class MasterService:
 | 
				
			||||||
    def __init__(self):
 | 
					    def __init__(self):
 | 
				
			||||||
        self.loop = asyncio.get_event_loop()
 | 
					        self.loop = asyncio.get_event_loop()
 | 
				
			||||||
@@ -40,10 +40,7 @@ class MasterService:
 | 
				
			|||||||
    async def create_workers(self):
 | 
					    async def create_workers(self):
 | 
				
			||||||
        while True:
 | 
					        while True:
 | 
				
			||||||
            video_params = await self.queue.get()
 | 
					            video_params = await self.queue.get()
 | 
				
			||||||
            # TODO: позднее написать функцию для определения парсера автоматически
 | 
					 | 
				
			||||||
            redis = RedisClient()
 | 
					            redis = RedisClient()
 | 
				
			||||||
            # TODO: проверить что в редисе задача либо уже выполнена, т.е. сразу отдать ссылку, либо что она ранее была закончена с ошибкой
 | 
					 | 
				
			||||||
            #       и проверять словарь self.currently_underway, для надёжности
 | 
					 | 
				
			||||||
            await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
 | 
					            await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
 | 
				
			||||||
            self.currently_underway[video_params['link']] = video_params
 | 
					            self.currently_underway[video_params['link']] = video_params
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -53,7 +50,7 @@ class MasterService:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
            result: Result = await download_task
 | 
					            result: Result = await download_task
 | 
				
			||||||
            if result.result_type in [ResultTypeEnum.DONE, ResultTypeEnum.EXIST]:
 | 
					            if result.result_type in [ResultTypeEnum.DONE, ResultTypeEnum.EXIST]:
 | 
				
			||||||
                await redis.del_task_from_tasks_and_add_to_task_done(task=result.value)
 | 
					                await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
 | 
				
			||||||
                await publish_message_with_task_done(task=result.value)
 | 
					                await publish_message_with_task_done(task=result.value)
 | 
				
			||||||
                self.queue.task_done()
 | 
					                self.queue.task_done()
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
@@ -62,16 +59,11 @@ class MasterService:
 | 
				
			|||||||
                    "result": result.value,
 | 
					                    "result": result.value,
 | 
				
			||||||
                    "status": "error"
 | 
					                    "status": "error"
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                await redis.del_task_from_tasks_and_add_to_task_done(task=error_message)
 | 
					                await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
 | 
				
			||||||
                await publish_message_with_task_done(task=error_message)
 | 
					                await publish_message_with_task_done(task=error_message)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if video_params['link'] in self.currently_underway:
 | 
					            if video_params['link'] in self.currently_underway:
 | 
				
			||||||
                del self.currently_underway[video_params['link']]
 | 
					                del self.currently_underway[video_params['link']]
 | 
				
			||||||
            # TODO process result
 | 
					 | 
				
			||||||
            #  Result.Done \ Result.Exist - уведомить что задача выполнена, и отослать во вторую очередь сообщений
 | 
					 | 
				
			||||||
            #  RabbitMQ сообщение об этом
 | 
					 | 
				
			||||||
            #  Result.Error - в таблице Редиса для выполненых задач, пометить, что это ошибка и уведомить об этом
 | 
					 | 
				
			||||||
            #  по второй очереди сообщений и потом почистить self.currently_underway
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @staticmethod
 | 
					    @staticmethod
 | 
				
			||||||
    def video_download(video_params: dict):
 | 
					    def video_download(video_params: dict):
 | 
				
			||||||
@@ -113,9 +105,6 @@ class MasterService:
 | 
				
			|||||||
            })
 | 
					            })
 | 
				
			||||||
        except SiteNotImplementedException as ex:
 | 
					        except SiteNotImplementedException as ex:
 | 
				
			||||||
            return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex.default_message)
 | 
					            return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex.default_message)
 | 
				
			||||||
 | 
					 | 
				
			||||||
        except Exception as ex:
 | 
					 | 
				
			||||||
            return Result(result_type=ResultTypeEnum.EXCEPTION, value=traceback.format_exc())
 | 
					 | 
				
			||||||
        # TODO upload to server
 | 
					        # TODO upload to server
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -11,22 +11,14 @@ class RedisClient:
 | 
				
			|||||||
    def __init__(self):
 | 
					    def __init__(self):
 | 
				
			||||||
        self.connection = redis.Redis(host="localhost", port=6379, db=0)
 | 
					        self.connection = redis.Redis(host="localhost", port=6379, db=0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def _set_task(self, task: dict) -> int:
 | 
					    async def _set_task(self, task: dict, queue_name) -> int:
 | 
				
			||||||
        async with self.connection as connection:
 | 
					        async with self.connection as connection:
 | 
				
			||||||
            res = await connection.set(f'{self.TASKS_NAME}:{task["link"]}', json.dumps(task, indent=4).encode('utf-8'))
 | 
					            res = await connection.sadd(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8'))
 | 
				
			||||||
        return res
 | 
					        return res
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def _set_task_done(self, task: dict) -> int:
 | 
					    async def _del_task(self, task: dict, queue_name) -> int:
 | 
				
			||||||
        async with self.connection as connection:
 | 
					        async with self.connection as connection:
 | 
				
			||||||
            res = await connection.sadd(
 | 
					            res = await connection.srem(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8'))
 | 
				
			||||||
                f'{self.TASKS_DONE_NAME}:1',
 | 
					 | 
				
			||||||
                json.dumps(task, indent=4).encode('utf-8')
 | 
					 | 
				
			||||||
                                       )
 | 
					 | 
				
			||||||
        return res
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    async def _del_task(self, task: dict) -> int:
 | 
					 | 
				
			||||||
        async with self.connection as connection:
 | 
					 | 
				
			||||||
            res = await connection.delete(f'{self.TASKS_NAME}:{task["link"]}')
 | 
					 | 
				
			||||||
        return res
 | 
					        return res
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def set_task_to_queue(self, task: dict) -> int:
 | 
					    async def set_task_to_queue(self, task: dict) -> int:
 | 
				
			||||||
@@ -36,7 +28,17 @@ class RedisClient:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    async def get_queue(self) -> set:
 | 
					    async def get_queue(self) -> set:
 | 
				
			||||||
        async with self.connection as connection:
 | 
					        async with self.connection as connection:
 | 
				
			||||||
            res = await connection.smembers(self.SET_NAME)
 | 
					            res = await connection.smembers(self.SET_NAME + f":1")
 | 
				
			||||||
 | 
					        return res
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def get_tasks(self) -> set:
 | 
				
			||||||
 | 
					        async with self.connection as connection:
 | 
				
			||||||
 | 
					            res = await connection.smembers(self.TASKS_NAME + f":1")
 | 
				
			||||||
 | 
					        return res
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def get_task_done_queue(self) -> set:
 | 
				
			||||||
 | 
					        async with self.connection as connection:
 | 
				
			||||||
 | 
					            res = await connection.smembers(self.TASKS_DONE_NAME + f":1")
 | 
				
			||||||
        return res
 | 
					        return res
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def del_task_from_queue(self, task: dict) -> int:
 | 
					    async def del_task_from_queue(self, task: dict) -> int:
 | 
				
			||||||
@@ -45,27 +47,14 @@ class RedisClient:
 | 
				
			|||||||
        return res
 | 
					        return res
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int:
 | 
					    async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int:
 | 
				
			||||||
 | 
					        await self._del_task(task, self.SET_NAME)
 | 
				
			||||||
 | 
					        return await self._set_task(task, self.TASKS_NAME)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        await self.del_task_from_queue(task)
 | 
					    async def del_task_from_tasks_and_add_to_task_done(self, task: dict, working_task: dict) -> int:
 | 
				
			||||||
        return await self._set_task(task)
 | 
					        await self._del_task(working_task, self.TASKS_NAME)
 | 
				
			||||||
 | 
					        return await self._set_task(task, self.TASKS_DONE_NAME)
 | 
				
			||||||
    async def del_task_from_tasks_and_add_to_task_done(self, task: dict) -> int:
 | 
					 | 
				
			||||||
        await self._del_task(task)
 | 
					 | 
				
			||||||
        return await self._set_task_done(task)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    async def get_task_done_queue(self) -> set:
 | 
					 | 
				
			||||||
        async with self.connection as connection:
 | 
					 | 
				
			||||||
            res = await connection.smembers(self.TASKS_DONE_NAME + f":1")
 | 
					 | 
				
			||||||
        return res
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def del_task_from_task_done_queue(self, task) -> int:
 | 
					    async def del_task_from_task_done_queue(self, task) -> int:
 | 
				
			||||||
        async with self.connection as connection:
 | 
					        async with self.connection as connection:
 | 
				
			||||||
            res = await connection.srem(self.TASKS_DONE_NAME + f":1", json.dumps(task, indent=4).encode('utf-8'))
 | 
					            res = await connection.srem(self.TASKS_DONE_NAME + f":1", json.dumps(task, indent=4).encode('utf-8'))
 | 
				
			||||||
        return res
 | 
					        return res
 | 
				
			||||||
 | 
					 | 
				
			||||||
    async def get_tasks_queue(self) -> set:
 | 
					 | 
				
			||||||
        async with self.connection as connection:
 | 
					 | 
				
			||||||
            res = await connection.json().get(self.TASKS_NAME)
 | 
					 | 
				
			||||||
        return res
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
@@ -27,6 +27,12 @@ class VideoDownloader:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    def download(self):
 | 
					    def download(self):
 | 
				
			||||||
        # TODO: удалить все файлы связанные с текущим видео, которые сейчас остались
 | 
					        # TODO: удалить все файлы связанные с текущим видео, которые сейчас остались
 | 
				
			||||||
 | 
					        base = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 | 
				
			||||||
 | 
					        base_download_dir = os.path.join(base, os.pardir, "downloads", self.info['extractor_key'])
 | 
				
			||||||
 | 
					        for root, dirs, files in os.walk(base_download_dir):
 | 
				
			||||||
 | 
					            for file in files:
 | 
				
			||||||
 | 
					                if file.find(self.info['id']) != -1 and file.find('.part') != -1:
 | 
				
			||||||
 | 
					                    os.remove(base_download_dir + f"/{file}")
 | 
				
			||||||
        with YoutubeDL(self.ydl_opts if self.ydl_opts else {}) as ydl:
 | 
					        with YoutubeDL(self.ydl_opts if self.ydl_opts else {}) as ydl:
 | 
				
			||||||
            ydl.download([self.link])
 | 
					            ydl.download([self.link])
 | 
				
			||||||
            return self.info
 | 
					            return self.info
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,6 +1,9 @@
 | 
				
			|||||||
 | 
					import datetime
 | 
				
			||||||
import errno
 | 
					import errno
 | 
				
			||||||
import os
 | 
					import os
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from loguru import logger
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from src.core.ydl import VideoDownloader
 | 
					from src.core.ydl import VideoDownloader
 | 
				
			||||||
from src.exceptions.download_exceptions import FileAlreadyExistException
 | 
					from src.exceptions.download_exceptions import FileAlreadyExistException
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -15,6 +18,7 @@ class BaseParser:
 | 
				
			|||||||
    def video_download(self):
 | 
					    def video_download(self):
 | 
				
			||||||
        ydl_opts = {
 | 
					        ydl_opts = {
 | 
				
			||||||
            "format": self.params["format"],
 | 
					            "format": self.params["format"],
 | 
				
			||||||
 | 
					            "logger": logger,
 | 
				
			||||||
            "merge_output_format": self.params["merge_output_format"],
 | 
					            "merge_output_format": self.params["merge_output_format"],
 | 
				
			||||||
            'outtmpl': self.params["outtmpl"],
 | 
					            'outtmpl': self.params["outtmpl"],
 | 
				
			||||||
            "quiet": True
 | 
					            "quiet": True
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										105
									
								
								src/web/main.py
									
									
									
									
									
								
							
							
						
						
									
										105
									
								
								src/web/main.py
									
									
									
									
									
								
							@@ -1,4 +1,3 @@
 | 
				
			|||||||
import asyncio
 | 
					 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
import os
 | 
					import os
 | 
				
			||||||
from ast import literal_eval
 | 
					from ast import literal_eval
 | 
				
			||||||
@@ -6,6 +5,7 @@ from ast import literal_eval
 | 
				
			|||||||
import uvicorn
 | 
					import uvicorn
 | 
				
			||||||
from aio_pika import connect, Message, DeliveryMode
 | 
					from aio_pika import connect, Message, DeliveryMode
 | 
				
			||||||
from fastapi import FastAPI, Request, Depends
 | 
					from fastapi import FastAPI, Request, Depends
 | 
				
			||||||
 | 
					from loguru import logger
 | 
				
			||||||
from starlette.middleware.cors import CORSMiddleware
 | 
					from starlette.middleware.cors import CORSMiddleware
 | 
				
			||||||
from starlette.responses import JSONResponse, FileResponse, StreamingResponse
 | 
					from starlette.responses import JSONResponse, FileResponse, StreamingResponse
 | 
				
			||||||
from starlette.templating import Jinja2Templates
 | 
					from starlette.templating import Jinja2Templates
 | 
				
			||||||
@@ -39,7 +39,19 @@ async def is_task_already_done_or_exist(redis: RedisClient, link: str):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    if len(tasks) > 0:
 | 
					    if len(tasks) > 0:
 | 
				
			||||||
        task = tasks[0]
 | 
					        task = tasks[0]
 | 
				
			||||||
        await redis.del_task_from_task_done_queue(task)
 | 
					        return task
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async def is_task_in_process(redis: RedisClient, link: str):
 | 
				
			||||||
 | 
					    messages = await redis.get_tasks()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    tasks = [
 | 
				
			||||||
 | 
					        literal_eval(message.decode('utf-8')) for message in messages
 | 
				
			||||||
 | 
					        if literal_eval(message.decode('utf-8'))["link"] == link
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if len(tasks) > 0:
 | 
				
			||||||
 | 
					        task = tasks[0]
 | 
				
			||||||
        return task
 | 
					        return task
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -47,9 +59,10 @@ async def is_task_already_done_or_exist(redis: RedisClient, link: str):
 | 
				
			|||||||
async def index(request: Request):
 | 
					async def index(request: Request):
 | 
				
			||||||
    return templates.TemplateResponse("index.html", {"request": request})
 | 
					    return templates.TemplateResponse("index.html", {"request": request})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@app.post('/submit/')
 | 
					@app.post('/submit/')
 | 
				
			||||||
async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()):
 | 
					async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()):
 | 
				
			||||||
    '''
 | 
					    """
 | 
				
			||||||
    TODO:
 | 
					    TODO:
 | 
				
			||||||
    Сабмит должен проверить что задача может быть уже выполненой (отдать ссылку в ответе)
 | 
					    Сабмит должен проверить что задача может быть уже выполненой (отдать ссылку в ответе)
 | 
				
			||||||
        или ещё в работе (сообщить об этом в ответе, можно вывести на форму, что такая ссылка уже скачивается, ожидайте)
 | 
					        или ещё в работе (сообщить об этом в ответе, можно вывести на форму, что такая ссылка уже скачивается, ожидайте)
 | 
				
			||||||
@@ -61,14 +74,18 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
 | 
				
			|||||||
    1) такой задачи нет (404)
 | 
					    1) такой задачи нет (404)
 | 
				
			||||||
    2) такая задача есть и выполняется (200 ли?)
 | 
					    2) такая задача есть и выполняется (200 ли?)
 | 
				
			||||||
    3) такая задача есть и завершена (200 и выдать ссылку на загрузку)
 | 
					    3) такая задача есть и завершена (200 и выдать ссылку на загрузку)
 | 
				
			||||||
    4) такая задача есть и завершена, но с ошибкой (500 и сообщение о том, что можно попробовать выполнить задачу занов, 
 | 
					    4) такая задача есть и завершена, но с ошибкой (500 и сообщение о том, что можно попробовать выполнить задачу заново,
 | 
				
			||||||
                                                    попутно удалив задачу из выполненых, с очисткой мусора за ней)
 | 
					                                                    попутно удалив задачу из выполненных, с очисткой мусора за ней)
 | 
				
			||||||
    '''
 | 
					    """
 | 
				
			||||||
    red = RedisClient()
 | 
					    red = RedisClient()
 | 
				
			||||||
    task_done = await is_task_already_done_or_exist(red, data.link)
 | 
					    task_done = await is_task_already_done_or_exist(red, data.link)
 | 
				
			||||||
 | 
					    task_in_process = await is_task_in_process(red, data.link)
 | 
				
			||||||
 | 
					    if task_in_process:
 | 
				
			||||||
 | 
					        return JSONResponse({"result": "Задача в работе. Ожидайте"})
 | 
				
			||||||
    if task_done:
 | 
					    if task_done:
 | 
				
			||||||
        link_to_download_video = str(request.base_url) + "get/?file_path=" + task_done["result"]
 | 
					        link_to_download_video = str(request.base_url) + "get/?file_path=" + task_done["result"]
 | 
				
			||||||
        return JSONResponse({"result": link_to_download_video})
 | 
					        return JSONResponse({"result": link_to_download_video})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач
 | 
					    # TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач
 | 
				
			||||||
    async with await connect("amqp://guest:guest@localhost/") as connection:
 | 
					    async with await connect("amqp://guest:guest@localhost/") as connection:
 | 
				
			||||||
        # Creating a channel
 | 
					        # Creating a channel
 | 
				
			||||||
@@ -76,7 +93,7 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
 | 
				
			|||||||
        body = [
 | 
					        body = [
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                "link": data.link,
 | 
					                "link": data.link,
 | 
				
			||||||
                "format": f"bestvideo[ext={data.format.value}]+bestaudio[ext={data.format.value}]/best[ext={data.format.value}]/best",
 | 
					                "format": f"bestvideo[ext={data.video_format.value}]+bestaudio[ext={data.audio_format.value}]/best[ext={data.video_format.value}]/best",
 | 
				
			||||||
                "merge_output_format": data.merge_output_format.value,
 | 
					                "merge_output_format": data.merge_output_format.value,
 | 
				
			||||||
                "outtmpl": f"downloads/%(extractor_key)s/%(id)s_%(width)sp.%(ext)s",
 | 
					                "outtmpl": f"downloads/%(extractor_key)s/%(id)s_%(width)sp.%(ext)s",
 | 
				
			||||||
            }, ]
 | 
					            }, ]
 | 
				
			||||||
@@ -94,31 +111,11 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
 | 
				
			|||||||
                routing_key='hello',
 | 
					                routing_key='hello',
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        print(f" [x] Sent '{link}'")
 | 
					        logger.info(f" [x] Sent '{link}'")
 | 
				
			||||||
 | 
					        # TODO: возможно возвращать идентификаторы задач aka куски ссылок
 | 
				
			||||||
        while True:
 | 
					        return JSONResponse(status_code=200, content={"result": f"Задача поставлена в работу, ссылка: {link['link']}"})
 | 
				
			||||||
            try:
 | 
					        # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на
 | 
				
			||||||
                messages = await red.get_task_done_queue()
 | 
					        #  выполнение с очисткой состояние об ошибке
 | 
				
			||||||
                tasks = [
 | 
					 | 
				
			||||||
                    literal_eval(message.decode('utf-8')) for message in messages
 | 
					 | 
				
			||||||
                    if literal_eval(message.decode('utf-8'))["link"] == link["link"]
 | 
					 | 
				
			||||||
                ]
 | 
					 | 
				
			||||||
                error_tasks = [tasks.pop(tasks.index(error_task)) for error_task in tasks if error_task["status"] == "error"]
 | 
					 | 
				
			||||||
                # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на выполнение с очисткой состояние об ошибке
 | 
					 | 
				
			||||||
                if len(error_tasks) > 0:
 | 
					 | 
				
			||||||
                    return JSONResponse({"result": f"STATUS: ERROR {error_tasks[-1]['result']}"})
 | 
					 | 
				
			||||||
                if len(tasks) > 0:
 | 
					 | 
				
			||||||
                    task = tasks[0]
 | 
					 | 
				
			||||||
                    await red.del_task_from_task_done_queue(task)
 | 
					 | 
				
			||||||
                    break
 | 
					 | 
				
			||||||
                await asyncio.sleep(5)
 | 
					 | 
				
			||||||
            except (AttributeError, IndexError):
 | 
					 | 
				
			||||||
                await asyncio.sleep(5)
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
        link_to_download_video = str(request.base_url) + "get/?file_path=" + task["result"]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # TODO: возможно возвращать идентификаторы задач aka куски ссылок
 | 
					 | 
				
			||||||
    return JSONResponse({"result": link_to_download_video})
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@app.get('/get/', response_class=FileResponse, status_code=200)
 | 
					@app.get('/get/', response_class=FileResponse, status_code=200)
 | 
				
			||||||
@@ -133,4 +130,46 @@ async def download_video(file_path):
 | 
				
			|||||||
    return StreamingResponse(iterfile(), media_type="video/mp4")
 | 
					    return StreamingResponse(iterfile(), media_type="video/mp4")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")
 | 
					@app.get('/check/', response_class=FileResponse, status_code=200)
 | 
				
			||||||
 | 
					async def download_video(request: Request, link: str):
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        red = RedisClient()
 | 
				
			||||||
 | 
					        messages_task_done = await red.get_task_done_queue()
 | 
				
			||||||
 | 
					        messages_tasks = await red.get_tasks()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        tasks_done = [
 | 
				
			||||||
 | 
					            literal_eval(message.decode('utf-8')) for message in messages_task_done
 | 
				
			||||||
 | 
					            if literal_eval(message.decode('utf-8'))["link"] == link
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
 | 
					        tasks = [
 | 
				
			||||||
 | 
					            literal_eval(message.decode('utf-8')) for message in messages_tasks
 | 
				
			||||||
 | 
					            if literal_eval(message.decode('utf-8'))["link"] == link
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        error_tasks = [
 | 
				
			||||||
 | 
					            tasks_done.pop(tasks_done.index(error_task)) for error_task in tasks_done if error_task["status"] == "error"
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
 | 
					        if len(tasks) > 0:
 | 
				
			||||||
 | 
					            task = tasks[0]
 | 
				
			||||||
 | 
					            return JSONResponse(
 | 
				
			||||||
 | 
					                status_code=202,
 | 
				
			||||||
 | 
					                content={"result": f"Задача {task['link']} в данный момент в работе, выполняется"}
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					        # TODO: если уже была попытка сделать задачу и в редисе она с ошибкой, то переташить её в очередь на выполнение с очисткой состояние об ошибке
 | 
				
			||||||
 | 
					        if len(error_tasks) > 0:
 | 
				
			||||||
 | 
					            error_task = error_tasks[0]
 | 
				
			||||||
 | 
					            await red.del_task_from_task_done_queue(error_task)
 | 
				
			||||||
 | 
					            return JSONResponse(status_code=510,
 | 
				
			||||||
 | 
					                                content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"})
 | 
				
			||||||
 | 
					        if len(tasks_done) > 0:
 | 
				
			||||||
 | 
					            link_to_download_video = str(request.base_url) + "get/?file_path=" + tasks_done[0]["result"]
 | 
				
			||||||
 | 
					            return JSONResponse({"result": link_to_download_video})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    except (AttributeError, IndexError):
 | 
				
			||||||
 | 
					        return JSONResponse(status_code=404, content={"result": "Задача не найдена"})
 | 
				
			||||||
 | 
					    except Exception as ex:
 | 
				
			||||||
 | 
					        print(ex)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == '__main__':
 | 
				
			||||||
 | 
					    uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -9,6 +9,7 @@ vext: Video Extension (mp4 > mov > webm > flv > other). If --prefer-free-formats
 | 
				
			|||||||
aext: Audio Extension (m4a > aac > mp3 > ogg > opus > webm > other). If --prefer-free-formats is used, the order changes to ogg > opus > webm > mp3 > m4a > aac
 | 
					aext: Audio Extension (m4a > aac > mp3 > ogg > opus > webm > other). If --prefer-free-formats is used, the order changes to ogg > opus > webm > mp3 > m4a > aac
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class VideoFormatEnum(Enum):
 | 
					class VideoFormatEnum(Enum):
 | 
				
			||||||
    format_3gp = "3gp"
 | 
					    format_3gp = "3gp"
 | 
				
			||||||
    format_flv = "flv"
 | 
					    format_flv = "flv"
 | 
				
			||||||
@@ -16,6 +17,7 @@ class VideoFormatEnum(Enum):
 | 
				
			|||||||
    format_mov = "mov"
 | 
					    format_mov = "mov"
 | 
				
			||||||
    format_webm = "webm"
 | 
					    format_webm = "webm"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class AudioFormatEnum(Enum):
 | 
					class AudioFormatEnum(Enum):
 | 
				
			||||||
    format_aac = "aac"
 | 
					    format_aac = "aac"
 | 
				
			||||||
    format_m4a = "m4a"
 | 
					    format_m4a = "m4a"
 | 
				
			||||||
@@ -34,6 +36,7 @@ class MergeOutputFormatEnum(Enum):
 | 
				
			|||||||
    format_mp4 = "mp4"
 | 
					    format_mp4 = "mp4"
 | 
				
			||||||
    format_webm = "webm"
 | 
					    format_webm = "webm"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@dataclass
 | 
					@dataclass
 | 
				
			||||||
class SubmitIn:
 | 
					class SubmitIn:
 | 
				
			||||||
    link: str = Form(...)
 | 
					    link: str = Form(...)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -71,7 +71,7 @@
 | 
				
			|||||||
</style>
 | 
					</style>
 | 
				
			||||||
<body>
 | 
					<body>
 | 
				
			||||||
    <form method="post" action="/submit" id="download">
 | 
					    <form method="post" action="/submit" id="download">
 | 
				
			||||||
         <input type="text" name="link" placeholder="link">
 | 
					         <input type="text" name="link" id="link" placeholder="link">
 | 
				
			||||||
         <input type="text" name="video_format" placeholder="video_format">
 | 
					         <input type="text" name="video_format" placeholder="video_format">
 | 
				
			||||||
         <input type="text" name="audio_format" placeholder="audio_format">
 | 
					         <input type="text" name="audio_format" placeholder="audio_format">
 | 
				
			||||||
         <input type="text" name="merge_output_format" placeholder="merge_output_format">
 | 
					         <input type="text" name="merge_output_format" placeholder="merge_output_format">
 | 
				
			||||||
@@ -87,10 +87,39 @@
 | 
				
			|||||||
    </div>
 | 
					    </div>
 | 
				
			||||||
</body>
 | 
					</body>
 | 
				
			||||||
<script>
 | 
					<script>
 | 
				
			||||||
 | 
					    function sendReq() {
 | 
				
			||||||
 | 
					        document.forms.download.querySelector('[type="submit"]').disabled = true;
 | 
				
			||||||
 | 
					        document.forms.download.querySelector('.submit-spinner').classList.remove('submit-spinner_hide');
 | 
				
			||||||
 | 
					        const link = document.getElementById("link").value
 | 
				
			||||||
 | 
					        const xhr2 = new XMLHttpRequest();
 | 
				
			||||||
 | 
					        xhr2.open('GET', 'http://0.0.0.0:8000/check/?link=' + link);
 | 
				
			||||||
 | 
					        xhr2.responseType = 'json';
 | 
				
			||||||
 | 
					        xhr2.onload = function() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if (xhr2.status !== 200) {
 | 
				
			||||||
 | 
					                if ('response' in xhr2 && xhr2.response !== null) {
 | 
				
			||||||
 | 
					                    console.log(xhr2.response)
 | 
				
			||||||
 | 
					                    result.innerHTML = xhr2.response.result;
 | 
				
			||||||
 | 
					                    result.href = xhr2.response.result;
 | 
				
			||||||
 | 
					                };
 | 
				
			||||||
 | 
					                setTimeout(sendReq, 5000);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            } else if (xhr2.status === 200) {
 | 
				
			||||||
 | 
					                result.innerHTML = xhr2.response.result;
 | 
				
			||||||
 | 
					                result.href = xhr2.response.result;
 | 
				
			||||||
 | 
					                document.forms.download.querySelector('[type="submit"]').disabled = false;
 | 
				
			||||||
 | 
					                document.forms.download.querySelector('.submit-spinner').classList.add('submit-spinner_hide');
 | 
				
			||||||
 | 
					              };
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        xhr2.send()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    function sendForm() {
 | 
					    function sendForm() {
 | 
				
			||||||
    const xhr = new XMLHttpRequest();
 | 
					    const xhr = new XMLHttpRequest();
 | 
				
			||||||
    xhr.open('POST', document.forms.download.action);
 | 
					    xhr.open('POST', document.forms.download.action);
 | 
				
			||||||
    xhr.responseType = 'json';
 | 
					    xhr.responseType = 'json';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    xhr.onload = () => {
 | 
					    xhr.onload = () => {
 | 
				
			||||||
      document.forms.download.querySelector('[type="submit"]').disabled = false;
 | 
					      document.forms.download.querySelector('[type="submit"]').disabled = false;
 | 
				
			||||||
      document.forms.download.querySelector('.submit-spinner').classList.add('submit-spinner_hide');
 | 
					      document.forms.download.querySelector('.submit-spinner').classList.add('submit-spinner_hide');
 | 
				
			||||||
@@ -108,11 +137,17 @@
 | 
				
			|||||||
    document.forms.download.querySelector('[type="submit"]').disabled = true;
 | 
					    document.forms.download.querySelector('[type="submit"]').disabled = true;
 | 
				
			||||||
    document.forms.download.querySelector('.submit-spinner').classList.remove('submit-spinner_hide');
 | 
					    document.forms.download.querySelector('.submit-spinner').classList.remove('submit-spinner_hide');
 | 
				
			||||||
    xhr.send(new FormData(document.forms.download));
 | 
					    xhr.send(new FormData(document.forms.download));
 | 
				
			||||||
 | 
					    sendReq()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  // при отправке формы
 | 
					  // при отправке формы
 | 
				
			||||||
  document.forms.download.addEventListener('submit', (e) => {
 | 
					  document.forms.download.addEventListener('submit', (e) => {
 | 
				
			||||||
    e.preventDefault();
 | 
					    e.preventDefault();
 | 
				
			||||||
    sendForm();
 | 
					    sendForm();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  });
 | 
					  });
 | 
				
			||||||
</script>
 | 
					</script>
 | 
				
			||||||
</html>
 | 
					</html>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user