Compare commits

47 Commits

Author SHA1 Message Date
nikili0n 439b43862c refactored tg parser 2023-11-21 23:39:27 +03:00
nikili0n 1216c815a1 refactored tg parser 2023-11-21 23:39:27 +03:00
nikili0n e2bb10ad62 Added tg_parser 2023-11-21 23:39:27 +03:00
nikili0n 7f4e661ea6 Added bing_parser.py, minor fixes 2023-11-21 23:39:27 +03:00
nikili0n 3675695922 rework yappy_parser.py, Added dzen_parser.py, minor fixes 2023-11-21 23:39:27 +03:00
nikili0n 99d602bac0 added resolution parameter, minor fixes 2023-11-21 23:39:27 +03:00
nikili0n a493edef5a added yahoo parser 2023-11-21 23:39:27 +03:00
nikili0n 14bd23c841 minor fixes 2023-11-21 23:39:27 +03:00
nikili0n e4fa00e1ce rework redis, rework web for work with array of links 2023-11-21 23:39:27 +03:00
nikili0n 30ce9aee9a fix uri path 2023-11-21 23:39:27 +03:00
nikili0n 3e45f0d3a4 live_journal 2023-11-21 23:39:27 +03:00
nikili0n dbaf2477d9 exracted parser mappings 2023-11-21 23:39:27 +03:00
nikili0n 877f79223b [master_service] added youtube.com, to allowed domains 2023-11-21 23:39:27 +03:00
nikili0n 7931fb61ba added okru parser 2023-11-21 23:39:27 +03:00
nikili0n 4c6e0e30d6 Merge remote-tracking branch 'origin/main' 2023-11-21 23:39:27 +03:00
nikili0n ca3752bb45 minor fixes 2023-11-21 23:39:27 +03:00
nikili0n aaf88c1a39 change format defaults 2023-11-21 23:39:27 +03:00
nikili0n 490e32badf minor fixes 2023-11-21 23:39:27 +03:00
nikili0n 71b9ef2173 unkown stuff 2023-11-21 23:39:27 +03:00
nikili0n bb2b339b7d minor fixes 2023-11-21 23:39:27 +03:00
nikili0n cbbe1de95f fix post check 2023-11-21 23:39:27 +03:00
nikili0n 3673a0bab6 up 2023-11-21 23:39:27 +03:00
nikili0n 1b5e5bd286 up 2023-11-21 23:39:27 +03:00
nikili0n 4b2bc8c51b up link and change to post 2023-11-21 23:39:27 +03:00
nikili0n 8a20ddb2e4 up fix to sitenotimplementedexception 2023-11-21 23:39:27 +03:00
nikili0n 9f6152f327 minor fixes 2023-11-21 23:39:27 +03:00
nikili0n fcc0eaa82f fix 2023-11-21 23:39:27 +03:00
nikili0n 227f8ab6e0 up 2023-11-21 23:39:27 +03:00
nikili0n 678ed18fa6 minor fixes 2023-11-21 23:39:27 +03:00
nikili0n d82f884569 up 2023-11-21 23:39:27 +03:00
nikili0n 09be4e198d up 2023-11-21 23:39:27 +03:00
nikili0n 1f0d51957d minor fixes, rework web service, add features 2023-11-21 23:39:27 +03:00
nikili0n d6c15a7bc2 up 2023-11-21 23:39:27 +03:00
nikili0n 1619ac0dfd minor fixes, added result processing 2023-11-21 23:39:27 +03:00
nikili0n 37995a15db up 2023-11-21 23:39:27 +03:00
nikili0n 303d1679bc change to host in web 2023-11-21 23:39:27 +03:00
nikili0n 45348aab5c main file for web 2023-11-21 23:39:27 +03:00
nikili0n 9b7e006eec minor fixes, added web serer 2023-11-21 23:39:27 +03:00
nikili0n aa519d622f added parsers for new social networks, rework master service 2023-11-21 23:39:27 +03:00
nikili0n 457d48454c refactoring for new arch, added Redis, fixed filename, added video exists check 2023-11-21 23:39:27 +03:00
nikili0n 70961fa6f8 refactoring for new arch, minor fixes 2023-11-21 23:39:27 +03:00
nikili0n 55df0b9621 refactoring master service 2023-11-21 23:39:27 +03:00
nikili0n ea3f9d23a6 added aio rmq client, added internal queue for master service 2023-11-21 23:39:27 +03:00
nikili0n 8f7006cbf0 fix extension and downloading 2023-11-21 23:39:27 +03:00
nikili0n 38e29ecfa0 added loader, added video format check, fix video download 2023-11-21 23:39:27 +03:00
nikili0n 32b92c58d2 added link 2023-11-21 23:39:27 +03:00
nikili0n 30785694f2 initial commit 2023-11-21 23:39:27 +03:00
31 changed files with 169 additions and 504 deletions
-17
View File
@@ -1,17 +0,0 @@
FROM python:3.11.4
WORKDIR /app
COPY poetry.lock pyproject.toml /app/
RUN apt-get -y update
RUN apt-get -y upgrade
RUN apt-get install -y ffmpeg
RUN pip install poetry
RUN poetry install --no-root
COPY .. /app
CMD poetry run python main.py
+2 -43
View File
@@ -1,60 +1,19 @@
version: "2.1"
networks:
network:
services:
web_service:
container_name: web_service
build:
context: .
dockerfile: web.Dockerfile
ports:
- "8000:8000"
depends_on:
redis:
condition: service_started
rabbitmq:
condition: service_healthy
restart: always
networks:
- network
download_service:
container_name: download_service
build:
context: .
dockerfile: Dockerfile
depends_on:
redis:
condition: service_started
rabbitmq:
condition: service_healthy
restart: always
networks:
- network
rabbitmq:
container_name: rabbitmq
image: rabbitmq:3.10.7-management
hostname: rabbitmq
restart: always
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 30s
retries: 3
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
- ./rabbitmq:/var/lib/rabbitmq
ports:
- "15672:15672"
- "5672:5672"
networks:
- network
- 15672:15672
- 5672:5672
redis:
container_name: redis_video_downloader
image: redis:latest
ports:
- "6379:6379"
networks:
- network
+1 -25
View File
@@ -1,32 +1,8 @@
import asyncio
import json
from typing import Any
from multiprocessing import freeze_support
from src.core.master_service import MasterService
from loguru import logger
def json_logs(message: Any) -> None:
record = message.record
data = {
"timestamp": record["time"].strftime("%d.%m.%y %H.%M.%S %Z%z"),
"level": record["level"].name,
"message": record["message"],
"path": record["file"].path,
"function": record["function"],
"line": record["line"],
}
print(json.dumps(data))
logger.remove(0)
logger.add(json_logs)
if __name__ == '__main__':
freeze_support()
loop = asyncio.new_event_loop()
ms = MasterService(loop)
ms = MasterService()
ms.loop.run_until_complete(ms.run())
Generated
+1 -15
View File
@@ -1680,20 +1680,6 @@ files = [
[package.dependencies]
six = ">=1.5"
[[package]]
name = "python-dotenv"
version = "1.0.0"
description = "Read key-value pairs from a .env file and set them as environment variables"
optional = false
python-versions = ">=3.8"
files = [
{file = "python-dotenv-1.0.0.tar.gz", hash = "sha256:a8df96034aae6d2d50a4ebe8216326c61c3eb64836776504fcca410e5937a3ba"},
{file = "python_dotenv-1.0.0-py3-none-any.whl", hash = "sha256:f5971a9226b701070a4bf2c38c89e5a3f0d64de8debda981d1db98583009122a"},
]
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "python-multipart"
version = "0.0.6"
@@ -2341,4 +2327,4 @@ websockets = "*"
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "b7973dc522b312b75a798bc966c5001b24e134cccd332de7b978e5a1ec495b57"
content-hash = "272fe31fba150b0b0fcca1b7d60f706dc2a05ea730ef19e34ccb8e5524f47d66"
-1
View File
@@ -36,7 +36,6 @@ ply = "3.11"
ruamel-yaml = "0.17.21"
flask-login = "0.6.2"
pycryptodome = "3.18.0"
python-dotenv = "^1.0.0"
[build-system]
-80
View File
@@ -1,80 +0,0 @@
aio-pika==9.2.2
aiofiles==23.1.0
aiogram==3.0.0
aiohttp==3.8.5
aiormq==6.7.7
aiosignal==1.3.1
annotated-types==0.5.0
anyio==3.7.1
async-timeout==4.0.3
attrs==23.1.0
beautifulsoup4==4.12.2
boto3==1.28.36
botocore==1.31.36
Brotli==1.0.9
certifi==2023.7.22
charset-normalizer==3.2.0
click==8.1.6
commonmark==0.9.1
fastapi==0.101.0
ffmpeg==1.4
ffprobe==0.5
Flask==2.2.2
Flask-Login==0.6.2
frozenlist==1.4.0
greenlet==2.0.2
h11==0.14.0
idna==3.4
iniconfig==2.0.0
itsdangerous==2.1.2
Jinja2==3.1.2
jmespath==1.0.1
loguru==0.6.0
lxml==4.9.3
magic-filter==1.0.11
MarkupSafe==2.1.3
minio==7.1.16
multidict==6.0.4
mutagen==1.46.0
packaging==23.1
pamqp==3.2.1
pika==1.3.2
playwright==1.37.0
pluggy==1.3.0
ply==3.11
pyaes==1.6.1
pycryptodome==3.18.0
pycryptodomex==3.18.0
pydantic==2.3.0
pydantic_core==2.6.3
pyee==9.0.4
Pygments==2.16.1
Pyrogram @ https://github.com/Dineshkarthik/pyrogram/archive/refs/heads/master.zip
PySocks==1.7.1
pytest==7.4.0
pytest-base-url==2.0.0
pytest-playwright==0.4.2
python-dateutil==2.8.2
python-multipart==0.0.6
python-slugify==8.0.1
PyYAML==5.3.1
redis==5.0.0
requests==2.31.0
rich==12.5.1
ruamel.yaml==0.17.21
s3transfer==0.6.2
setproctitle==1.3.2
six==1.16.0
sniffio==1.3.0
soupsieve==2.4.1
starlette==0.27.0
text-unidecode==1.3
TgCrypto==1.2.5
typing_extensions==4.7.1
urllib3==1.26.16
uvicorn==0.23.2
websockets==11.0.3
Werkzeug==2.2.2
yarl==1.9.2
youtube-dl @ git+https://github.com/ytdl-org/youtube-dl.git@86e3cf5e5849aefcc540c19bb5fa5ab7f470d1c1
yt-dlp==2023.7.6
-13
View File
@@ -1,13 +0,0 @@
import os
from dotenv import load_dotenv
load_dotenv()
S3_HOST = os.environ.get("S3_HOST", "s3-api.grfc.ru")
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "cl-i-oculus-dev1")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM")
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "clean-internet-oculus-integration-dev")
DEFAULT_DURATION = os.environ.get("DEFAULT_DURATION", 600)
+1 -3
View File
@@ -1,7 +1,6 @@
import asyncio
import json
from loguru import logger
from playwright.async_api import async_playwright
from playwright.async_api import Playwright
from aio_pika import Message, connect, DeliveryMode
@@ -39,13 +38,12 @@ async def run(playwright: Playwright):
routing_key='hello',
)
logger.info(f" [x] Sent '{body}'")
print(f" [x] Sent '{body}'")
await page.keyboard.press("ArrowDown")
while title == await page.title():
await page.title()
async def main():
async with async_playwright() as playwright:
await run(playwright)
+34 -24
View File
@@ -1,8 +1,7 @@
import asyncio
import concurrent.futures as pool
import json
import os
import subprocess
import pyrogram
import traceback
from functools import partial
@@ -13,9 +12,9 @@ from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages, publish_message_with_task_done
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.core.s3_client import S3Client
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Telegram.telegram_media_downloader.media_downloader import app, _check_config
from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser
from src.parsers.parser_mapping import get_parser
@@ -23,8 +22,8 @@ from src.parsers.Telegram.telegram_media_downloader.telegram_parser import Teleg
class MasterService:
def __init__(self, loop):
self.loop = loop
def __init__(self):
self.loop = asyncio.get_event_loop()
self.MAX_EXECUTOR_WORKERS = 8
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
@@ -38,24 +37,12 @@ class MasterService:
"for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
shell=True, capture_output=True
)
# TODO Возможно бросать ошибку если упал мастер сервис.
redis = RedisClient()
messages = await redis.get_all_tasks_from_queue(redis.TASKS_NAME)
if messages:
messages = {k.decode("utf-8"): json.loads(v.decode("utf-8")) for k, v in
messages.items()}
for params in list(messages.values()):
await self.queue.put(params)
await redis.del_tasks_queue()
tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)]
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
async def result_processing(self, result: Result | list, redis: RedisClient, s3_client: S3Client, video_params: dict):
file_path = os.path.join(os.getcwd() + "/downloads/")
links_to_download = s3_client.upload(file_name=result.value["result"], file_path=file_path)
result.value["result"] = links_to_download
async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict):
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"])
await publish_message_with_task_done(task=result.value)
self.queue.task_done()
@@ -64,7 +51,7 @@ class MasterService:
while True:
video_params = await self.queue.get()
redis = RedisClient()
s3_client = S3Client()
await redis.del_tasks_queue()
await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params)
self.currently_underway[video_params['link']] = video_params
@@ -73,7 +60,7 @@ class MasterService:
))
result: Result = await download_task
await self.result_processing(result, redis, s3_client, video_params)
await self.result_processing(result, redis, video_params)
if video_params['link'] in self.currently_underway:
del self.currently_underway[video_params['link']]
@@ -83,20 +70,44 @@ class MasterService:
downloader: BaseParser | YappyParser | MyMailParser | TelegramParser = MasterService.get_parser(video_params)
match downloader:
case TelegramParser():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(downloader.video_download())
if _check_config():
tg_client = pyrogram.Client(
"media_downloader",
api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
tg_client.start()
result = downloader.video_download(client=tg_client)
return result
case _:
result = downloader.video_download()
return result
@staticmethod
def get_parser(params: dict):
try:
url_parse_result = urlparse(params["link"])
uri = f"{url_parse_result.netloc}{url_parse_result.path}"
logger.info(uri)
# # TODO: похоже нужно переделать на регулярки, т.к. добавлять каждую вариацию домена моветон, вероятно я сделаюне-
# parser_mapping = {
# "my.mail.ru": MyMailParser(params),
# "www.youtube.com": BaseParser(params),
# "youtube.com": BaseParser(params),
# "youtu.be": BaseParser(params),
# "vk.com": BaseParser(params),
# "ok.ru": BaseParser(params) if "topic" not in params["link"] else OkParser(params),
# "likee.video": BaseParser(params),
# "dzen.ru": BaseParser(params),
# "yappy.media": YappyParser(params),
# "yandex.ru": BaseParser(params),
# }
return get_parser(uri)(params)
except KeyError:
raise SiteNotImplementedException
@@ -129,7 +140,6 @@ class MasterService:
"status": "error"
})
except Exception as ex:
logger.error(traceback.format_exc())
return Result(result_type=ResultTypeEnum.EXCEPTION, value={
"link": video_params["link"],
"result": traceback.format_exc(),
+5 -5
View File
@@ -4,17 +4,16 @@ from functools import partial
from aio_pika import connect, Message, DeliveryMode
from aio_pika.abc import AbstractIncomingMessage
from loguru import logger
async def on_message(message: AbstractIncomingMessage, queue) -> None:
async with message.process():
await queue.put(json.loads(message.body))
logger.info(f" Message body is: {message.body!r}")
print(f" Message body is: {message.body!r}")
async def get_messages(inner_queue) -> None:
async with await connect("amqp://guest:guest@rabbitmq/") as connection:
async with await connect("amqp://guest:guest@localhost/") as connection:
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
@@ -24,13 +23,14 @@ async def get_messages(inner_queue) -> None:
)
await queue.consume(partial(on_message, queue=inner_queue))
logger.info("[*] Waiting for messages. To exit press CTRL+C")
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
async def publish_message_with_task_done(task: dict | list) -> None:
queue_name = "tasks_done"
async with await connect("amqp://guest:guest@rabbitmq/") as connection:
async with await connect("amqp://guest:guest@localhost/") as connection:
# Creating channel
channel = await connection.channel()
+1 -5
View File
@@ -9,7 +9,7 @@ class RedisClient:
TASKS_DONE_NAME = "tasks_done"
def __init__(self):
self.connection = redis.Redis(host="redis_video_downloader", port=6379, db=0)
self.connection = redis.Redis(host="localhost", port=6379, db=0)
async def _set_task(self, queue_name: str, link: str, task: dict | list, ) -> int:
async with self.connection as connection:
@@ -46,7 +46,3 @@ class RedisClient:
async with self.connection as connection:
res = await connection.delete(self.TASKS_NAME)
return res
async def update_task_in_tasks_done(self, task: dict | list, link: str) -> int:
await self._del_task(self.TASKS_DONE_NAME, link)
return await self._set_task(self.TASKS_DONE_NAME, link, task)
-66
View File
@@ -1,66 +0,0 @@
from loguru import logger
from minio import Minio
from minio.commonconfig import CopySource
from src.core.config import S3_HOST, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET_NAME
class S3Client:
HOST = S3_HOST
ACCESS_KEY = S3_ACCESS_KEY
SECRET_KEY = S3_SECRET_KEY
BUCKET_NAME = S3_BUCKET_NAME
def __init__(self):
self.client = Minio(
self.HOST,
access_key=self.ACCESS_KEY,
secret_key=self.SECRET_KEY,
secure=True
)
def _make_sure_bucket_exist(self):
found = self.client.bucket_exists(self.BUCKET_NAME)
if not found:
self.client.make_bucket(self.BUCKET_NAME)
else:
logger.info(f"Bucket {self.BUCKET_NAME} already exists")
def upload(self, file_name: str, file_path: str | list[str]):
self._make_sure_bucket_exist()
if isinstance(file_name, str):
file_path = file_path + file_name
self.client.fput_object(self.BUCKET_NAME, file_name, file_path)
logger.info(f"{file_path} is successfully uploaded as object {file_name} to bucket {self.BUCKET_NAME}.")
link_to_download = self.get(file_name)
return link_to_download
else:
result = []
for file_name_part in file_name:
current_file_path = file_path + file_name_part
self.client.fput_object(self.BUCKET_NAME, file_name_part, current_file_path)
logger.info(f"{current_file_path} is successfully uploaded as object {file_name_part} to bucket {self.BUCKET_NAME}.")
link_to_download = self.get(file_name_part)
result.append(link_to_download)
return result
def get(self, file_name: str):
self._make_sure_bucket_exist()
result = self.client.get_presigned_url(
"GET",
self.BUCKET_NAME,
file_name,
)
return result
def delete(self, file_name: str):
result = self.client.remove_object(self.BUCKET_NAME, file_name)
return result
def copy_to_another_bucket(self, file_name: str, bucket_name):
result = self.client.copy_object(
bucket_name,
file_name,
CopySource(self.BUCKET_NAME, file_name),
)
return result
+32
View File
@@ -0,0 +1,32 @@
from minio import Minio
from minio.error import S3Error
def main():
client = Minio(
"grfc.ru",
access_key="cl-i-oculus-dev1",
secret_key="Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM",
secure=True
)
found = client.bucket_exists("clean-internet-oculus-integration-dev")
if not found:
client.make_bucket("clean-internet-oculus-integration-dev")
else:
print("Bucket 'clean-internet-oculus-integration-dev' already exists")
client.fput_object(
"clean-internet-oculus-integration-dev", "4uv2GNc_ybc_1080p.mp4", "/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4",
)
print(
"'/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4' is successfully uploaded as "
"object '4uv2GNc_ybc_1080p.mp4' to bucket 'clean-internet-oculus-integration-dev'."
)
if __name__ == "__main__":
try:
main()
except S3Error as exc:
print("error occurred.", exc)
+1 -1
View File
@@ -36,4 +36,4 @@ class DzenParser(BaseParser):
self.params["outtmpl"] = f"downloads/ZenYandex/{title}_%(resolution)s.%(ext)s"
file_path = super().video_download()
self.params["link"] = base_link
return file_path.replace("master", title) if "master" in file_path else file_path
return file_path
-9
View File
@@ -1,10 +1,8 @@
import os
import re
import requests
from bs4 import BeautifulSoup
from lxml import etree
from src.exceptions.download_exceptions import FileAlreadyExistException
from src.parsers.base_parser import BaseParser
@@ -18,13 +16,6 @@ class OkParser(BaseParser):
resp = requests.get(self.params["link"])
resp.encoding = self.BASE_ENCODING
soup = BeautifulSoup(resp.text, 'lxml')
if "topic" in self.params["link"]:
dom = etree.HTML(str(soup))
elements_with_video_id = dom.xpath(
"//div[@class='mlr_cnt']/div[contains(@data-l, 'gA,VIDEO,mB,movie,ti,')]/div[@class='vid-card "
"vid-card__xl']/div[@class='video-card_n-w']/a[contains(@onclick, 'OK.VideoPlayer.openMovie')]")
links = ["https://ok.ru/video/" + re.findall('\d+', elem.get("onclick"))[0] for elem in elements_with_video_id]
else:
required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
links = [video_tag.find('a').get("href") for video_tag in video_tags]
@@ -1,9 +1,9 @@
api_hash: cb06da2bf01e15627434223242b6446d
api_id: 21648766
chat:
- chat_id: dvachannel
download_filter: id == 125493
last_read_message_id: 125493
- chat_id: -1001966291562
download_filter: id == 2048
last_read_message_id: 2048
file_formats:
video:
- all
@@ -1,3 +1,3 @@
from src.parsers.Telegram.telegram_media_downloader.module.filter import Filter
from module.filter import Filter
Filter()
@@ -11,12 +11,12 @@ from loguru import logger
from pyrogram.types import Audio, Document, Photo, Video, VideoNote, Voice
from rich.logging import RichHandler
from src.parsers.Telegram.telegram_media_downloader.module.app import Application, ChatDownloadConfig, DownloadStatus, TaskNode
from src.parsers.Telegram.telegram_media_downloader.module.bot import start_download_bot, stop_download_bot
from src.parsers.Telegram.telegram_media_downloader.module.download_stat import update_download_status
from src.parsers.Telegram.telegram_media_downloader.module.get_chat_history_v2 import get_chat_history_v2
from src.parsers.Telegram.telegram_media_downloader.module.language import _t
from src.parsers.Telegram.telegram_media_downloader.module.pyrogram_extension import (
from module.app import Application, ChatDownloadConfig, DownloadStatus, TaskNode
from module.bot import start_download_bot, stop_download_bot
from module.download_stat import update_download_status
from module.get_chat_history_v2 import get_chat_history_v2
from module.language import _t
from module.pyrogram_extension import (
fetch_message,
get_extension,
record_download_status,
@@ -25,12 +25,12 @@ from src.parsers.Telegram.telegram_media_downloader.module.pyrogram_extension im
set_meta_data,
upload_telegram_chat,
)
from src.parsers.Telegram.telegram_media_downloader.module.web import init_web
from src.parsers.Telegram.telegram_media_downloader.utils.format import truncate_filename, validate_title
from src.parsers.Telegram.telegram_media_downloader.utils.log import LogFilter
from src.parsers.Telegram.telegram_media_downloader.utils.meta import print_meta
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
from src.parsers.Telegram.telegram_media_downloader.utils.updates import check_for_updates
from module.web import init_web
from utils.format import truncate_filename, validate_title
from utils.log import LogFilter
from utils.meta import print_meta
from utils.meta_data import MetaData
from utils.updates import check_for_updates
logging.basicConfig(
level=logging.INFO,
@@ -478,7 +478,6 @@ def _check_config() -> bool:
async def worker(client: pyrogram.client.Client):
"""Work for download task"""
# TODO: mb replace with asyncio.Event
while app.is_running:
try:
item = await queue.get()
@@ -567,7 +566,7 @@ async def run_until_all_task_finish():
def _exec_loop():
"""Exec loop"""
# TODO: broken, no loop
if app.bot_token:
app.loop.run_forever()
else:
@@ -592,7 +591,7 @@ def main():
client.start()
logger.success(_t("Successfully started (Press Ctrl+C to stop)"))
# TODO: broken
app.loop.create_task(download_all_chat(client))
for _ in range(app.max_download_task):
task = app.loop.create_task(worker(client))
@@ -10,11 +10,11 @@ from typing import List, Optional, Union
import loguru
from ruamel import yaml
from src.parsers.Telegram.telegram_media_downloader.module.cloud_drive import CloudDrive, CloudDriveConfig
from src.parsers.Telegram.telegram_media_downloader.module.filter import Filter
from src.parsers.Telegram.telegram_media_downloader.module.language import Language, set_language
from src.parsers.Telegram.telegram_media_downloader.utils.format import replace_date_time, validate_title
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
from module.cloud_drive import CloudDrive, CloudDriveConfig
from module.filter import Filter
from module.language import Language, set_language
from utils.format import replace_date_time, validate_title
from utils.meta_data import MetaData
_yaml = yaml.YAML()
# pylint: disable = R0902
@@ -227,6 +227,9 @@ class Application:
self.web_login_secret: str = ""
self.debug_web: bool = False
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.executor = ThreadPoolExecutor(
min(32, (os.cpu_count() or 0) + 4), thread_name_prefix="multi_task"
)
@@ -444,8 +447,7 @@ class Application:
self.cloud_drive_config, self.save_path, local_file_path
)
elif self.cloud_drive_config.upload_adapter == "aligo":
loop = asyncio.get_running_loop()
ret = await loop.run_in_executor(
ret = await self.loop.run_in_executor(
self.executor,
CloudDrive.aligo_upload_file(
self.cloud_drive_config, self.save_path, local_file_path
@@ -11,25 +11,25 @@ from pyrogram import types
from pyrogram.handlers import MessageHandler
from ruamel import yaml
from src.parsers.Telegram.telegram_media_downloader import utils
from src.parsers.Telegram.telegram_media_downloader.module.app import (
import utils
from module.app import (
Application,
ChatDownloadConfig,
ForwardStatus,
TaskNode,
TaskType,
)
from src.parsers.Telegram.telegram_media_downloader.module.filter import Filter
from src.parsers.Telegram.telegram_media_downloader.module.language import Language, _t
from src.parsers.Telegram.telegram_media_downloader.module.pyrogram_extension import (
from module.filter import Filter
from module.language import Language, _t
from module.pyrogram_extension import (
check_user_permission,
get_message_with_retry,
report_bot_forward_status,
report_bot_status,
set_meta_data,
)
from src.parsers.Telegram.telegram_media_downloader.utils.format import extract_info_from_link, replace_date_time, validate_title
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
from utils.format import extract_info_from_link, replace_date_time, validate_title
from utils.meta_data import MetaData
# from pyrogram.types import (ReplyKeyboardMarkup, InlineKeyboardMarkup,
# InlineKeyboardButton)
@@ -232,10 +232,8 @@ class DownloadBot:
pass
# TODO: add admin
# self.bot.set_my_commands(commands, scope=types.BotCommandScopeChatAdministrators(self.app.))
# TODO: check for correctness
loop = asyncio.get_running_loop()
loop.create_task(_bot.update_reply_message())
# _bot.app.loop.create_task(_bot.update_reply_message())
_bot.app.loop.create_task(_bot.update_reply_message())
_bot = DownloadBot()
@@ -8,7 +8,7 @@ from zipfile import ZipFile
from loguru import logger
from src.parsers.Telegram.telegram_media_downloader.utils import platforms as platform
from utils import platform
# pylint: disable = R0902
@@ -6,8 +6,8 @@ from typing import Any, Optional, Tuple
from ply import lex, yacc
from src.parsers.Telegram.telegram_media_downloader.utils.format import get_byte_from_str
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData, NoneObj, ReString
from utils.format import get_byte_from_str
from utils.meta_data import MetaData, NoneObj, ReString
# pylint: disable = R0904
@@ -23,11 +23,11 @@ from pyrogram.file_id import (
)
from pyrogram.mime_types import mime_types
from src.parsers.Telegram.telegram_media_downloader.module.app import Application, DownloadStatus, ForwardStatus, TaskNode
from src.parsers.Telegram.telegram_media_downloader.module.download_stat import get_download_result
from src.parsers.Telegram.telegram_media_downloader.module.language import Language, _t
from src.parsers.Telegram.telegram_media_downloader.utils.format import create_progress_bar, format_byte, truncate_filename
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
from module.app import Application, DownloadStatus, ForwardStatus, TaskNode
from module.download_stat import get_download_result
from module.language import Language, _t
from utils.format import create_progress_bar, format_byte, truncate_filename
from utils.meta_data import MetaData
_mimetypes = MimeTypes()
_mimetypes.readfp(StringIO(mime_types))
@@ -6,18 +6,18 @@ import threading
from flask import Flask, jsonify, render_template, request
from src.parsers.Telegram.telegram_media_downloader import utils
import utils
from flask_login import LoginManager, UserMixin, login_required, login_user
from src.parsers.Telegram.telegram_media_downloader.module.app import Application
from src.parsers.Telegram.telegram_media_downloader.module.download_stat import (
from module.app import Application
from module.download_stat import (
DownloadState,
get_download_result,
get_download_state,
get_total_download_speed,
set_download_state,
)
from src.parsers.Telegram.telegram_media_downloader.utils.crypto import AesBase64
from src.parsers.Telegram.telegram_media_downloader.utils.format import format_byte
from utils.crypto import AesBase64
from utils.format import format_byte
log = logging.getLogger("werkzeug")
log.setLevel(logging.ERROR)
@@ -2,7 +2,7 @@ import os
from urllib.parse import urlparse
from loguru import logger
import pyrogram
from pyrogram import Client
from ruamel.yaml import YAML
from src.exceptions.download_exceptions import FileAlreadyExistException
@@ -12,7 +12,7 @@ from src.parsers.base_parser import BaseParser
class TelegramParser(BaseParser):
async def video_download(self):
def video_download(self, client: Client = None):
url_parse_result = urlparse(self.params["link"])
channel, message_id = url_parse_result.path[1:].split('/') if "/c/" not in url_parse_result.path else \
url_parse_result.path[3:].split('/')
@@ -29,19 +29,9 @@ class TelegramParser(BaseParser):
mode="w+", encoding="utf-8") as f:
YAML().dump(config, f)
if _check_config():
tg_client = pyrogram.Client(
"media_downloader",
api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
await tg_client.start()
await download_all_chat(tg_client)
await worker(tg_client)
await tg_client.stop()
app.loop.run_until_complete(download_all_chat(client))
app.loop.run_until_complete(worker(client))
client.stop()
app.is_running = False
logger.info("Stopped!")
return f"Telegram/{message_id}.mp4"
+5 -9
View File
@@ -2,9 +2,7 @@ import errno
import os
from loguru import logger
from yt_dlp import download_range_func
from src.core.config import DEFAULT_DURATION
from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import FileAlreadyExistException
@@ -22,7 +20,6 @@ class BaseParser:
"logger": logger,
"merge_output_format": self.params["merge_output_format"],
'outtmpl': self.params["outtmpl"],
'download_ranges': download_range_func(None, [(0, int(DEFAULT_DURATION))]),
# "quiet": True
}
downloader = VideoDownloader(link=self.params["link"], ydl_opts=ydl_opts)
@@ -32,19 +29,18 @@ class BaseParser:
resolution = downloader.info['resolution']
else:
resolution = "NA"
base_file_name = f"{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
if "Yahoo" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"Yahoo/{base_file_name}"
path_to_video = f"Yahoo/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
elif "ZenYandex" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"ZenYandex/{base_file_name}"
path_to_video = f"ZenYandex/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
elif "Bing" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"Bing/{base_file_name}"
path_to_video = f"Bing/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
else:
path_to_video = f"{downloader.info['extractor_key']}/{base_file_name}"
path_to_video = f"{downloader.info['extractor_key']}/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
if os.path.exists(os.path.join(os.getcwd() + "/downloads/" + path_to_video)):
raise FileAlreadyExistException(message=path_to_video)
downloader.ydl_opts["quiet"] = False
downloader.ydl_opts["quiet"] = False
downloader.download()
return path_to_video
+18 -42
View File
@@ -6,17 +6,17 @@ import logging
from aio_pika import connect, Message, DeliveryMode
from fastapi import FastAPI, Request, Depends
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse, FileResponse, Response
from starlette.responses import JSONResponse, FileResponse, StreamingResponse
from starlette.templating import Jinja2Templates
from src.core.redis_client import RedisClient
from src.core.s3_client import S3Client
from src.web.schemes.submit import SubmitIn, CheckIn, DeleteFromS3, CopyToAnotherBucketS3
from src.web.schemes.submit import SubmitIn, CheckIn
app = FastAPI(
title="video_downloader", openapi_url=f"/api/v1/openapi.json"
)
templates = Jinja2Templates(directory=f"{os.path.dirname(os.path.dirname(os.path.abspath(__file__)))}/web/templates")
templates = Jinja2Templates(directory="templates")
app.add_middleware(
CORSMiddleware,
@@ -82,7 +82,6 @@ async def index(request: Request):
@app.post('/submit')
async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()):
red = RedisClient()
s3_client = S3Client()
task_done = await is_task_already_done_or_exist(red, data.link)
# TODO: где-то не обновился статус после выполнения\провала задачи
task_in_process = await is_task_in_process(red, data.link)
@@ -90,19 +89,13 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
return JSONResponse(status_code=202, content={"result": "Задача в работе. Ожидайте"})
if task_done:
if isinstance(task_done["result"], str):
file_name = task_done["result"][task_done["result"].index("dev/") + 4:task_done["result"].index("?")]
links_to_download_video = [s3_client.get(file_name)]
task_done["result"] = links_to_download_video[0]
links_to_download_video = [str(request.base_url) + "get/?file_path=" + task_done["result"]]
else:
file_names = [task_done_part[task_done_part.index("dev/") + 4:task_done_part.index("?")] for
task_done_part in task_done["result"]]
links_to_download_video = [s3_client.get(file_name) for file_name in file_names]
task_done["result"] = links_to_download_video
await red.update_task_in_tasks_done(task_done, task_done["link"])
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in task_done["result"]]
return JSONResponse({"result": links_to_download_video})
# TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач
async with await connect("amqp://guest:guest@rabbitmq/") as connection:
async with await connect("amqp://guest:guest@localhost/") as connection:
# Creating a channel
channel = await connection.channel()
body = [
@@ -111,7 +104,6 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
"format": f"bv[width={data.resolution.value}][ext={data.video_format.value}]+ba[ext={data.audio_format.value}]/"
f"bv[width={data.resolution.value}][ext=mp4]+ba[ext=m4a]/"
f"bv[width={data.resolution.value}][ext=webm]+ba[ext=webm]/"
f"best[width={data.resolution.value}]/"
f"best[ext={data.video_format.value}]/"
f"best[ext!=unknown_video]",
"merge_output_format": data.merge_output_format.value,
@@ -140,9 +132,14 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
@app.get('/get/', response_class=FileResponse, status_code=200)
async def download_video(file_path):
s3_client = S3Client()
file_response = s3_client.get(file_path)
return Response(content=file_response.data, headers={'Content-Disposition': f'inline; filename="{file_path}"'},
base = os.path.dirname(os.path.dirname(os.path.abspath(file_path)))
base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads")
def iterfile():
with open(base_download_dir + f'/{file_path}', mode="rb") as file_like:
yield from file_like
return StreamingResponse(iterfile(), headers={'Content-Disposition': f'inline; filename="{file_path}"'},
media_type="video")
@@ -171,9 +168,9 @@ async def download_video(data: CheckIn, request: Request):
content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"})
if tasks_done and data.link in tasks_done:
if isinstance(tasks_done[data.link]["result"], str):
links_to_download_video = [tasks_done[data.link]["result"]]
links_to_download_video = [str(request.base_url) + "get/?file_path=" + tasks_done[data.link]["result"]]
else:
links_to_download_video = [link for link in
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in
tasks_done[data.link]["result"]]
return JSONResponse({"result": links_to_download_video})
return JSONResponse(status_code=404, content={"result": "Задача не найдена"})
@@ -183,25 +180,4 @@ async def download_video(data: CheckIn, request: Request):
except Exception as ex:
print(ex)
@app.delete('/from-s3/', status_code=200)
async def delete_video_from_s3(delete_data: DeleteFromS3):
s3_client = S3Client()
s3_client.delete(delete_data.file_name)
return JSONResponse(
status_code=200,
content={"result": f"Файл {delete_data.file_name} успешно удален из корзины {s3_client.BUCKET_NAME}"}
)
@app.post('/copy-to-another-bucket/', status_code=200)
async def delete_video_from_s3(data: CopyToAnotherBucketS3):
s3_client = S3Client()
s3_client.copy_to_another_bucket(data.file_name, data.bucket_name)
return JSONResponse(
status_code=200,
content={"result": f"Файл {data.file_name} успешно скопирован в корзину {data.bucket_name}"}
)
uvicorn.run("src.web.main:app", host="0.0.0.0", port=8000, log_level="info")
uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")
+3 -12
View File
@@ -50,20 +50,11 @@ class MergeOutputFormatEnum(Enum):
@dataclass
class SubmitIn:
link: str = Form(...)
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_mp4)
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_m4a)
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_720)
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_webm)
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_webm)
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_1080)
merge_output_format: MergeOutputFormatEnum = Form(default=MergeOutputFormatEnum.format_mkv)
class CheckIn(BaseModel):
link: str
class DeleteFromS3(BaseModel):
file_name: str
class CopyToAnotherBucketS3(BaseModel):
file_name: str
bucket_name: str
+4 -49
View File
@@ -72,51 +72,10 @@
<body>
<form method="post" action="/submit" id="download">
<input type="text" name="link" id="link" placeholder="link">
<p>Формат видео
<select id="video_format" name="video_format">
<option hidden>Выбор формата видео</option>
<option value="3gp">3gp</option>
<option value="flv">flv</option>
<option value="mp4" selected>mp4</option>
<option value="mov">mov</option>
<option value="webm">webm</option>
</select >
</p>
<p>Формат аудио
<select id="audio_format" name="audio_format">
<option hidden>Выбор формата аудио</option>
<option value="mp3">mp3</option>
<option value="ogg">ogg</option>
<option value="m4a" selected>m4a</option>
<option value="opus">opus</option>
<option value="webm">webm</option>
<option value="wav">wav</option>
<option value="aac">aac</option>
</select >
</p>
<p>Качество видео
<select id="resolution" name="resolution">
<option hidden>Выбор формата аудио</option>
<option value="240">240</option>
<option value="360">360</option>
<option value="480" >480</option>
<option value="720" selected>720</option>
<option value="1080">1080</option>
<option value="2048">2048</option>
<option value="3840">3840</option>
</select >
</p>
<p>Выходной формат видео
<select id="merge_output_format" name="merge_output_format">
<option hidden>Выбор формата аудио</option>
<option value="avi">avi</option>
<option value="flv">flv</option>
<option value="mp4">mp4</option>
<option value="mkv" selected>mkv</option>
<option value="mov">mov</option>
<option value="webm">webm</option>
</select>
</p>
<input type="text" name="video_format" placeholder="video_format">
<input type="text" name="audio_format" placeholder="audio_format">
<input type="text" name="resolution" placeholder="resolution">
<input type="text" name="merge_output_format" placeholder="merge_output_format">
<button type="submit" class="custom-btn btn-1"><span class="submit-spinner submit-spinner_hide"></span> Download</button>
</form>
<div id="linksList" class="col">
@@ -211,12 +170,8 @@
document.forms.download.querySelector('.submit-spinner').classList.add('submit-spinner_hide');
console.log(xhr.status);
if (xhr.status !== 200 && xhr.status !== 201) {
if ('response' in xhr && xhr.response !== null) {
sendReq()
} else {
return;
};
};
const response = xhr.response;
-13
View File
@@ -1,13 +0,0 @@
FROM python:3.11.4
WORKDIR /app
COPY poetry.lock pyproject.toml /app/
RUN pip install poetry
RUN poetry install --no-root
COPY .. /app
CMD poetry run python main_web.py