refactored tg parser

This commit is contained in:
garickbadalov 2023-10-14 03:08:13 +03:00
parent cd1026c807
commit fce408310a
5 changed files with 67 additions and 20 deletions

View File

@ -1,10 +1,12 @@
import asyncio
import concurrent.futures as pool
import subprocess
import pyrogram
import traceback
from functools import partial
import traceback
from urllib.parse import urlparse
from loguru import logger
from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages, publish_message_with_task_done
@ -12,16 +14,16 @@ from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Telegram.telegram_media_downloader.media_downloader import app, _check_config
from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser
from loguru import logger
from src.parsers.parser_mapping import get_parser
class MasterService:
def __init__(self):
self.loop = asyncio.get_event_loop()
# self.tg_client =
self.MAX_EXECUTOR_WORKERS = 8
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
initializer=executor_initializer)
@ -65,7 +67,18 @@ class MasterService:
@staticmethod
def video_download(video_params: dict):
downloader: BaseParser | YappyParser | MyMailParser = MasterService.get_parser(video_params)
result = downloader.video_download()
if _check_config():
tg_client = pyrogram.Client(
"media_downloader",
api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
tg_client.start()
result = downloader.video_download(tg_client)
return result
@staticmethod

View File

@ -0,0 +1,22 @@
api_hash: cb06da2bf01e15627434223242b6446d
api_id: 21648766
chat:
- chat_id: landigos
download_filter: id >= 6949 && id < 6950
last_read_message_id: 6949
file_formats:
video:
- all
media_types:
- video
# in linux please use /
# save_path: E:\github\telegram_media_downloader
disable_syslog: []
save_path: /Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Telegram/
language: RU
web_host: 0.0.0.0
web_port: 51256
file_path_prefix:
- chat_id
file_name_prefix:
- message_id

View File

@ -486,8 +486,10 @@ async def worker(client: pyrogram.client.Client):
if node.client:
await download_task(node.client, message, node)
app.is_running = False
else:
await download_task(client, message, node)
app.is_running = False
except Exception as e:
logger.exception(f"{e}")

View File

@ -619,10 +619,10 @@ class Application:
self.config.pop("last_read_message_id")
self.config["language"] = self.language.name
# for it in self.downloaded_ids:
# self.already_download_ids_set.add(it)
for it in self.downloaded_ids:
self.already_download_ids_set.add(it)
# self.app_data["already_download_ids"] = list(self.already_download_ids_set)
self.app_data["already_download_ids"] = list(self.already_download_ids_set)
if immediate:
with open(self.config_file, "w", encoding="utf-8") as yaml_file:
@ -639,7 +639,6 @@ class Application:
def load_config(self):
"""Load user config"""
loguru.logger.info(os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader", self.config_file))
with open(
os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader", self.config_file), encoding="utf-8"
) as f:

View File

@ -1,25 +1,36 @@
import os
from urllib.parse import urlparse
from loguru import logger
from pyrogram import Client
from ruamel.yaml import YAML
from src.parsers.Telegram.telegram_media_downloader.media_downloader import main, _check_config
from src.exceptions.download_exceptions import FileAlreadyExistException
from src.parsers.Telegram.telegram_media_downloader.media_downloader import _check_config, app, download_all_chat, \
worker
from src.parsers.base_parser import BaseParser
class TelegramParser(BaseParser):
def video_download(self):
message_id = self.params["link"][self.params["link"].rfind("/") + 1:]
with open(
os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"),
mode="r+", encoding="utf-8"
) as f:
def video_download(self, client: Client = None):
url_parse_result = urlparse(self.params["link"])
channel, message_id = url_parse_result.path[1:].split('/')
if os.path.exists(os.path.join(os.getcwd() + f"/downloads/Telegram/{message_id}.mp4")):
raise FileAlreadyExistException(message=f"Telegram/{message_id}.mp4")
with open(os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"),
mode="r+", encoding="utf-8") as f:
config = YAML().load(f.read())
config["chat"][0]['download_filter'] = f"id >= {message_id} && id < {int(message_id) + 1}"
config["chat"][0]['chat_id'] = channel
config["chat"][0]['last_read_message_id'] = int(message_id)
with open(
os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"),
mode="w+", encoding="utf-8"
) as f:
with open(os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"),
mode="w+", encoding="utf-8") as f:
YAML().dump(config, f)
if _check_config():
main()
app.loop.run_until_complete(download_all_chat(client))
app.loop.run_until_complete(worker(client))
client.stop()
app.is_running = False
logger.info("Stopped!")
return f"Telegram/{message_id}.mp4"