minor fixes, rework web service, add features
This commit is contained in:
@ -1,7 +1,6 @@
|
||||
import asyncio
|
||||
import concurrent.futures as pool
|
||||
import subprocess
|
||||
import traceback
|
||||
|
||||
from functools import partial
|
||||
from urllib.parse import urlparse
|
||||
@ -17,6 +16,7 @@ from src.parsers.base_parser import BaseParser
|
||||
|
||||
# TODO: добавить логгер с временными метками в yt-dlp
|
||||
|
||||
|
||||
class MasterService:
|
||||
def __init__(self):
|
||||
self.loop = asyncio.get_event_loop()
|
||||
@ -40,10 +40,7 @@ class MasterService:
|
||||
async def create_workers(self):
|
||||
while True:
|
||||
video_params = await self.queue.get()
|
||||
# TODO: позднее написать функцию для определения парсера автоматически
|
||||
redis = RedisClient()
|
||||
# TODO: проверить что в редисе задача либо уже выполнена, т.е. сразу отдать ссылку, либо что она ранее была закончена с ошибкой
|
||||
# и проверять словарь self.currently_underway, для надёжности
|
||||
await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
|
||||
self.currently_underway[video_params['link']] = video_params
|
||||
|
||||
@ -53,7 +50,7 @@ class MasterService:
|
||||
|
||||
result: Result = await download_task
|
||||
if result.result_type in [ResultTypeEnum.DONE, ResultTypeEnum.EXIST]:
|
||||
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value)
|
||||
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
|
||||
await publish_message_with_task_done(task=result.value)
|
||||
self.queue.task_done()
|
||||
else:
|
||||
@ -62,16 +59,11 @@ class MasterService:
|
||||
"result": result.value,
|
||||
"status": "error"
|
||||
}
|
||||
await redis.del_task_from_tasks_and_add_to_task_done(task=error_message)
|
||||
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
|
||||
await publish_message_with_task_done(task=error_message)
|
||||
|
||||
if video_params['link'] in self.currently_underway:
|
||||
del self.currently_underway[video_params['link']]
|
||||
# TODO process result
|
||||
# Result.Done \ Result.Exist - уведомить что задача выполнена, и отослать во вторую очередь сообщений
|
||||
# RabbitMQ сообщение об этом
|
||||
# Result.Error - в таблице Редиса для выполненых задач, пометить, что это ошибка и уведомить об этом
|
||||
# по второй очереди сообщений и потом почистить self.currently_underway
|
||||
|
||||
@staticmethod
|
||||
def video_download(video_params: dict):
|
||||
@ -113,9 +105,6 @@ class MasterService:
|
||||
})
|
||||
except SiteNotImplementedException as ex:
|
||||
return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex.default_message)
|
||||
|
||||
except Exception as ex:
|
||||
return Result(result_type=ResultTypeEnum.EXCEPTION, value=traceback.format_exc())
|
||||
# TODO upload to server
|
||||
|
||||
|
||||
|
@ -11,22 +11,14 @@ class RedisClient:
|
||||
def __init__(self):
|
||||
self.connection = redis.Redis(host="localhost", port=6379, db=0)
|
||||
|
||||
async def _set_task(self, task: dict) -> int:
|
||||
async def _set_task(self, task: dict, queue_name) -> int:
|
||||
async with self.connection as connection:
|
||||
res = await connection.set(f'{self.TASKS_NAME}:{task["link"]}', json.dumps(task, indent=4).encode('utf-8'))
|
||||
res = await connection.sadd(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8'))
|
||||
return res
|
||||
|
||||
async def _set_task_done(self, task: dict) -> int:
|
||||
async def _del_task(self, task: dict, queue_name) -> int:
|
||||
async with self.connection as connection:
|
||||
res = await connection.sadd(
|
||||
f'{self.TASKS_DONE_NAME}:1',
|
||||
json.dumps(task, indent=4).encode('utf-8')
|
||||
)
|
||||
return res
|
||||
|
||||
async def _del_task(self, task: dict) -> int:
|
||||
async with self.connection as connection:
|
||||
res = await connection.delete(f'{self.TASKS_NAME}:{task["link"]}')
|
||||
res = await connection.srem(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8'))
|
||||
return res
|
||||
|
||||
async def set_task_to_queue(self, task: dict) -> int:
|
||||
@ -36,7 +28,17 @@ class RedisClient:
|
||||
|
||||
async def get_queue(self) -> set:
|
||||
async with self.connection as connection:
|
||||
res = await connection.smembers(self.SET_NAME)
|
||||
res = await connection.smembers(self.SET_NAME + f":1")
|
||||
return res
|
||||
|
||||
async def get_tasks(self) -> set:
|
||||
async with self.connection as connection:
|
||||
res = await connection.smembers(self.TASKS_NAME + f":1")
|
||||
return res
|
||||
|
||||
async def get_task_done_queue(self) -> set:
|
||||
async with self.connection as connection:
|
||||
res = await connection.smembers(self.TASKS_DONE_NAME + f":1")
|
||||
return res
|
||||
|
||||
async def del_task_from_queue(self, task: dict) -> int:
|
||||
@ -45,27 +47,14 @@ class RedisClient:
|
||||
return res
|
||||
|
||||
async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int:
|
||||
await self._del_task(task, self.SET_NAME)
|
||||
return await self._set_task(task, self.TASKS_NAME)
|
||||
|
||||
await self.del_task_from_queue(task)
|
||||
return await self._set_task(task)
|
||||
|
||||
async def del_task_from_tasks_and_add_to_task_done(self, task: dict) -> int:
|
||||
await self._del_task(task)
|
||||
return await self._set_task_done(task)
|
||||
|
||||
async def get_task_done_queue(self) -> set:
|
||||
async with self.connection as connection:
|
||||
res = await connection.smembers(self.TASKS_DONE_NAME + f":1")
|
||||
return res
|
||||
async def del_task_from_tasks_and_add_to_task_done(self, task: dict, working_task: dict) -> int:
|
||||
await self._del_task(working_task, self.TASKS_NAME)
|
||||
return await self._set_task(task, self.TASKS_DONE_NAME)
|
||||
|
||||
async def del_task_from_task_done_queue(self, task) -> int:
|
||||
async with self.connection as connection:
|
||||
res = await connection.srem(self.TASKS_DONE_NAME + f":1", json.dumps(task, indent=4).encode('utf-8'))
|
||||
return res
|
||||
|
||||
async def get_tasks_queue(self) -> set:
|
||||
async with self.connection as connection:
|
||||
res = await connection.json().get(self.TASKS_NAME)
|
||||
return res
|
||||
|
||||
|
||||
|
@ -27,6 +27,12 @@ class VideoDownloader:
|
||||
|
||||
def download(self):
|
||||
# TODO: удалить все файлы связанные с текущим видео, которые сейчас остались
|
||||
base = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
base_download_dir = os.path.join(base, os.pardir, "downloads", self.info['extractor_key'])
|
||||
for root, dirs, files in os.walk(base_download_dir):
|
||||
for file in files:
|
||||
if file.find(self.info['id']) != -1 and file.find('.part') != -1:
|
||||
os.remove(base_download_dir + f"/{file}")
|
||||
with YoutubeDL(self.ydl_opts if self.ydl_opts else {}) as ydl:
|
||||
ydl.download([self.link])
|
||||
return self.info
|
||||
|
Reference in New Issue
Block a user