rework redis, rework web to work with an array of links
@@ -8,5 +8,5 @@ redis = RedisClient()
class AsyncQueue(asyncio.Queue):

    async def put(self, item):
-        await redis.set_task_to_queue(item)
+        await redis.set_task_to_queue(item["link"], item)
        return await super().put(item)
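
Note: a minimal usage sketch (not part of the commit) of how a producer would now enqueue work, assuming AsyncQueue lives in a module such as src.core.async_queue (assumed path) and every task dict carries a "link" key:

import asyncio

from src.core.async_queue import AsyncQueue  # assumed import path


async def enqueue_example():
    queue = AsyncQueue()
    # hypothetical payload; the only requirement implied by the diff is a "link" key
    task = {"link": "https://example.com/video/1", "format": "mp4"}
    # put() now mirrors the task into Redis under its link before queueing it locally
    await queue.put(task)


asyncio.run(enqueue_example())
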
@@ -12,12 +12,10 @@ from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
-from src.parsers.MyMail.my_mail_parser import MyMailParser
-from src.parsers.Okru.ok_parser import OkParser
-from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser
from loguru import logger
+from src.parsers.parser_mapping import get_parser

# TODO: add a logger with timestamps to yt-dlp


class MasterService:
@@ -40,12 +38,17 @@ class MasterService:

        await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)

+    async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict):
+        await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"])
+        await publish_message_with_task_done(task=result.value)
+        self.queue.task_done()
+
    async def create_workers(self):
        while True:
            video_params = await self.queue.get()
            redis = RedisClient()
            await redis.del_tasks_queue()
-            await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
+            await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params)
            self.currently_underway[video_params['link']] = video_params

            download_task = self.loop.run_in_executor(self.executor, partial(
@@ -53,18 +56,7 @@
            ))

            result: Result = await download_task
-            if result.result_type in [ResultTypeEnum.DONE, ResultTypeEnum.EXIST]:
-                await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
-                await publish_message_with_task_done(task=result.value)
-                self.queue.task_done()
-            else:
-                error_message = {
-                    "link": video_params["link"],
-                    "result": result.value,
-                    "status": "error"
-                }
-                await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
-                await publish_message_with_task_done(task=error_message)
+            await self.result_processing(result, redis, video_params)

            if video_params['link'] in self.currently_underway:
                del self.currently_underway[video_params['link']]
@@ -79,7 +71,7 @@ class MasterService:
    def get_parser(params: dict):
        try:
            url_parse_result = urlparse(params["link"])
            uri = f"{url_parse_result.netloc}{url_parse_result.path}"
            logger.info(uri)
            # # TODO: this should probably be reworked with regexes, since adding every domain variation is bad form
            # parser_mapping = {
@@ -106,7 +98,13 @@
                "link": video_params["link"],
                "result": result,
                "status": "done"
-            })
+            }) if not isinstance(result, list)\
+                else Result(result_type=ResultTypeEnum.DONE, value={
+                    "link": video_params["link"],
+                    "result": [result_part for result_part in result],
+                    "status": "done"
+                })
+
        except FileAlreadyExistException as ex:
            return Result(result_type=ResultTypeEnum.EXIST, value={
                "link": video_params["link"],
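
Readability note: the backslash-continued conditional return added above is equivalent to the explicit branch below (a sketch only, using the same names as the surrounding method; not part of the commit):

if not isinstance(result, list):
    return Result(result_type=ResultTypeEnum.DONE, value={
        "link": video_params["link"],
        "result": result,
        "status": "done"
    })
else:
    return Result(result_type=ResultTypeEnum.DONE, value={
        "link": video_params["link"],
        # the comprehension in the diff simply copies the list
        "result": list(result),
        "status": "done"
    })
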
@@ -120,16 +118,11 @@ class MasterService:
                "status": "error"
            })
        except Exception as ex:
-            Result(result_type=ResultTypeEnum.EXCEPTION, value={
+            return Result(result_type=ResultTypeEnum.EXCEPTION, value={
                "link": video_params["link"],
                "result": traceback.format_exc(),
                "status": "error"
            })
-            return Result(result_type=ResultTypeEnum.EXCEPTION, value={
-                "link": video_params["link"],
-                "result": "Unknown exception encountered",
-                "status": "error"
-            })
        # TODO upload to server

@@ -28,7 +28,7 @@ async def get_messages(inner_queue) -> None:
    await asyncio.Future()


-async def publish_message_with_task_done(task: dict) -> None:
+async def publish_message_with_task_done(task: dict | list) -> None:
    queue_name = "tasks_done"
    async with await connect("amqp://guest:guest@localhost/") as connection:
        # Creating channel
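
The body of publish_message_with_task_done past the connection setup is not shown in this hunk, so the following is only a hedged sketch of how a caller could now pass either one task dict or a list of them, assuming the payload is JSON-serialised and published through aio_pika's default exchange (the publish code itself is an assumption, not taken from the diff):

import json

from aio_pika import Message, connect


async def publish_task_done_sketch(task: dict | list) -> None:
    queue_name = "tasks_done"
    async with await connect("amqp://guest:guest@localhost/") as connection:
        channel = await connection.channel()
        await channel.declare_queue(queue_name)
        # json.dumps handles both a single dict and a list of dicts
        await channel.default_exchange.publish(
            Message(body=json.dumps(task).encode()),
            routing_key=queue_name,
        )
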
@@ -7,61 +7,57 @@ class RedisClient:
    SET_NAME = "queue"
    TASKS_NAME = "tasks_working"
    TASKS_DONE_NAME = "tasks_done"

    # TODO: rewrite all Redis interaction back to key-value

    def __init__(self):
        self.connection = redis.Redis(host="localhost", port=6379, db=0)

-    async def _set_task(self, task: dict, queue_name) -> int:
+    async def _set_task(self, queue_name: str, link: str, task: dict | list, ) -> int:
        async with self.connection as connection:
-            res = await connection.sadd(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8'))
+            res = await connection.hset(queue_name, link, json.dumps(task, indent=4).encode('utf-8'))
        return res

-    async def _del_task(self, task: dict, queue_name) -> int:
+    async def _del_task(self, queue_name: str, link: str,) -> int:
        async with self.connection as connection:
-            res = await connection.srem(f'{queue_name}:1', json.dumps(task, indent=4).encode('utf-8'))
+            res = await connection.hdel(queue_name, link)
        return res

-    async def set_task_to_queue(self, task: dict) -> int:
-        async with self.connection as connection:
-            res = await connection.sadd(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8'))
+    async def set_task_to_queue(self, link: str, task: dict | list) -> int:
+        res = await self._set_task(queue_name=self.SET_NAME, link=link, task=task)
        return res

-    async def get_queue(self) -> set:
+    async def get_all_tasks_from_queue(self, queue_name: str) -> dict:
        async with self.connection as connection:
-            res = await connection.smembers(self.SET_NAME + f":1")
+            res = await connection.hgetall(queue_name)
        return res

-    async def get_tasks(self) -> set:
-        async with self.connection as connection:
-            res = await connection.smembers(self.TASKS_NAME + f":1")
-        return res
+    # async def get_tasks(self) -> set:
+    #     async with self.connection as connection:
+    #         res = await connection.smembers(self.TASKS_NAME + f":1")
+    #     return res

-    async def get_task_done_queue(self) -> set:
-        async with self.connection as connection:
-            res = await connection.smembers(self.TASKS_DONE_NAME + f":1")
-        return res
+    # async def get_task_done_queue(self) -> set:
+    #     async with self.connection as connection:
+    #         res = await connection.smembers(self.TASKS_DONE_NAME + f":1")
+    #     return res

-    async def del_task_from_queue(self, task: dict) -> int:
-        async with self.connection as connection:
-            res = await connection.srem(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8'))
-        return res
+    # async def del_task_from_queue(self, link, task: dict) -> int:
+    #     async with self.connection as connection:
+    #         res = await self._del_task(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8'))
+    #     return res

-    async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int:
-        await self._del_task(task, self.SET_NAME)
-        return await self._set_task(task, self.TASKS_NAME)
+    async def del_task_from_queue_and_add_to_tasks(self, link: str, task: dict | list) -> int:
+        await self._del_task(self.SET_NAME, link)
+        return await self._set_task(self.TASKS_NAME, link, task)

-    async def del_task_from_tasks_and_add_to_task_done(self, task: dict, working_task: dict) -> int:
-        await self._del_task(working_task, self.TASKS_NAME)
-        return await self._set_task(task, self.TASKS_DONE_NAME)
+    async def del_task_from_tasks_and_add_to_task_done(self, task: dict | list, link: str) -> int:
+        await self._del_task(self.TASKS_NAME, link)
+        return await self._set_task(self.TASKS_DONE_NAME, link, task)

    async def del_task_from_task_done_queue(self, task) -> int:
-        async with self.connection as connection:
-            res = await connection.srem(self.TASKS_DONE_NAME + f":1", json.dumps(task, indent=4).encode('utf-8'))
+        res = await self._del_task(self.TASKS_DONE_NAME, task["link"])
        return res

    async def del_tasks_queue(self) -> int:
        async with self.connection as connection:
-            res = await connection.delete(self.TASKS_NAME + f":1")
+            res = await connection.delete(self.TASKS_NAME)
        return res
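
Taken together, the RedisClient changes replace the old set-based storage (sadd/srem/smembers on keys with a hard-coded ":1" suffix) with hashes keyed by the task's link, which is what lets list-valued ("array of links") payloads be addressed per link. A minimal usage sketch of the reworked client (illustrative link and payload only; assumes Redis is running on localhost:6379):

import asyncio

from src.core.redis_client import RedisClient


async def redis_usage_sketch():
    client = RedisClient()
    link = "https://example.com/video/1"        # hypothetical link
    task = {"link": link, "status": "queued"}   # hypothetical payload

    await client.set_task_to_queue(link=link, task=task)                         # HSET queue <link> <json>
    await client.del_task_from_queue_and_add_to_tasks(link=link, task=task)      # queue -> tasks_working
    await client.del_task_from_tasks_and_add_to_task_done(task=task, link=link)  # tasks_working -> tasks_done

    done = await client.get_all_tasks_from_queue("tasks_done")                   # HGETALL tasks_done
    print(done)


asyncio.run(redis_usage_sketch())
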
@@ -10,7 +10,7 @@ class ResultTypeEnum(Enum):


class Result:
-    def __init__(self, result_type: ResultTypeEnum, value: str | dict = None):
+    def __init__(self, result_type: ResultTypeEnum, value: str | dict | list = None):
        self.result_type = result_type
        self.value = value