Compare commits

..

No commits in common. "main" and "feature/tg_parser" have entirely different histories.

31 changed files with 169 additions and 504 deletions

View File

@ -1,17 +0,0 @@
FROM python:3.11.4
WORKDIR /app
COPY poetry.lock pyproject.toml /app/
RUN apt-get -y update
RUN apt-get -y upgrade
RUN apt-get install -y ffmpeg
RUN pip install poetry
RUN poetry install --no-root
COPY .. /app
CMD poetry run python main.py

View File

@ -1,60 +1,19 @@
version: "2.1"
networks:
network:
services:
web_service:
container_name: web_service
build:
context: .
dockerfile: web.Dockerfile
ports:
- "8000:8000"
depends_on:
redis:
condition: service_started
rabbitmq:
condition: service_healthy
restart: always
networks:
- network
download_service:
container_name: download_service
build:
context: .
dockerfile: Dockerfile
depends_on:
redis:
condition: service_started
rabbitmq:
condition: service_healthy
restart: always
networks:
- network
rabbitmq:
container_name: rabbitmq
image: rabbitmq:3.10.7-management
hostname: rabbitmq
restart: always
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 30s
retries: 3
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
- ./rabbitmq:/var/lib/rabbitmq
ports:
- "15672:15672"
- "5672:5672"
networks:
- network
- 15672:15672
- 5672:5672
redis:
container_name: redis_video_downloader
image: redis:latest
ports:
- "6379:6379"
networks:
- network

28
main.py
View File

@ -1,32 +1,8 @@
import asyncio
import json
from typing import Any
from multiprocessing import freeze_support
from src.core.master_service import MasterService
from loguru import logger
def json_logs(message: Any) -> None:
record = message.record
data = {
"timestamp": record["time"].strftime("%d.%m.%y %H.%M.%S %Z%z"),
"level": record["level"].name,
"message": record["message"],
"path": record["file"].path,
"function": record["function"],
"line": record["line"],
}
print(json.dumps(data))
logger.remove(0)
logger.add(json_logs)
if __name__ == '__main__':
freeze_support()
loop = asyncio.new_event_loop()
ms = MasterService(loop)
ms.loop.run_until_complete(ms.run())
ms = MasterService()
ms.loop.run_until_complete(ms.run())

16
poetry.lock generated
View File

@ -1680,20 +1680,6 @@ files = [
[package.dependencies]
six = ">=1.5"
[[package]]
name = "python-dotenv"
version = "1.0.0"
description = "Read key-value pairs from a .env file and set them as environment variables"
optional = false
python-versions = ">=3.8"
files = [
{file = "python-dotenv-1.0.0.tar.gz", hash = "sha256:a8df96034aae6d2d50a4ebe8216326c61c3eb64836776504fcca410e5937a3ba"},
{file = "python_dotenv-1.0.0-py3-none-any.whl", hash = "sha256:f5971a9226b701070a4bf2c38c89e5a3f0d64de8debda981d1db98583009122a"},
]
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "python-multipart"
version = "0.0.6"
@ -2341,4 +2327,4 @@ websockets = "*"
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "b7973dc522b312b75a798bc966c5001b24e134cccd332de7b978e5a1ec495b57"
content-hash = "272fe31fba150b0b0fcca1b7d60f706dc2a05ea730ef19e34ccb8e5524f47d66"

View File

@ -36,7 +36,6 @@ ply = "3.11"
ruamel-yaml = "0.17.21"
flask-login = "0.6.2"
pycryptodome = "3.18.0"
python-dotenv = "^1.0.0"
[build-system]

View File

@ -1,80 +0,0 @@
aio-pika==9.2.2
aiofiles==23.1.0
aiogram==3.0.0
aiohttp==3.8.5
aiormq==6.7.7
aiosignal==1.3.1
annotated-types==0.5.0
anyio==3.7.1
async-timeout==4.0.3
attrs==23.1.0
beautifulsoup4==4.12.2
boto3==1.28.36
botocore==1.31.36
Brotli==1.0.9
certifi==2023.7.22
charset-normalizer==3.2.0
click==8.1.6
commonmark==0.9.1
fastapi==0.101.0
ffmpeg==1.4
ffprobe==0.5
Flask==2.2.2
Flask-Login==0.6.2
frozenlist==1.4.0
greenlet==2.0.2
h11==0.14.0
idna==3.4
iniconfig==2.0.0
itsdangerous==2.1.2
Jinja2==3.1.2
jmespath==1.0.1
loguru==0.6.0
lxml==4.9.3
magic-filter==1.0.11
MarkupSafe==2.1.3
minio==7.1.16
multidict==6.0.4
mutagen==1.46.0
packaging==23.1
pamqp==3.2.1
pika==1.3.2
playwright==1.37.0
pluggy==1.3.0
ply==3.11
pyaes==1.6.1
pycryptodome==3.18.0
pycryptodomex==3.18.0
pydantic==2.3.0
pydantic_core==2.6.3
pyee==9.0.4
Pygments==2.16.1
Pyrogram @ https://github.com/Dineshkarthik/pyrogram/archive/refs/heads/master.zip
PySocks==1.7.1
pytest==7.4.0
pytest-base-url==2.0.0
pytest-playwright==0.4.2
python-dateutil==2.8.2
python-multipart==0.0.6
python-slugify==8.0.1
PyYAML==5.3.1
redis==5.0.0
requests==2.31.0
rich==12.5.1
ruamel.yaml==0.17.21
s3transfer==0.6.2
setproctitle==1.3.2
six==1.16.0
sniffio==1.3.0
soupsieve==2.4.1
starlette==0.27.0
text-unidecode==1.3
TgCrypto==1.2.5
typing_extensions==4.7.1
urllib3==1.26.16
uvicorn==0.23.2
websockets==11.0.3
Werkzeug==2.2.2
yarl==1.9.2
youtube-dl @ git+https://github.com/ytdl-org/youtube-dl.git@86e3cf5e5849aefcc540c19bb5fa5ab7f470d1c1
yt-dlp==2023.7.6

View File

@ -1,13 +0,0 @@
import os
from dotenv import load_dotenv
load_dotenv()
S3_HOST = os.environ.get("S3_HOST", "s3-api.grfc.ru")
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "cl-i-oculus-dev1")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM")
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "clean-internet-oculus-integration-dev")
DEFAULT_DURATION = os.environ.get("DEFAULT_DURATION", 600)

