refactored tg parser
This commit is contained in:
		| @@ -1,10 +1,12 @@ | |||||||
| import asyncio | import asyncio | ||||||
| import concurrent.futures as pool | import concurrent.futures as pool | ||||||
| import subprocess | import subprocess | ||||||
|  | import pyrogram | ||||||
|  | import traceback | ||||||
|  |  | ||||||
| from functools import partial | from functools import partial | ||||||
| import traceback |  | ||||||
| from urllib.parse import urlparse | from urllib.parse import urlparse | ||||||
|  | from loguru import logger | ||||||
|  |  | ||||||
| from src.core.async_queue import AsyncQueue | from src.core.async_queue import AsyncQueue | ||||||
| from src.core.rabbitmq import get_messages, publish_message_with_task_done | from src.core.rabbitmq import get_messages, publish_message_with_task_done | ||||||
| @@ -12,16 +14,16 @@ from src.core.redis_client import RedisClient | |||||||
| from src.core.result import Result, ResultTypeEnum | from src.core.result import Result, ResultTypeEnum | ||||||
| from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException | from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException | ||||||
| from src.parsers.MyMail.my_mail_parser import MyMailParser | from src.parsers.MyMail.my_mail_parser import MyMailParser | ||||||
|  | from src.parsers.Telegram.telegram_media_downloader.media_downloader import app, _check_config | ||||||
| from src.parsers.Yappy.yappy_parser import YappyParser | from src.parsers.Yappy.yappy_parser import YappyParser | ||||||
| from src.parsers.base_parser import BaseParser | from src.parsers.base_parser import BaseParser | ||||||
| from loguru import logger |  | ||||||
| from src.parsers.parser_mapping import get_parser | from src.parsers.parser_mapping import get_parser | ||||||
|  |  | ||||||
|  |  | ||||||
| class MasterService: | class MasterService: | ||||||
|     def __init__(self): |     def __init__(self): | ||||||
|         self.loop = asyncio.get_event_loop() |         self.loop = asyncio.get_event_loop() | ||||||
|         # self.tg_client = |  | ||||||
|         self.MAX_EXECUTOR_WORKERS = 8 |         self.MAX_EXECUTOR_WORKERS = 8 | ||||||
|         self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS, |         self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS, | ||||||
|                                                  initializer=executor_initializer) |                                                  initializer=executor_initializer) | ||||||
| @@ -65,7 +67,18 @@ class MasterService: | |||||||
|     @staticmethod |     @staticmethod | ||||||
|     def video_download(video_params: dict): |     def video_download(video_params: dict): | ||||||
|         downloader: BaseParser | YappyParser | MyMailParser = MasterService.get_parser(video_params) |         downloader: BaseParser | YappyParser | MyMailParser = MasterService.get_parser(video_params) | ||||||
|         result = downloader.video_download() |         if _check_config(): | ||||||
|  |             tg_client = pyrogram.Client( | ||||||
|  |                 "media_downloader", | ||||||
|  |                 api_id=app.api_id, | ||||||
|  |                 api_hash=app.api_hash, | ||||||
|  |                 proxy=app.proxy, | ||||||
|  |                 workdir=app.session_file_path, | ||||||
|  |             ) | ||||||
|  |             app.pre_run() | ||||||
|  |             app.is_running = True | ||||||
|  |             tg_client.start() | ||||||
|  |         result = downloader.video_download(tg_client) | ||||||
|         return result |         return result | ||||||
|  |  | ||||||
|     @staticmethod |     @staticmethod | ||||||
|   | |||||||
							
								
								
									
										22
									
								
								src/parsers/Telegram/telegram_media_downloader/config.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								src/parsers/Telegram/telegram_media_downloader/config.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,22 @@ | |||||||
