video_downloader_service/src/core/master_service.py
2024-04-23 14:57:01 +03:00

135 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import concurrent.futures as pool
import subprocess
from functools import partial
import traceback
from urllib.parse import urlparse
from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages, publish_message_with_task_done
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Okru.ok_parser import OkParser
from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser
# TODO: добавить логгер с временными метками в yt-dlp
class MasterService:
    """Consume video-download jobs from a queue and run them in a process pool.

    Messages are fed into ``self.queue`` by ``self.rabbit_consumer``
    (``get_messages``). Several worker coroutines (``create_workers``) take jobs
    off that queue, book-keep them in Redis, run the download in
    ``self.executor`` and publish the outcome with
    ``publish_message_with_task_done``.
    """

    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.MAX_EXECUTOR_WORKERS = 8
        # Worker processes are renamed by ``executor_initializer`` so that
        # ``run`` can find and kill leftovers from a previous start.
        self.executor = pool.ProcessPoolExecutor(
            max_workers=self.MAX_EXECUTOR_WORKERS,
            initializer=executor_initializer,
        )
        self.queue = AsyncQueue()
        self.rabbit_consumer = get_messages
        # Maps video link -> job params for the jobs currently being processed.
        self.currently_underway = {}

    async def run(self):
        """Start the message consumer plus the worker coroutines and await them all."""
        # Kill executor processes left over from a previous run; they carry the
        # process title set in ``executor_initializer``.
        subprocess.run(
            "for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
            shell=True, capture_output=True
        )
        # Bind to the loop that is actually running this coroutine. The loop
        # captured in __init__ can be a different one (e.g. when the service is
        # constructed before ``asyncio.run`` creates its loop), and tasks created
        # on it could not be gathered together with the consumer coroutine.
        self.loop = asyncio.get_running_loop()
        workers = [
            self.loop.create_task(self.create_workers())
            for _ in range(self.MAX_EXECUTOR_WORKERS + 1)
        ]
        await asyncio.gather(self.rabbit_consumer(self.queue), *workers)

    async def create_workers(self):
        """Worker loop: process jobs from ``self.queue`` one at a time, forever.

        For each job: update the Redis book-keeping, run
        ``video_processing_executor`` in the process pool, move the job to the
        "done" set in Redis and publish the outcome. ``queue.task_done()`` is
        called exactly once per job taken from the queue, on success and on
        failure alike, and the job is always removed from
        ``currently_underway`` afterwards.
        """
        loop = asyncio.get_running_loop()
        while True:
            video_params = await self.queue.get()
            try:
                redis = RedisClient()
                await redis.del_tasks_queue()
                await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
                self.currently_underway[video_params['link']] = video_params
                result: Result = await loop.run_in_executor(
                    self.executor,
                    partial(MasterService.video_processing_executor, video_params=video_params),
                )
                if result.result_type in (ResultTypeEnum.DONE, ResultTypeEnum.EXIST):
                    message = result.value
                else:
                    # Failures are published wrapped in an error envelope.
                    message = {
                        "link": video_params["link"],
                        "result": result.value,
                        "status": "error"
                    }
                await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
                await publish_message_with_task_done(task=message)
            finally:
                # Clean up whether the job succeeded, failed, or raised.
                self.currently_underway.pop(video_params.get('link'), None)
                self.queue.task_done()

    @staticmethod
    def video_download(video_params: dict):
        """Download the video described by ``video_params`` with the matching parser.

        Returns whatever the parser's ``video_download()`` returns.
        Raises ``SiteNotImplementedException`` for unsupported links.
        """
        downloader: BaseParser | OkParser | YappyParser | MyMailParser = MasterService.get_parser(video_params)
        return downloader.video_download()

    @staticmethod
    def get_parser(params: dict):
        """Return a parser instance for ``params["link"]``.

        Only the parser that matches the link's host is instantiated.
        ``ok.ru`` links containing ``"topic"`` use ``OkParser``; other
        ``ok.ru`` links use ``BaseParser``.

        Raises:
            SiteNotImplementedException: ``params`` has no ``"link"`` key, or
                the link's host is not one of the supported domains.
        """
        parser_by_domain = {
            "my.mail.ru": MyMailParser,
            "www.youtube.com": BaseParser,
            "youtu.be": BaseParser,
            "vk.com": BaseParser,
            "ok.ru": BaseParser,
            "likee.video": BaseParser,
            "dzen.ru": BaseParser,
            "yappy.media": YappyParser,
            "yandex.ru": BaseParser,
        }
        try:
            link = params["link"]
            domain = urlparse(link).netloc
            parser_cls = parser_by_domain[domain]
        except KeyError:
            raise SiteNotImplementedException from None
        if domain == "ok.ru" and "topic" in link:
            parser_cls = OkParser
        return parser_cls(params)

    @staticmethod
    def video_processing_executor(video_params: dict):
        """Download one video and wrap the outcome in a ``Result``.

        Runs inside a pool worker process. Download errors are not raised but
        reported through the returned ``Result``, whose ``value`` is a dict with
        ``"link"``, ``"result"`` and ``"status"`` keys:

        * ``DONE`` / ``"done"``: ``result`` is the parser's return value;
        * ``EXIST`` / ``"exist"``: the file already exists;
        * ``EXCEPTION`` / ``"error"``: unsupported site or unexpected failure.
        """
        try:
            result = MasterService.video_download(video_params=video_params)
            return Result(result_type=ResultTypeEnum.DONE, value={
                "link": video_params["link"],
                "result": result,
                "status": "done"
            })
        except FileAlreadyExistException as ex:
            return Result(result_type=ResultTypeEnum.EXIST, value={
                "link": video_params["link"],
                "result": ex.message,
                "status": "exist"
            })
        except SiteNotImplementedException as ex:
            return Result(result_type=ResultTypeEnum.EXCEPTION, value={
                "link": video_params["link"],
                "result": ex.default_message,
                "status": "error"
            })
        except Exception:
            # Print the traceback to this worker process's stderr so the cause
            # is not lost; the caller only receives a generic message.
            traceback.print_exc()
            return Result(result_type=ResultTypeEnum.EXCEPTION, value={
                "link": video_params["link"],
                "result": "Unknown exception encountered",
                "status": "error"
            })
# TODO upload to server
def executor_initializer():
    """Initializer for each ProcessPoolExecutor worker process.

    Sets the process title to ``video_downloader_executor_process``.
    ``MasterService.run`` searches ``ps`` output for this title and kills
    matching processes on start-up, so the two must stay in sync.

    Returns:
        True.
    """
    # Local import: setproctitle is only loaded when this initializer runs.
    import setproctitle
    setproctitle.setproctitle('video_downloader_executor_process')
    return True