refactoring for new arch, added Redis, fixed filename, added video exists check
This commit is contained in:
@@ -1,10 +1,15 @@
|
||||
import asyncio
|
||||
import concurrent.futures as pool
|
||||
import os.path
|
||||
import subprocess
|
||||
|
||||
from functools import partial
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
from src.core.async_queue import AsyncQueue
|
||||
from src.core.rabbitmq import get_messages
|
||||
from src.core.redis_client import RedisClient
|
||||
from src.core.result import Result, ResultTypeEnum
|
||||
from src.core.ydl import VideoDownloader
|
||||
from src.exceptions.download_exceptions import SiteNotImplementedException
|
||||
@@ -16,12 +21,16 @@ class MasterService:
|
||||
self.MAX_EXECUTOR_WORKERS = 8
|
||||
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
|
||||
initializer=executor_initializer)
|
||||
self.queue = asyncio.Queue()
|
||||
self.queue = AsyncQueue()
|
||||
self.rabbit_consumer = get_messages
|
||||
self.currently_underway = {} # contains currently in progress videos
|
||||
|
||||
async def run(self):
|
||||
# TODO add cleanup procedure for existing exector procs
|
||||
subprocess.run(
|
||||
"for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
|
||||
shell=True, capture_output=True
|
||||
)
|
||||
|
||||
tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)]
|
||||
|
||||
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
|
||||
@@ -29,15 +38,17 @@ class MasterService:
|
||||
async def create_workers(self):
|
||||
while True:
|
||||
video_params = await self.queue.get()
|
||||
redis = RedisClient()
|
||||
await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
|
||||
self.currently_underway[video_params['link']] = video_params
|
||||
# TODO save current copy of self.currently_underway to DB\file\etc
|
||||
download_task = self.loop.run_in_executor(self.executor, partial(
|
||||
MasterService.video_processing_executor, video_params=video_params
|
||||
)
|
||||
)
|
||||
)
|
||||
if download_task.done():
|
||||
result = download_task.result()
|
||||
# process result
|
||||
|
||||
result = await download_task
|
||||
await redis.del_task_from_tasks_and_add_to_task_done(task=video_params)
|
||||
# TODO process result
|
||||
self.queue.task_done()
|
||||
|
||||
@staticmethod
|
||||
@@ -46,9 +57,16 @@ class MasterService:
|
||||
"format": video_params["format"],
|
||||
"merge_output_format": video_params["merge_output_format"],
|
||||
'outtmpl': video_params["outtmpl"],
|
||||
"quiet": True
|
||||
}
|
||||
downloader = VideoDownloader(link=video_params["link"], ydl_opts=ydl_opts)
|
||||
video_info = downloader.get_info()
|
||||
if os.path.exists(
|
||||
os.path.join(os.getcwd() + f"Youtube/{video_info['id']}_{video_info['width']}.{video_info['ext']}")
|
||||
):
|
||||
return Result(result_type=ResultTypeEnum.EXIST)
|
||||
try:
|
||||
downloader.ydl_opts["quiet"] = False
|
||||
result = downloader.download()
|
||||
return result
|
||||
except SiteNotImplementedException as ex:
|
||||
@@ -59,7 +77,6 @@ class MasterService:
|
||||
|
||||
@staticmethod
|
||||
def video_processing_executor(video_params: dict):
|
||||
# TODO check if video exists on server etc. Result(type=ResultType.EXISTS, value=False)
|
||||
try:
|
||||
MasterService.video_download(video_params=video_params)
|
||||
return Result(result_type=ResultTypeEnum.DONE)
|
||||
@@ -74,4 +91,3 @@ def executor_initializer():
|
||||
return True
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user