video_downloader_service/src/core/master_service.py

125 lines
5.6 KiB
Python
Raw Normal View History

import asyncio
import concurrent.futures as pool
import subprocess
2023-09-22 00:17:24 +03:00
import traceback
2023-08-24 03:28:55 +03:00
from functools import partial
2023-09-22 00:17:24 +03:00
from urllib.parse import urlparse
from src.core.async_queue import AsyncQueue
2023-09-22 00:17:24 +03:00
from src.core.rabbitmq import get_messages, publish_message_with_task_done
from src.core.redis_client import RedisClient
2023-08-24 16:45:55 +03:00
from src.core.result import Result, ResultTypeEnum
2023-09-22 00:17:24 +03:00
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser
2023-08-24 03:28:55 +03:00
class MasterService:
    """Dispatch video download tasks to a pool of worker processes.

    Task dicts are taken from an ``AsyncQueue`` that is filled by the
    RabbitMQ consumer (``get_messages``). Each task is executed in a
    ``ProcessPoolExecutor``; the outcome is recorded through ``RedisClient``
    and announced with ``publish_message_with_task_done``.
    """

    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.MAX_EXECUTOR_WORKERS = 8
        # Every pool process runs ``executor_initializer`` once on start-up.
        self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
                                                 initializer=executor_initializer)
        self.queue = AsyncQueue()
        self.rabbit_consumer = get_messages
        self.currently_underway = {}  # contains currently in progress videos, keyed by link

    async def run(self):
        """Kill stale executor processes, then run the consumer and the workers.

        Awaits the RabbitMQ consumer together with all worker coroutines via
        ``asyncio.gather``, so an exception raised by any of them propagates
        to the caller.
        """
        # Best-effort cleanup of pool processes left over from a previous run.
        # The "[v]ideo..." bracket pattern still matches the process title, but
        # not the command line of this very shell / grep (which contains the
        # literal "[v]"), so the loop no longer kills its own shell.
        subprocess.run(
            "for pid in $(ps -ef | grep '[v]ideo_downloader_executor_process' "
            "| awk '{print $2}'); do kill -9 $pid; done",
            shell=True, capture_output=True
        )
        # NOTE(review): one more worker coroutine than pool processes is
        # started; the reason for the "+ 1" is not visible here.
        tasks = [self.loop.create_task(self.create_workers())
                 for _ in range(self.MAX_EXECUTOR_WORKERS + 1)]

        await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)

    async def create_workers(self):
        """Consume tasks from the queue forever, processing one at a time.

        ``task_done()`` is called for every task taken from the queue, whether
        it succeeded, produced an error result or raised. Exceptions are not
        swallowed: they end this worker and propagate to ``run``.
        """
        while True:
            video_params = await self.queue.get()
            try:
                await self._handle_task(video_params)
            finally:
                self.queue.task_done()

    async def _handle_task(self, video_params: dict):
        """Run one download task in the process pool and report its outcome."""
        # TODO: later, write a function that picks the parser automatically
        redis = RedisClient()
        # TODO: check in Redis whether the task is already completed (then
        # return the link right away) or previously finished with an error,
        # and also check the self.currently_underway dict, to be safe
        await redis.del_task_from_queue_and_add_to_tasks(task=video_params)

        link = video_params['link']
        self.currently_underway[link] = video_params
        try:
            result: Result = await self.loop.run_in_executor(
                self.executor,
                partial(MasterService.video_processing_executor, video_params=video_params),
            )
            if result.result_type in (ResultTypeEnum.DONE, ResultTypeEnum.EXIST):
                # Done / Exist: result.value is already the message dict.
                message = result.value
            else:
                # Error: result.value is the error text; wrap it in a message.
                message = {
                    "link": link,
                    "result": result.value,
                    "status": "error"
                }
            # Mark the task as finished in Redis and notify through RabbitMQ.
            await redis.del_task_from_tasks_and_add_to_task_done(task=message)
            await publish_message_with_task_done(task=message)
        finally:
            # Always forget the link, even if reporting failed, so a broken
            # task does not stay "in progress" forever.
            self.currently_underway.pop(link, None)

    @staticmethod
    def video_download(video_params: dict):
        """Pick the parser for ``video_params`` and return its download result.

        Raises:
            SiteNotImplementedException: if the link's domain has no parser.
        """
        downloader: BaseParser | YappyParser | MyMailParser = MasterService.get_parser(video_params)
        return downloader.video_download()

    @staticmethod
    def get_parser(params: dict):
        """Return a parser instance for the domain of ``params["link"]``.

        Only the selected parser class is instantiated.

        Raises:
            SiteNotImplementedException: if ``params`` has no ``"link"`` key or
                the link's network location is not in the supported list.
        """
        parser_classes = {
            "my.mail.ru": MyMailParser,
            "www.youtube.com": BaseParser,
            "vk.com": BaseParser,
            "ok.ru": BaseParser,
            "likee.video": BaseParser,
            "dzen.ru": BaseParser,
            "yappy.media": YappyParser,
        }
        try:
            parser_class = parser_classes[urlparse(params["link"]).netloc]
        except KeyError as err:
            raise SiteNotImplementedException from err
        # Instantiate outside the ``try`` so that a KeyError raised inside a
        # parser constructor is not misreported as an unsupported site.
        return parser_class(params)

    @staticmethod
    def video_processing_executor(video_params: dict):
        """Download a video and wrap the outcome in a ``Result``.

        Submitted to the process pool by the worker coroutines. Never raises
        for download errors; every failure is converted into a ``Result``:

        * ``DONE`` - value is ``{"link", "result", "status": "done"}``.
        * ``EXIST`` - the file already exists; value carries ``ex.message``.
        * ``EXCEPTION`` - value is the exception's ``default_message`` for an
          unsupported site, otherwise the formatted traceback.
        """
        try:
            result = MasterService.video_download(video_params=video_params)
            return Result(result_type=ResultTypeEnum.DONE, value={
                "link": video_params["link"],
                "result": result,
                "status": "done"
            })
        except FileAlreadyExistException as ex:
            return Result(result_type=ResultTypeEnum.EXIST, value={
                "link": video_params["link"],
                "result": ex.message,
                "status": "exist"
            })
        except SiteNotImplementedException as ex:
            return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex.default_message)
        except Exception:
            # Boundary of the worker process: report the traceback as text
            # instead of letting the exception cross the process boundary.
            return Result(result_type=ResultTypeEnum.EXCEPTION, value=traceback.format_exc())
# TODO upload to server
def executor_initializer():
    """Set this process's title to ``video_downloader_executor_process``.

    Used as the ``initializer`` of the ``ProcessPoolExecutor`` created in
    ``MasterService.__init__``. The title is the name that the start-up
    cleanup in ``MasterService.run`` searches for.

    Returns:
        Always ``True``.
    """
    # Imported lazily, as in the original; it is only needed at this call.
    from setproctitle import setproctitle as set_process_title

    process_title = 'video_downloader_executor_process'
    set_process_title(process_title)
    return True