refactored tg parser

This commit is contained in:
Dantenerosas 2023-10-14 03:08:13 +03:00 committed by nikili0n
parent cc6fae873a
commit 2dd7ad12e7
5 changed files with 67 additions and 20 deletions

View File

@ -1,10 +1,12 @@
import asyncio import asyncio
import concurrent.futures as pool import concurrent.futures as pool
import subprocess import subprocess
import pyrogram
import traceback
from functools import partial from functools import partial
import traceback
from urllib.parse import urlparse from urllib.parse import urlparse
from loguru import logger
from src.core.async_queue import AsyncQueue from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages, publish_message_with_task_done from src.core.rabbitmq import get_messages, publish_message_with_task_done
@ -12,16 +14,16 @@ from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum from src.core.result import Result, ResultTypeEnum
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Telegram.telegram_media_downloader.media_downloader import app, _check_config
from src.parsers.Yappy.yappy_parser import YappyParser from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser from src.parsers.base_parser import BaseParser
from loguru import logger
from src.parsers.parser_mapping import get_parser from src.parsers.parser_mapping import get_parser
class MasterService: class MasterService:
def __init__(self): def __init__(self):
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
# self.tg_client =
self.MAX_EXECUTOR_WORKERS = 8 self.MAX_EXECUTOR_WORKERS = 8
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS, self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
initializer=executor_initializer) initializer=executor_initializer)
@ -65,7 +67,18 @@ class MasterService:
@staticmethod @staticmethod
def video_download(video_params: dict): def video_download(video_params: dict):
downloader: BaseParser | YappyParser | MyMailParser = MasterService.get_parser(video_params) downloader: BaseParser | YappyParser | MyMailParser = MasterService.get_parser(video_params)
result = downloader.video_download() if _check_config():
tg_client = pyrogram.Client(
"media_downloader",
api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
tg_client.start()
result = downloader.video_download(tg_client)
return result return result
@staticmethod @staticmethod

View File

@ -0,0 +1,22 @@
api_hash: cb06da2bf01e15627434223242b6446d
api_id: 21648766
chat:
- chat_id: landigos
download_filter: id >= 6949 && id < 6950
last_read_message_id: 6949
file_formats:
video:
- all
media_types:
- video
# in linux please use /
# save_path: E:\github\telegram_media_downloader
disable_syslog: []
save_path: /Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Telegram/
language: RU
web_host: 0.0.0.0
web_port: 51256
file_path_prefix:
- chat_id
file_name_prefix:
- message_id

View File

@ -486,8 +486,10 @@ async def worker(client: pyrogram.client.Client):
if node.client: if node.client:
await download_task(node.client, message, node) await download_task(node.client, message, node)
app.is_running = False
else: else:
await download_task(client, message, node) await download_task(client, message, node)
app.is_running = False
except Exception as e: except Exception as e:
logger.exception(f"{e}") logger.exception(f"{e}")

View File

@ -619,10 +619,10 @@ class Application:
self.config.pop("last_read_message_id") self.config.pop("last_read_message_id")
self.config["language"] = self.language.name self.config["language"] = self.language.name
# for it in self.downloaded_ids: for it in self.downloaded_ids:
# self.already_download_ids_set.add(it) self.already_download_ids_set.add(it)
# self.app_data["already_download_ids"] = list(self.already_download_ids_set) self.app_data["already_download_ids"] = list(self.already_download_ids_set)
if immediate: if immediate:
with open(self.config_file, "w", encoding="utf-8") as yaml_file: with open(self.config_file, "w", encoding="utf-8") as yaml_file:
@ -639,7 +639,6 @@ class Application:
def load_config(self): def load_config(self):
"""Load user config""" """Load user config"""
loguru.logger.info(os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader", self.config_file))
with open( with open(
os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader", self.config_file), encoding="utf-8" os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader", self.config_file), encoding="utf-8"
) as f: ) as f:

View File

@ -1,25 +1,36 @@
import os import os
from urllib.parse import urlparse
from loguru import logger
from pyrogram import Client
from ruamel.yaml import YAML from ruamel.yaml import YAML
from src.parsers.Telegram.telegram_media_downloader.media_downloader import main, _check_config from src.exceptions.download_exceptions import FileAlreadyExistException
from src.parsers.Telegram.telegram_media_downloader.media_downloader import _check_config, app, download_all_chat, \
worker
from src.parsers.base_parser import BaseParser from src.parsers.base_parser import BaseParser
class TelegramParser(BaseParser): class TelegramParser(BaseParser):
def video_download(self): def video_download(self, client: Client = None):
message_id = self.params["link"][self.params["link"].rfind("/") + 1:] url_parse_result = urlparse(self.params["link"])
with open( channel, message_id = url_parse_result.path[1:].split('/')
os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"), if os.path.exists(os.path.join(os.getcwd() + f"/downloads/Telegram/{message_id}.mp4")):
mode="r+", encoding="utf-8" raise FileAlreadyExistException(message=f"Telegram/{message_id}.mp4")
) as f: with open(os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"),
mode="r+", encoding="utf-8") as f:
config = YAML().load(f.read()) config = YAML().load(f.read())
config["chat"][0]['download_filter'] = f"id >= {message_id} && id < {int(message_id) + 1}" config["chat"][0]['download_filter'] = f"id >= {message_id} && id < {int(message_id) + 1}"
config["chat"][0]['chat_id'] = channel
config["chat"][0]['last_read_message_id'] = int(message_id) config["chat"][0]['last_read_message_id'] = int(message_id)
with open(
os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"), with open(os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"),
mode="w+", encoding="utf-8" mode="w+", encoding="utf-8") as f:
) as f:
YAML().dump(config, f) YAML().dump(config, f)
if _check_config(): if _check_config():
main() app.loop.run_until_complete(download_all_chat(client))
app.loop.run_until_complete(worker(client))
client.stop()
app.is_running = False
logger.info("Stopped!")
return f"Telegram/{message_id}.mp4"