2023-08-23 04:13:56 +03:00
|
|
|
|
import asyncio
|
|
|
|
|
import concurrent.futures as pool
|
2023-08-27 16:27:28 +03:00
|
|
|
|
import subprocess
|
|
|
|
|
|
2023-08-24 03:28:55 +03:00
|
|
|
|
from functools import partial
|
2023-09-26 00:24:38 +03:00
|
|
|
|
import traceback
|
2023-09-22 00:17:24 +03:00
|
|
|
|
from urllib.parse import urlparse
|
2023-08-23 04:13:56 +03:00
|
|
|
|
|
2023-08-27 16:27:28 +03:00
|
|
|
|
from src.core.async_queue import AsyncQueue
|
2023-09-22 00:17:24 +03:00
|
|
|
|
from src.core.rabbitmq import get_messages, publish_message_with_task_done
|
2023-08-27 16:27:28 +03:00
|
|
|
|
from src.core.redis_client import RedisClient
|
2023-08-24 16:45:55 +03:00
|
|
|
|
from src.core.result import Result, ResultTypeEnum
|
2023-09-22 00:17:24 +03:00
|
|
|
|
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
|
2023-09-15 01:29:43 +03:00
|
|
|
|
from src.parsers.MyMail.my_mail_parser import MyMailParser
|
|
|
|
|
from src.parsers.Yappy.yappy_parser import YappyParser
|
|
|
|
|
from src.parsers.base_parser import BaseParser
|
2023-08-23 04:13:56 +03:00
|
|
|
|
|
2023-09-22 13:35:00 +03:00
|
|
|
|
# TODO: add a logger with timestamps to yt-dlp
|
2023-08-23 04:13:56 +03:00
|
|
|
|
|
2023-09-25 04:05:42 +03:00
|
|
|
|
|
2023-08-24 03:28:55 +03:00
|
|
|
|
class MasterService:
    """Take video tasks from an ``AsyncQueue`` and download them in a process pool.

    ``run`` starts the message consumer (``get_messages``) together with a
    fixed number of worker coroutines. Each worker repeatedly takes a task from
    the queue, hands it to a ``ProcessPoolExecutor`` and reports the outcome
    through Redis and ``publish_message_with_task_done``.
    """

    def __init__(self):
        # Kept as a public attribute for backward compatibility. The coroutines
        # below use the loop that is actually running them instead.
        self.loop = asyncio.get_event_loop()
        self.MAX_EXECUTOR_WORKERS = 8
        self.executor = pool.ProcessPoolExecutor(
            max_workers=self.MAX_EXECUTOR_WORKERS,
            initializer=executor_initializer,
        )
        self.queue = AsyncQueue()
        self.rabbit_consumer = get_messages
        self.currently_underway = {}  # contains currently in progress videos

    async def run(self):
        """Kill leftover executor processes, then run the consumer and the workers.

        Awaits ``self.rabbit_consumer(self.queue)`` and
        ``MAX_EXECUTOR_WORKERS + 1`` ``create_workers`` tasks concurrently.
        """
        # Best-effort cleanup: kill every process whose `ps -ef` line contains
        # the title that executor_initializer assigns to pool processes.
        # The command output is captured and its exit status is not checked.
        subprocess.run(
            "for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
            shell=True, capture_output=True
        )

        # Use the running loop so the tasks always belong to the same loop as
        # the gather() call below, whichever loop __init__ happened to see.
        loop = asyncio.get_running_loop()
        tasks = [loop.create_task(self.create_workers()) for _ in range(self.MAX_EXECUTOR_WORKERS + 1)]

        await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)

    async def create_workers(self):
        """Worker loop: take a task, process it in the executor, publish the outcome.

        Runs forever. For every task the link is recorded in
        ``self.currently_underway`` while the download is in progress and
        removed once the outcome has been published.
        """
        loop = asyncio.get_running_loop()
        while True:
            video_params = await self.queue.get()
            redis = RedisClient()
            await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
            link = video_params['link']
            self.currently_underway[link] = video_params

            download_task = loop.run_in_executor(self.executor, partial(
                MasterService.video_processing_executor, video_params=video_params
            ))

            result: Result = await download_task
            if result.result_type in (ResultTypeEnum.DONE, ResultTypeEnum.EXIST):
                await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
                await publish_message_with_task_done(task=result.value)
                self.queue.task_done()
            else:
                error_message = {
                    "link": video_params["link"],
                    "result": result.value,
                    "status": "error"
                }
                await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, working_task=video_params)
                await publish_message_with_task_done(task=error_message)
                # NOTE(review): queue.task_done() is only called on the success
                # path; confirm whether the error path should call it as well.

            self.currently_underway.pop(link, None)

    @staticmethod
    def video_download(video_params: dict):
        """Pick the parser for ``video_params`` and return its ``video_download()`` result.

        Raises:
            SiteNotImplementedException: propagated from ``get_parser`` when no
                parser is registered for the link's domain.
        """
        downloader: BaseParser | YappyParser | MyMailParser = MasterService.get_parser(video_params)
        return downloader.video_download()

    @staticmethod
    def get_parser(params: dict):
        """Return a parser instance for the domain of ``params["link"]``.

        Only the parser class registered for the domain is instantiated.

        Raises:
            SiteNotImplementedException: if ``params`` has no ``"link"`` key or
                the link's network location is not in the mapping.
        """
        parser_classes = {
            "my.mail.ru": MyMailParser,
            "www.youtube.com": BaseParser,
            "youtu.be": BaseParser,
            "vk.com": BaseParser,
            "ok.ru": BaseParser,
            "likee.video": BaseParser,
            "dzen.ru": BaseParser,
            "yappy.media": YappyParser,
        }
        try:
            domain = urlparse(params["link"]).netloc
            parser_cls = parser_classes[domain]
        except KeyError:
            raise SiteNotImplementedException
        # Instantiate outside the try block so a KeyError raised inside a parser
        # constructor is not misreported as an unsupported site.
        return parser_cls(params)

    @staticmethod
    def video_processing_executor(video_params: dict):
        """Download one video and wrap the outcome in a ``Result``.

        This is the callable submitted to the process pool. It always returns a
        ``Result`` whose ``value`` is a dict with ``link``, ``result`` and
        ``status`` keys:

        * ``DONE`` / ``"done"`` with the value returned by ``video_download``;
        * ``EXIST`` / ``"exist"`` with the exception message when the file
          already exists;
        * ``EXCEPTION`` / ``"error"`` with the default message for an
          unsupported site, or the formatted traceback for any other error.
        """
        try:
            result = MasterService.video_download(video_params=video_params)
            return Result(result_type=ResultTypeEnum.DONE, value={
                "link": video_params["link"],
                "result": result,
                "status": "done"
            })
        except FileAlreadyExistException as ex:
            # TODO: make the regular one load here; there is no need to write anything to Redis
            return Result(result_type=ResultTypeEnum.EXIST, value={
                "link": video_params["link"],
                "result": ex.message,
                "status": "exist"
            })
        except SiteNotImplementedException as ex:
            return Result(result_type=ResultTypeEnum.EXCEPTION, value={
                "link": video_params["link"],
                "result": ex.default_message,
                "status": "error"
            })
        except Exception:
            # The Result must be returned: without it the caller receives None
            # and fails when it reads `result.result_type`.
            return Result(result_type=ResultTypeEnum.EXCEPTION, value={
                "link": video_params["link"],
                "result": traceback.format_exc(),
                "status": "error"
            })

    # TODO upload to server
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def executor_initializer():
    """Give the current process a fixed title and return ``True``.

    Passed as the ``initializer`` of the process pool, so it runs in each pool
    process. The title is the same string the start-up cleanup command greps
    for in ``ps -ef`` output.
    """
    # Imported lazily so the dependency is only needed where the initializer runs.
    from setproctitle import setproctitle as set_process_title

    set_process_title('video_downloader_executor_process')
    return True
|