|  | api_hash: cb06da2bf01e15627434223242b6446d | ||||||
|  | api_id: 21648766 | ||||||
|  | chat: | ||||||
|  | - chat_id: landigos | ||||||
|  |   download_filter: id >= 6949 && id < 6950 | ||||||
|  |   last_read_message_id: 6949 | ||||||
|  | file_formats: | ||||||
|  |   video: | ||||||
|  |   - all | ||||||
|  | media_types: | ||||||
|  | - video | ||||||
|  | # in linux please use / | ||||||
|  | # save_path: E:\github\telegram_media_downloader | ||||||
|  | disable_syslog: [] | ||||||
|  | save_path: /Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Telegram/ | ||||||
|  | language: RU | ||||||
|  | web_host: 0.0.0.0 | ||||||
|  | web_port: 51256 | ||||||
|  | file_path_prefix: | ||||||
|  | - chat_id | ||||||
|  | file_name_prefix: | ||||||
|  | - message_id | ||||||
| @@ -486,8 +486,10 @@ async def worker(client: pyrogram.client.Client): | |||||||
|  |  | ||||||
|             if node.client: |             if node.client: | ||||||
|                 await download_task(node.client, message, node) |                 await download_task(node.client, message, node) | ||||||
|  |                 app.is_running = False | ||||||
|             else: |             else: | ||||||
|                 await download_task(client, message, node) |                 await download_task(client, message, node) | ||||||
|  |                 app.is_running = False | ||||||
|         except Exception as e: |         except Exception as e: | ||||||
|             logger.exception(f"{e}") |             logger.exception(f"{e}") | ||||||
|  |  | ||||||
|   | |||||||
| @@ -619,10 +619,10 @@ class Application: | |||||||
|             self.config.pop("last_read_message_id") |             self.config.pop("last_read_message_id") | ||||||
|  |  | ||||||
|         self.config["language"] = self.language.name |         self.config["language"] = self.language.name | ||||||
|         # for it in self.downloaded_ids: |         for it in self.downloaded_ids: | ||||||
|         #    self.already_download_ids_set.add(it) |            self.already_download_ids_set.add(it) | ||||||
|  |  | ||||||
|         # self.app_data["already_download_ids"] = list(self.already_download_ids_set) |         self.app_data["already_download_ids"] = list(self.already_download_ids_set) | ||||||
|  |  | ||||||
|         if immediate: |         if immediate: | ||||||
|             with open(self.config_file, "w", encoding="utf-8") as yaml_file: |             with open(self.config_file, "w", encoding="utf-8") as yaml_file: | ||||||
| @@ -639,7 +639,6 @@ class Application: | |||||||
|  |  | ||||||
|     def load_config(self): |     def load_config(self): | ||||||
|         """Load user config""" |         """Load user config""" | ||||||
|         loguru.logger.info(os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader", self.config_file)) |  | ||||||
|         with open( |         with open( | ||||||
|             os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader", self.config_file), encoding="utf-8" |             os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader", self.config_file), encoding="utf-8" | ||||||
|         ) as f: |         ) as f: | ||||||
|   | |||||||
| @@ -1,25 +1,36 @@ | |||||||
| import os | import os | ||||||
|  | from urllib.parse import urlparse | ||||||
|  |  | ||||||
|  | from loguru import logger | ||||||
|  | from pyrogram import Client | ||||||
| from ruamel.yaml import YAML | from ruamel.yaml import YAML | ||||||
|  |  | ||||||
| from src.parsers.Telegram.telegram_media_downloader.media_downloader import main, _check_config | from src.exceptions.download_exceptions import FileAlreadyExistException | ||||||
|  | from src.parsers.Telegram.telegram_media_downloader.media_downloader import _check_config, app, download_all_chat, \ | ||||||
|  |     worker | ||||||
| from src.parsers.base_parser import BaseParser | from src.parsers.base_parser import BaseParser | ||||||
|  |  | ||||||
|  |  | ||||||
| class TelegramParser(BaseParser): | class TelegramParser(BaseParser): | ||||||
|     def video_download(self): |     def video_download(self, client: Client = None): | ||||||
|         message_id = self.params["link"][self.params["link"].rfind("/") + 1:] |         url_parse_result = urlparse(self.params["link"]) | ||||||
|         with open( |         channel, message_id = url_parse_result.path[1:].split('/') | ||||||
|                 os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"), |         if os.path.exists(os.path.join(os.getcwd() + f"/downloads/Telegram/{message_id}.mp4")): | ||||||
|                 mode="r+", encoding="utf-8" |             raise FileAlreadyExistException(message=f"Telegram/{message_id}.mp4") | ||||||
|         ) as f: |         with open(os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"), | ||||||
|  |                   mode="r+", encoding="utf-8") as f: | ||||||
|             config = YAML().load(f.read()) |             config = YAML().load(f.read()) | ||||||
|             config["chat"][0]['download_filter'] = f"id >= {message_id} && id < {int(message_id) + 1}" |             config["chat"][0]['download_filter'] = f"id >= {message_id} && id < {int(message_id) + 1}" | ||||||
|  |             config["chat"][0]['chat_id'] = channel | ||||||
|             config["chat"][0]['last_read_message_id'] = int(message_id) |             config["chat"][0]['last_read_message_id'] = int(message_id) | ||||||
|         with open( |  | ||||||
|                 os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"), |         with open(os.path.join(os.path.abspath(""), "src/parsers/Telegram/telegram_media_downloader/config.yaml"), | ||||||
|                 mode="w+", encoding="utf-8" |                   mode="w+", encoding="utf-8") as f: | ||||||
|         ) as f: |  | ||||||
|             YAML().dump(config, f) |             YAML().dump(config, f) | ||||||
|         if _check_config(): |         if _check_config(): | ||||||
|             main() |             app.loop.run_until_complete(download_all_chat(client)) | ||||||
|  |             app.loop.run_until_complete(worker(client)) | ||||||
|  |             client.stop() | ||||||
|  |             app.is_running = False | ||||||
|  |             logger.info("Stopped!") | ||||||
|  |         return f"Telegram/{message_id}.mp4" | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 nikili0n
					nikili0n