minor fixes, rework web service, add features

This commit is contained in:
nikili0n
2023-09-25 04:05:42 +03:00
committed by Dantenerosas
parent 801b9f2e52
commit 9d6d9947f5
9 changed files with 182 additions and 84 deletions

View File

@@ -1,7 +1,6 @@
import asyncio
import concurrent.futures as pool
import subprocess
import traceback
from functools import partial
from urllib.parse import urlparse
@@ -17,6 +16,7 @@ from src.parsers.base_parser import BaseParser
# TODO: добавить логгер с временными метками в yt-dlp
class MasterService:
def __init__(self):
self.loop = asyncio.get_event_loop()
@@ -40,10 +40,7 @@ class MasterService:
async def create_workers(self):
while True:
video_params = await self.queue.get()
# TODO: позднее написать функцию для определения парсера автоматически
redis = RedisClient()
# TODO: проверить что в редисе задача либо уже выполнена, т.е. сразу отдать ссылку, либо что она ранее была закончена с ошибкой
# и проверять словарь self.currently_underway, для надёжности
await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
self.currently_underway[video_params['link']] = video_params
@@ -53,7 +50,7 @@ class MasterService:
result: Result = await download_task
if result.result_type in [ResultTypeEnum.DONE, ResultTypeEnum.EXIST]:
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value)
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
await publish_message_with_task_done(task=result.value)
self.queue.task_done()
else:
@@ -62,16 +59,11 @@ class MasterService:
"result": result.value,
"status": "error"
}
await redis.del_task_from_tasks_and_add_to_task_done(task=error_message)
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
await publish_message_with_task_done(task=error_message)
if video_params['link'] in self.currently_underway:
del self.currently_underway[video_params['link']]
# TODO process result
# Result.Done \ Result.Exist - уведомить что задача выполнена, и отослать во вторую очередь сообщений
# RabbitMQ сообщение об этом
# Result.Error - в таблице Редиса для выполненых задач, пометить, что это ошибка и уведомить об этом
# по второй очереди сообщений и потом почистить self.currently_underway
@staticmethod
def video_download(video_params: dict):
@@ -113,9 +105,6 @@ class MasterService:
})
except SiteNotImplementedException as ex:
return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex.default_message)
except Exception as ex:
return Result(result_type=ResultTypeEnum.EXCEPTION, value=traceback.format_exc())
# TODO upload to server