rework redis, rework web for work with array of links

This commit is contained in:
nikili0n
2023-09-29 05:53:27 +03:00
committed by Dantenerosas
parent 1a2975c0d4
commit 5b848dd3eb
10 changed files with 153 additions and 124 deletions

View File

@@ -12,12 +12,10 @@ from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Okru.ok_parser import OkParser
from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser
from loguru import logger
from src.parsers.parser_mapping import get_parser
# TODO: добавить логгер с временными метками в yt-dlp
class MasterService:
@@ -40,12 +38,17 @@ class MasterService:
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict):
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"])
await publish_message_with_task_done(task=result.value)
self.queue.task_done()
async def create_workers(self):
while True:
video_params = await self.queue.get()
redis = RedisClient()
await redis.del_tasks_queue()
await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params)
self.currently_underway[video_params['link']] = video_params
download_task = self.loop.run_in_executor(self.executor, partial(
@@ -53,18 +56,7 @@ class MasterService:
))
result: Result = await download_task
if result.result_type in [ResultTypeEnum.DONE, ResultTypeEnum.EXIST]:
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
await publish_message_with_task_done(task=result.value)
self.queue.task_done()
else:
error_message = {
"link": video_params["link"],
"result": result.value,
"status": "error"
}
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
await publish_message_with_task_done(task=error_message)
await self.result_processing(result, redis, video_params)
if video_params['link'] in self.currently_underway:
del self.currently_underway[video_params['link']]
@@ -79,7 +71,7 @@ class MasterService:
def get_parser(params: dict):
try:
url_parse_result = urlparse(params["link"])
uri = f"{url_parse_result.netloc}{url_parse_result.path}"
uri = f"{url_parse_result.netloc}{url_parse_result.path}"
logger.info(uri)
# # TODO: похоже нужно переделать на регулярки, т.к. добавлять каждую вариацию домена моветон, вероятно я сделаюне-
# parser_mapping = {
@@ -106,7 +98,13 @@ class MasterService:
"link": video_params["link"],
"result": result,
"status": "done"
})
}) if not isinstance(result, list)\
else Result(result_type=ResultTypeEnum.DONE, value={
"link": video_params["link"],
"result": [result_part for result_part in result],
"status": "done"
})
except FileAlreadyExistException as ex:
return Result(result_type=ResultTypeEnum.EXIST, value={
"link": video_params["link"],
@@ -120,16 +118,11 @@ class MasterService:
"status": "error"
})
except Exception as ex:
Result(result_type=ResultTypeEnum.EXCEPTION, value={
return Result(result_type=ResultTypeEnum.EXCEPTION, value={
"link": video_params["link"],
"result": traceback.format_exc(),
"status": "error"
})
return Result(result_type=ResultTypeEnum.EXCEPTION, value={
"link": video_params["link"],
"result": "Unknown exception encountered",
"status": "error"
})
# TODO upload to server