View File

@ -1,7 +1,6 @@
import asyncio
import json
from loguru import logger
from playwright.async_api import async_playwright
from playwright.async_api import Playwright
from aio_pika import Message, connect, DeliveryMode
@ -39,13 +38,12 @@ async def run(playwright: Playwright):
routing_key='hello',
)
logger.info(f" [x] Sent '{body}'")
print(f" [x] Sent '{body}'")
await page.keyboard.press("ArrowDown")
while title == await page.title():
await page.title()
async def main():
async with async_playwright() as playwright:
await run(playwright)

View File

@ -1,8 +1,7 @@
import asyncio
import concurrent.futures as pool
import json
import os
import subprocess
import pyrogram
import traceback
from functools import partial
@ -13,9 +12,9 @@ from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages, publish_message_with_task_done
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.core.s3_client import S3Client
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Telegram.telegram_media_downloader.media_downloader import app, _check_config
from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser
from src.parsers.parser_mapping import get_parser
@ -23,8 +22,8 @@ from src.parsers.Telegram.telegram_media_downloader.telegram_parser import Teleg
class MasterService:
def __init__(self, loop):
self.loop = loop
def __init__(self):
self.loop = asyncio.get_event_loop()
self.MAX_EXECUTOR_WORKERS = 8
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
@ -38,24 +37,12 @@ class MasterService:
"for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
shell=True, capture_output=True
)
# TODO Возможно бросать ошибку если упал мастер сервис.
redis = RedisClient()
messages = await redis.get_all_tasks_from_queue(redis.TASKS_NAME)
if messages:
messages = {k.decode("utf-8"): json.loads(v.decode("utf-8")) for k, v in
messages.items()}
for params in list(messages.values()):
await self.queue.put(params)
await redis.del_tasks_queue()
tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)]
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
async def result_processing(self, result: Result | list, redis: RedisClient, s3_client: S3Client, video_params: dict):
file_path = os.path.join(os.getcwd() + "/downloads/")
links_to_download = s3_client.upload(file_name=result.value["result"], file_path=file_path)
result.value["result"] = links_to_download
async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict):
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"])
await publish_message_with_task_done(task=result.value)
self.queue.task_done()
@ -64,7 +51,7 @@ class MasterService:
while True:
video_params = await self.queue.get()
redis = RedisClient()
s3_client = S3Client()
await redis.del_tasks_queue()
await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params)
self.currently_underway[video_params['link']] = video_params
@ -73,7 +60,7 @@ class MasterService:
))
result: Result = await download_task
await self.result_processing(result, redis, s3_client, video_params)
await self.result_processing(result, redis, video_params)
if video_params['link'] in self.currently_underway:
del self.currently_underway[video_params['link']]
@ -83,20 +70,44 @@ class MasterService:
downloader: BaseParser | YappyParser | MyMailParser | TelegramParser = MasterService.get_parser(video_params)
match downloader:
case TelegramParser():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(downloader.video_download())
return result
if _check_config():
tg_client = pyrogram.Client(
"media_downloader",
api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
tg_client.start()
result = downloader.video_download(client=tg_client)
return result
case _:
result = downloader.video_download()
return result
@staticmethod
def get_parser(params: dict):
try:
url_parse_result = urlparse(params["link"])
uri = f"{url_parse_result.netloc}{url_parse_result.path}"
logger.info(uri)
# # TODO: похоже нужно переделать на регулярки, т.к. добавлять каждую вариацию домена моветон, вероятно я сделаюне-
# parser_mapping = {
# "my.mail.ru": MyMailParser(params),
# "www.youtube.com": BaseParser(params),
# "youtube.com": BaseParser(params),
# "youtu.be": BaseParser(params),
# "vk.com": BaseParser(params),
# "ok.ru": BaseParser(params) if "topic" not in params["link"] else OkParser(params),
# "likee.video": BaseParser(params),
# "dzen.ru": BaseParser(params),
# "yappy.media": YappyParser(params),
# "yandex.ru": BaseParser(params),
# }
return get_parser(uri)(params)
except KeyError:
raise SiteNotImplementedException
@ -129,7 +140,6 @@ class MasterService:
"status": "error"
})
except Exception as ex:
logger.error(traceback.format_exc())
return Result(result_type=ResultTypeEnum.EXCEPTION, value={
"link": video_params["link"],
"result": traceback.format_exc(),

View File

@ -4,17 +4,16 @@ from functools import partial
from aio_pika import connect, Message, DeliveryMode
from aio_pika.abc import AbstractIncomingMessage
from loguru import logger
async def on_message(message: AbstractIncomingMessage, queue) -> None:
async with message.process():
await queue.put(json.loads(message.body))
logger.info(f" Message body is: {message.body!r}")
print(f" Message body is: {message.body!r}")
async def get_messages(inner_queue) -> None:
async with await connect("amqp://guest:guest@rabbitmq/") as connection:
async with await connect("amqp://guest:guest@localhost/") as connection:
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
@ -24,13 +23,14 @@ async def get_messages(inner_queue) -> None:
)
await queue.consume(partial(on_message, queue=inner_queue))
logger.info("[*] Waiting for messages. To exit press CTRL+C")
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
async def publish_message_with_task_done(task: dict | list) -> None:
queue_name = "tasks_done"
async with await connect("amqp://guest:guest@rabbitmq/") as connection:
async with await connect("amqp://guest:guest@localhost/") as connection:
# Creating channel
channel = await connection.channel()

View File

@ -9,7 +9,7 @@ class RedisClient:
TASKS_DONE_NAME = "tasks_done"
def __init__(self):
self.connection = redis.Redis(host="redis_video_downloader", port=6379, db=0)
self.connection = redis.Redis(host="localhost", port=6379, db=0)
async def _set_task(self, queue_name: str, link: str, task: dict | list, ) -> int:
async with self.connection as connection:
@ -46,7 +46,3 @@ class RedisClient:
async with self.connection as connection:
res = await connection.delete(self.TASKS_NAME)
return res
async def update_task_in_tasks_done(self, task: dict | list, link: str) -> int:
await self._del_task(self.TASKS_DONE_NAME, link)
return await self._set_task(self.TASKS_DONE_NAME, link, task)

View File

@ -1,66 +0,0 @@
from loguru import logger
from minio import Minio
from minio.commonconfig import CopySource
from src.core.config import S3_HOST, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET_NAME
class S3Client:
HOST = S3_HOST
ACCESS_KEY = S3_ACCESS_KEY
SECRET_KEY = S3_SECRET_KEY
BUCKET_NAME = S3_BUCKET_NAME
def __init__(self):
self.client = Minio(
self.HOST,
access_key=self.ACCESS_KEY,
secret_key=self.SECRET_KEY,
secure=True
)
def _make_sure_bucket_exist(self):
found = self.client.bucket_exists(self.BUCKET_NAME)
if not found:
self.client.make_bucket(self.BUCKET_NAME)
else:
logger.info(f"Bucket {self.BUCKET_NAME} already exists")
def upload(self, file_name: str, file_path: str | list[str]):
self._make_sure_bucket_exist()
if isinstance(file_name, str):
file_path = file_path + file_name
self.client.fput_object(self.BUCKET_NAME, file_name, file_path)
logger.info(f"{file_path} is successfully uploaded as object {file_name} to bucket {self.BUCKET_NAME}.")
link_to_download = self.get(file_name)
return link_to_download
else:
result = []
for file_name_part in file_name:
current_file_path = file_path + file_name_part
self.client.fput_object(self.BUCKET_NAME, file_name_part, current_file_path)
logger.info(f"{current_file_path} is successfully uploaded as object {file_name_part} to bucket {self.BUCKET_NAME}.")
link_to_download = self.get(file_name_part)
result.append(link_to_download)
return result
def get(self, file_name: str):
self._make_sure_bucket_exist()
result = self.client.get_presigned_url(
"GET",
self.BUCKET_NAME,
file_name,
)
return result
def delete(self, file_name: str):
result = self.client.remove_object(self.BUCKET_NAME, file_name)
return result
def copy_to_another_bucket(self, file_name: str, bucket_name):
result = self.client.copy_object(
bucket_name,
file_name,
CopySource(self.BUCKET_NAME, file_name),
)
return result

32
src/core/uploader.py Normal file
View File

@ -0,0 +1,32 @@
from minio import Minio
from minio.error import S3Error
def main():
client = Minio(
"grfc.ru",
access_key="cl-i-oculus-dev1",
secret_key="Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM",
secure=True
)
found = client.bucket_exists("clean-internet-oculus-integration-dev")
if not found:
client.make_bucket("clean-internet-oculus-integration-dev")
else:
print("Bucket 'clean-internet-oculus-integration-dev' already exists")
client.fput_object(
"clean-internet-oculus-integration-dev", "4uv2GNc_ybc_1080p.mp4", "/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4",
)
print(
"'/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4' is successfully uploaded as "
"object '4uv2GNc_ybc_1080p.mp4' to bucket 'clean-internet-oculus-integration-dev'."
)
if __name__ == "__main__":
try:
main()
except S3Error as exc:
print("error occurred.", exc)

View File

@ -36,4 +36,4 @@ class DzenParser(BaseParser):
self.params["outtmpl"] = f"downloads/ZenYandex/{title}_%(resolution)s.%(ext)s"
file_path = super().video_download()
self.params["link"] = base_link
return file_path.replace("master", title) if "master" in file_path else file_path
return file_path

View File

@ -1,10 +1,8 @@
import os
import re
import requests
from bs4 import BeautifulSoup
from lxml import etree
from src.exceptions.download_exceptions import FileAlreadyExistException
from src.parsers.base_parser import BaseParser
@ -18,16 +16,9 @@ class OkParser(BaseParser):
resp = requests.get(self.params["link"])
resp.encoding = self.BASE_ENCODING
soup = BeautifulSoup(resp.text, 'lxml')
if "topic" in self.params["link"]:
dom = etree.HTML(str(soup))
elements_with_video_id = dom.xpath(
"//div[@class='mlr_cnt']/div[contains(@data-l, 'gA,VIDEO,mB,movie,ti,')]/div[@class='vid-card "
"vid-card__xl']/div[@class='video-card_n-w']/a[contains(@onclick, 'OK.VideoPlayer.openMovie')]")
links = ["https://ok.ru/video/" + re.findall('\d+', elem.get("onclick"))[0] for elem in elements_with_video_id]
else:
required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
links = [video_tag.find('a').get("href") for video_tag in video_tags]
required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
links = [video_tag.find('a').get("href") for video_tag in video_tags]
return links
except Exception as ex:
raise

View File

@ -1,9 +1,9 @@
api_hash: cb06da2bf01e15627434223242b6446d
api_id: 21648766
chat:
- chat_id: dvachannel
download_filter: id == 125493
last_read_message_id: 125493
- chat_id: -1001966291562
download_filter: id == 2048
last_read_message_id: 2048
file_formats:
video:
- all

View File

@ -1,3 +1,3 @@
from src.parsers.Telegram.telegram_media_downloader.module.filter import Filter
from module.filter import Filter
Filter()

View File

@ -11,12 +11,12 @@ from loguru import logger
from pyrogram.types import Audio, Document, Photo, Video, VideoNote, Voice
from rich.logging import RichHandler
from src.parsers.Telegram.telegram_media_downloader.module.app import Application, ChatDownloadConfig, DownloadStatus, TaskNode
from src.parsers.Telegram.telegram_media_downloader.module.bot import start_download_bot, stop_download_bot
from src.parsers.Telegram.telegram_media_downloader.module.download_stat import update_download_status
from src.parsers.Telegram.telegram_media_downloader.module.get_chat_history_v2 import get_chat_history_v2
from src.parsers.Telegram.telegram_media_downloader.module.language import _t
from src.parsers.Telegram.telegram_media_downloader.module.pyrogram_extension import (
from module.app import Application, ChatDownloadConfig, DownloadStatus, TaskNode
from module.bot import start_download_bot, stop_download_bot
from module.download_stat import update_download_status
from module.get_chat_history_v2 import get_chat_history_v2
from module.language import _t
from module.pyrogram_extension import (
fetch_message,
get_extension,
record_download_status,
@ -25,12 +25,12 @@ from src.parsers.Telegram.telegram_media_downloader.module.pyrogram_extension im
set_meta_data,
upload_telegram_chat,
)
from src.parsers.Telegram.telegram_media_downloader.module.web import init_web
from src.parsers.Telegram.telegram_media_downloader.utils.format import truncate_filename, validate_title
from src.parsers.Telegram.telegram_media_downloader.utils.log import LogFilter
from src.parsers.Telegram.telegram_media_downloader.utils.meta import print_meta
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
from src.parsers.Telegram.telegram_media_downloader.utils.updates import check_for_updates
from module.web import init_web
from utils.format import truncate_filename, validate_title
from utils.log import LogFilter
from utils.meta import print_meta
from utils.meta_data import MetaData
from utils.updates import check_for_updates
logging.basicConfig(
level=logging.INFO,
@ -478,7 +478,6 @@ def _check_config() -> bool:
async def worker(client: pyrogram.client.Client):
"""Work for download task"""
# TODO: mb replace with asyncio.Event
while app.is_running:
try:
item = await queue.get()
@ -567,7 +566,7 @@ async def run_until_all_task_finish():
def _exec_loop():
"""Exec loop"""
# TODO: broken, no loop
if app.bot_token:
app.loop.run_forever()
else:
@ -592,7 +591,7 @@ def main():
client.start()
logger.success(_t("Successfully started (Press Ctrl+C to stop)"))
# TODO: broken
app.loop.create_task(download_all_chat(client))
for _ in range(app.max_download_task):
task = app.loop.create_task(worker(client))

View File

@ -10,11 +10,11 @@ from typing import List, Optional, Union
import loguru
from ruamel import yaml
from src.parsers.Telegram.telegram_media_downloader.module.cloud_drive import CloudDrive, CloudDriveConfig
from src.parsers.Telegram.telegram_media_downloader.module.filter import Filter
from src.parsers.Telegram.telegram_media_downloader.module.language import Language, set_language
from src.parsers.Telegram.telegram_media_downloader.utils.format import replace_date_time, validate_title
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
from module.cloud_drive import CloudDrive, CloudDriveConfig
from module.filter import Filter
from module.language import Language, set_language
from utils.format import replace_date_time, validate_title
from utils.meta_data import MetaData
_yaml = yaml.YAML()
# pylint: disable = R0902
@ -227,6 +227,9 @@ class Application:
self.web_login_secret: str = ""
self.debug_web: bool = False
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.executor = ThreadPoolExecutor(
min(32, (os.cpu_count() or 0) + 4), thread_name_prefix="multi_task"
)
@ -444,8 +447,7 @@ class Application:
self.cloud_drive_config, self.save_path, local_file_path
)
elif self.cloud_drive_config.upload_adapter == "aligo":
loop = asyncio.get_running_loop()
ret = await loop.run_in_executor(
ret = await self.loop.run_in_executor(
self.executor,
CloudDrive.aligo_upload_file(
self.cloud_drive_config, self.save_path, local_file_path

View File

@ -11,25 +11,25 @@ from pyrogram import types
from pyrogram.handlers import MessageHandler
from ruamel import yaml
from src.parsers.Telegram.telegram_media_downloader import utils
from src.parsers.Telegram.telegram_media_downloader.module.app import (
import utils
from module.app import (
Application,
ChatDownloadConfig,
ForwardStatus,
TaskNode,
TaskType,
)
from src.parsers.Telegram.telegram_media_downloader.module.filter import Filter
from src.parsers.Telegram.telegram_media_downloader.module.language import Language, _t
from src.parsers.Telegram.telegram_media_downloader.module.pyrogram_extension import (
from module.filter import Filter
from module.language import Language, _t
from module.pyrogram_extension import (
check_user_permission,
get_message_with_retry,
report_bot_forward_status,
report_bot_status,
set_meta_data,
)
from src.parsers.Telegram.telegram_media_downloader.utils.format import extract_info_from_link, replace_date_time, validate_title
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
from utils.format import extract_info_from_link, replace_date_time, validate_title
from utils.meta_data import MetaData
# from pyrogram.types import (ReplyKeyboardMarkup, InlineKeyboardMarkup,
# InlineKeyboardButton)
@ -232,10 +232,8 @@ class DownloadBot:
pass
# TODO: add admin
# self.bot.set_my_commands(commands, scope=types.BotCommandScopeChatAdministrators(self.app.))
# TODO: check for correctness
loop = asyncio.get_running_loop()
loop.create_task(_bot.update_reply_message())
# _bot.app.loop.create_task(_bot.update_reply_message())
_bot.app.loop.create_task(_bot.update_reply_message())
_bot = DownloadBot()

View File

@ -8,7 +8,7 @@ from zipfile import ZipFile
from loguru import logger
from src.parsers.Telegram.telegram_media_downloader.utils import platforms as platform
from utils import platform
# pylint: disable = R0902

View File

@ -6,8 +6,8 @@ from typing import Any, Optional, Tuple
from ply import lex, yacc
from src.parsers.Telegram.telegram_media_downloader.utils.format import get_byte_from_str
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData, NoneObj, ReString
from utils.format import get_byte_from_str
from utils.meta_data import MetaData, NoneObj, ReString
# pylint: disable = R0904

View File

@ -23,11 +23,11 @@ from pyrogram.file_id import (
)
from pyrogram.mime_types import mime_types
from src.parsers.Telegram.telegram_media_downloader.module.app import Application, DownloadStatus, ForwardStatus, TaskNode
from src.parsers.Telegram.telegram_media_downloader.module.download_stat import get_download_result
from src.parsers.Telegram.telegram_media_downloader.module.language import Language, _t
from src.parsers.Telegram.telegram_media_downloader.utils.format import create_progress_bar, format_byte, truncate_filename
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
from module.app import Application, DownloadStatus, ForwardStatus, TaskNode
from module.download_stat import get_download_result
from module.language import Language, _t
from utils.format import create_progress_bar, format_byte, truncate_filename
from utils.meta_data import MetaData
_mimetypes = MimeTypes()
_mimetypes.readfp(StringIO(mime_types))

View File

@ -6,18 +6,18 @@ import threading
from flask import Flask, jsonify, render_template, request
from src.parsers.Telegram.telegram_media_downloader import utils
import utils
from flask_login import LoginManager, UserMixin, login_required, login_user
from src.parsers.Telegram.telegram_media_downloader.module.app import Application
from src.parsers.Telegram.telegram_media_downloader.module.download_stat import (
from module.app import Application
from module.download_stat import (
DownloadState,
get_download_result,
get_download_state,
get_total_download_speed,
set_download_state,
)
from src.parsers.Telegram.telegram_media_downloader.utils.crypto import AesBase64
from src.parsers.Telegram.telegram_media_downloader.utils.format import format_byte
from utils.crypto import AesBase64
from utils.format import format_byte
log = logging.getLogger("werkzeug")
log.setLevel(logging.ERROR)

View File

@ -2,7 +2,7 @@ import os
from urllib.parse import urlparse
from loguru import logger
import pyrogram
from pyrogram import Client
from ruamel.yaml import YAML
from src.exceptions.download_exceptions import FileAlreadyExistException
@ -12,7 +12,7 @@ from src.parsers.base_parser import BaseParser
class TelegramParser(BaseParser):
async def video_download(self):
def video_download(self, client: Client = None):
url_parse_result = urlparse(self.params["link"])
channel, message_id = url_parse_result.path[1:].split('/') if "/c/" not in url_parse_result.path else \
url_parse_result.path[3:].split('/')
@ -29,19 +29,9 @@ class TelegramParser(BaseParser):
mode="w+", encoding="utf-8") as f:
YAML().dump(config, f)
if _check_config():
tg_client = pyrogram.Client(
"media_downloader",
api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
await tg_client.start()
await download_all_chat(tg_client)
await worker(tg_client)
await tg_client.stop()
app.loop.run_until_complete(download_all_chat(client))
app.loop.run_until_complete(worker(client))
client.stop()
app.is_running = False
logger.info("Stopped!")
return f"Telegram/{message_id}.mp4"

View File

@ -2,9 +2,7 @@ import errno
import os
from loguru import logger
from yt_dlp import download_range_func
from src.core.config import DEFAULT_DURATION
from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import FileAlreadyExistException
@ -22,7 +20,6 @@ class BaseParser:
"logger": logger,
"merge_output_format": self.params["merge_output_format"],
'outtmpl': self.params["outtmpl"],
'download_ranges': download_range_func(None, [(0, int(DEFAULT_DURATION))]),
# "quiet": True
}
downloader = VideoDownloader(link=self.params["link"], ydl_opts=ydl_opts)
@ -32,19 +29,18 @@ class BaseParser:
resolution = downloader.info['resolution']
else:
resolution = "NA"
base_file_name = f"{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
if "Yahoo" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"Yahoo/{base_file_name}"
path_to_video = f"Yahoo/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
elif "ZenYandex" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"ZenYandex/{base_file_name}"
path_to_video = f"ZenYandex/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
elif "Bing" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"Bing/{base_file_name}"
path_to_video = f"Bing/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
else:
path_to_video = f"{downloader.info['extractor_key']}/{base_file_name}"
path_to_video = f"{downloader.info['extractor_key']}/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
if os.path.exists(os.path.join(os.getcwd() + "/downloads/" + path_to_video)):
raise FileAlreadyExistException(message=path_to_video)
downloader.ydl_opts["quiet"] = False
downloader.ydl_opts["quiet"] = False
downloader.download()
return path_to_video

View File

@ -6,17 +6,17 @@ import logging
from aio_pika import connect, Message, DeliveryMode
from fastapi import FastAPI, Request, Depends
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse, FileResponse, Response
from starlette.responses import JSONResponse, FileResponse, StreamingResponse
from starlette.templating import Jinja2Templates
from src.core.redis_client import RedisClient
from src.core.s3_client import S3Client
from src.web.schemes.submit import SubmitIn, CheckIn, DeleteFromS3, CopyToAnotherBucketS3
from src.web.schemes.submit import SubmitIn, CheckIn
app = FastAPI(
title="video_downloader", openapi_url=f"/api/v1/openapi.json"
)
templates = Jinja2Templates(directory=f"{os.path.dirname(os.path.dirname(os.path.abspath(__file__)))}/web/templates")
templates = Jinja2Templates(directory="templates")
app.add_middleware(
CORSMiddleware,
@ -82,7 +82,6 @@ async def index(request: Request):
@app.post('/submit')
async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()):
red = RedisClient()
s3_client = S3Client()
task_done = await is_task_already_done_or_exist(red, data.link)
# TODO: где-то не обновился статус после выполнения\провала задачи
task_in_process = await is_task_in_process(red, data.link)
@ -90,19 +89,13 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
return JSONResponse(status_code=202, content={"result": "Задача в работе. Ожидайте"})
if task_done:
if isinstance(task_done["result"], str):
file_name = task_done["result"][task_done["result"].index("dev/") + 4:task_done["result"].index("?")]
links_to_download_video = [s3_client.get(file_name)]
task_done["result"] = links_to_download_video[0]
links_to_download_video = [str(request.base_url) + "get/?file_path=" + task_done["result"]]
else:
file_names = [task_done_part[task_done_part.index("dev/") + 4:task_done_part.index("?")] for
task_done_part in task_done["result"]]
links_to_download_video = [s3_client.get(file_name) for file_name in file_names]
task_done["result"] = links_to_download_video
await red.update_task_in_tasks_done(task_done, task_done["link"])
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in task_done["result"]]
return JSONResponse({"result": links_to_download_video})
# TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач
async with await connect("amqp://guest:guest@rabbitmq/") as connection:
async with await connect("amqp://guest:guest@localhost/") as connection:
# Creating a channel
channel = await connection.channel()
body = [
@ -111,7 +104,6 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
"format": f"bv[width={data.resolution.value}][ext={data.video_format.value}]+ba[ext={data.audio_format.value}]/"
f"bv[width={data.resolution.value}][ext=mp4]+ba[ext=m4a]/"
f"bv[width={data.resolution.value}][ext=webm]+ba[ext=webm]/"
f"best[width={data.resolution.value}]/"
f"best[ext={data.video_format.value}]/"
f"best[ext!=unknown_video]",
"merge_output_format": data.merge_output_format.value,
@ -140,10 +132,15 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
@app.get('/get/', response_class=FileResponse, status_code=200)
async def download_video(file_path):
s3_client = S3Client()
file_response = s3_client.get(file_path)
return Response(content=file_response.data, headers={'Content-Disposition': f'inline; filename="{file_path}"'},
media_type="video")
base = os.path.dirname(os.path.dirname(os.path.abspath(file_path)))
base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads")
def iterfile():
with open(base_download_dir + f'/{file_path}', mode="rb") as file_like:
yield from file_like
return StreamingResponse(iterfile(), headers={'Content-Disposition': f'inline; filename="{file_path}"'},
media_type="video")
@app.post('/check/', response_class=FileResponse, status_code=200)
@ -171,9 +168,9 @@ async def download_video(data: CheckIn, request: Request):
content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"})
if tasks_done and data.link in tasks_done:
if isinstance(tasks_done[data.link]["result"], str):
links_to_download_video = [tasks_done[data.link]["result"]]
links_to_download_video = [str(request.base_url) + "get/?file_path=" + tasks_done[data.link]["result"]]
else:
links_to_download_video = [link for link in
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in
tasks_done[data.link]["result"]]
return JSONResponse({"result": links_to_download_video})
return JSONResponse(status_code=404, content={"result": "Задача не найдена"})
@ -183,25 +180,4 @@ async def download_video(data: CheckIn, request: Request):
except Exception as ex:
print(ex)
@app.delete('/from-s3/', status_code=200)
async def delete_video_from_s3(delete_data: DeleteFromS3):
s3_client = S3Client()
s3_client.delete(delete_data.file_name)
return JSONResponse(
status_code=200,
content={"result": f"Файл {delete_data.file_name} успешно удален из корзины {s3_client.BUCKET_NAME}"}
)
@app.post('/copy-to-another-bucket/', status_code=200)
async def delete_video_from_s3(data: CopyToAnotherBucketS3):
s3_client = S3Client()
s3_client.copy_to_another_bucket(data.file_name, data.bucket_name)
return JSONResponse(
status_code=200,
content={"result": f"Файл {data.file_name} успешно скопирован в корзину {data.bucket_name}"}
)
uvicorn.run("src.web.main:app", host="0.0.0.0", port=8000, log_level="info")
uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")

View File

@ -50,20 +50,11 @@ class MergeOutputFormatEnum(Enum):
@dataclass
class SubmitIn:
link: str = Form(...)
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_mp4)
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_m4a)
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_720)
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_webm)
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_webm)
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_1080)
merge_output_format: MergeOutputFormatEnum = Form(default=MergeOutputFormatEnum.format_mkv)
class CheckIn(BaseModel):
link: str
class DeleteFromS3(BaseModel):
file_name: str
class CopyToAnotherBucketS3(BaseModel):
file_name: str
bucket_name: str

View File

@ -72,51 +72,10 @@
<body>
<form method="post" action="/submit" id="download">
<input type="text" name="link" id="link" placeholder="link">
<p>Формат видео
<select id="video_format" name="video_format">
<option hidden>Выбор формата видео</option>
<option value="3gp">3gp</option>
<option value="flv">flv</option>
<option value="mp4" selected>mp4</option>
<option value="mov">mov</option>
<option value="webm">webm</option>
</select >
</p>
<p>Формат аудио
<select id="audio_format" name="audio_format">
<option hidden>Выбор формата аудио</option>
<option value="mp3">mp3</option>
<option value="ogg">ogg</option>
<option value="m4a" selected>m4a</option>
<option value="opus">opus</option>
<option value="webm">webm</option>
<option value="wav">wav</option>
<option value="aac">aac</option>
</select >
</p>
<p>Качество видео
<select id="resolution" name="resolution">
<option hidden>Выбор формата аудио</option>
<option value="240">240</option>
<option value="360">360</option>
<option value="480" >480</option>
<option value="720" selected>720</option>
<option value="1080">1080</option>
<option value="2048">2048</option>
<option value="3840">3840</option>
</select >
</p>
<p>Выходной формат видео
<select id="merge_output_format" name="merge_output_format">
<option hidden>Выбор формата аудио</option>
<option value="avi">avi</option>
<option value="flv">flv</option>
<option value="mp4">mp4</option>
<option value="mkv" selected>mkv</option>
<option value="mov">mov</option>
<option value="webm">webm</option>
</select>
</p>
<input type="text" name="video_format" placeholder="video_format">
<input type="text" name="audio_format" placeholder="audio_format">
<input type="text" name="resolution" placeholder="resolution">
<input type="text" name="merge_output_format" placeholder="merge_output_format">
<button type="submit" class="custom-btn btn-1"><span class="submit-spinner submit-spinner_hide"></span> Download</button>
</form>
<div id="linksList" class="col">
@ -211,11 +170,7 @@
document.forms.download.querySelector('.submit-spinner').classList.add('submit-spinner_hide');
console.log(xhr.status);
if (xhr.status !== 200 && xhr.status !== 201) {
if ('response' in xhr && xhr.response !== null) {
sendReq()
} else {
return;
};
return;
};
const response = xhr.response;

View File

@ -1,13 +0,0 @@
FROM python:3.11.4
WORKDIR /app
COPY poetry.lock pyproject.toml /app/
RUN pip install poetry
RUN poetry install --no-root
COPY .. /app
CMD poetry run python main_web.py