Compare commits

..

67 Commits

Author SHA1 Message Date
nikili0n d86e95093d refactored docker-compose 2023-11-29 16:15:48 +03:00
nikili0n 35b6ff8546 refactored logging with json_logs func 2023-11-29 16:15:48 +03:00
nikili0n 8747643616 added s3_client, refactored web and master services 2023-11-21 23:36:58 +03:00
nikili0n 29bffb6e53 minor fixes 2023-11-21 23:36:58 +03:00
nikili0n aad25c98d9 minor fixes 2023-11-21 23:36:58 +03:00
nikili0n e183953bfa wip 2023-11-21 23:36:58 +03:00
nikili0n 70f25f3e36 timeout 2023-11-21 23:36:58 +03:00
nikili0n be792ca57f up 2023-11-21 23:36:58 +03:00
nikili0n b2806da8ca wip 2023-11-21 23:36:58 +03:00
nikili0n dc066a17ab test wip 2023-11-21 23:36:58 +03:00
nikili0n 69eb65f32b remove loop from app 2023-11-21 23:36:58 +03:00
nikili0n a9575c55e9 up wip 2023-11-21 23:36:58 +03:00
nikili0n c12b966496 wip telegram test 2023-11-21 23:36:58 +03:00
nikili0n 913dc7f9aa minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 724e07179d minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 5bb7a72c6c minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 56a43d212e minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 2c6547f04f fix paths 2023-11-21 23:36:58 +03:00
nikili0n b83017a3d8 fix paths 2023-11-21 23:36:58 +03:00
nikili0n 85e3bb4e8e Merge pull request 'feature/tg_parser' (#2) from feature/tg_parser into main
Reviewed-on: #2
2023-11-21 23:36:58 +03:00
nikili0n 398260eafd Added bing_parser.py, minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 038eba9b2a refactored tg parser 2023-11-21 23:36:58 +03:00
nikili0n 6abd2807aa rework yappy_parser.py, Added dzen_parser.py, minor fixes 2023-11-21 23:36:58 +03:00
nikili0n d2fa090731 refactored tg parser 2023-11-21 23:36:58 +03:00
nikili0n 17a257955c added resolution parameter, minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 3ec8326fcd Added tg_parser 2023-11-21 23:36:58 +03:00
nikili0n 75be832b2d added yahoo parser 2023-11-21 23:36:58 +03:00
nikili0n 93b8e50680 minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 5b848dd3eb rework redis, rework web for work with array of links 2023-11-21 23:36:58 +03:00
nikili0n 1a2975c0d4 fix uri path 2023-11-21 23:36:58 +03:00
nikili0n 9210bcbd07 live_journal 2023-11-21 23:36:58 +03:00
nikili0n a79b29e0fa exracted parser mappings 2023-11-21 23:36:58 +03:00
nikili0n d24d9a3a89 [master_service] added youtube.com, to allowed domains 2023-11-21 23:36:58 +03:00
nikili0n adc191e71f added okru parser 2023-11-21 23:36:58 +03:00
nikili0n 9728287569 Merge remote-tracking branch 'origin/main' 2023-11-21 23:36:58 +03:00
nikili0n 1d8b2e4a0d minor fixes 2023-11-21 23:36:58 +03:00
nikili0n a58b543529 change format defaults 2023-11-21 23:36:58 +03:00
nikili0n df51b59379 minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 9d17a292bd unkown stuff 2023-11-21 23:36:58 +03:00
nikili0n 337885746d minor fixes 2023-11-21 23:36:58 +03:00
nikili0n ad3c7450fb fix post check 2023-11-21 23:36:58 +03:00
nikili0n fa75f980d2 up 2023-11-21 23:36:58 +03:00
nikili0n 00c10a4145 up 2023-11-21 23:36:58 +03:00
nikili0n ed267e065f up link and change to post 2023-11-21 23:36:58 +03:00
nikili0n d7b82e3184 up fix to sitenotimplementedexception 2023-11-21 23:36:58 +03:00
nikili0n 89398ae5d1 minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 90207dd5a7 fix 2023-11-21 23:36:58 +03:00
nikili0n c3a3138c0c up 2023-11-21 23:36:58 +03:00
nikili0n 72c2f50a0c minor fixes 2023-11-21 23:36:58 +03:00
nikili0n ef9dc04458 up 2023-11-21 23:36:58 +03:00
nikili0n 8bd1463401 up 2023-11-21 23:36:58 +03:00
nikili0n 9d6d9947f5 minor fixes, rework web service, add features 2023-11-21 23:36:58 +03:00
nikili0n 801b9f2e52 up 2023-11-21 23:36:58 +03:00
nikili0n 79afa55e73 minor fixes, added result processing 2023-11-21 23:36:58 +03:00
nikili0n bce40ee341 up 2023-11-21 23:36:58 +03:00
nikili0n 53b3481c0e change to host in web 2023-11-21 23:36:58 +03:00
nikili0n 1fea12fc29 main file for web 2023-11-21 23:36:58 +03:00
nikili0n a672d2e421 minor fixes, added web serer 2023-11-21 23:36:58 +03:00
nikili0n 5f9b092832 added parsers for new social networks, rework master service 2023-11-21 23:36:58 +03:00
nikili0n 26740ef9ed refactoring for new arch, added Redis, fixed filename, added video exists check 2023-11-21 23:36:58 +03:00
nikili0n dbd1f19c95 refactoring for new arch, minor fixes 2023-11-21 23:36:58 +03:00
nikili0n 338d2c58a1 refactoring master service 2023-11-21 23:36:58 +03:00
nikili0n e909c80178 added aio rmq client, added internal queue for master service 2023-11-21 23:36:58 +03:00
nikili0n c1a4972889 fix extension and downloading 2023-11-21 23:36:58 +03:00
nikili0n 934fde77ec added loader, added video format check, fix video download 2023-11-21 23:36:58 +03:00
nikili0n 87cc8a0648 added link 2023-11-21 23:36:58 +03:00
nikili0n 46ae3a7077 initial commit 2023-11-21 23:36:58 +03:00
31 changed files with 504 additions and 169 deletions
+17
View File
@@ -0,0 +1,17 @@
FROM python:3.11.4
WORKDIR /app
COPY poetry.lock pyproject.toml /app/
RUN apt-get -y update
RUN apt-get -y upgrade
RUN apt-get install -y ffmpeg
RUN pip install poetry
RUN poetry install --no-root
COPY .. /app
CMD poetry run python main.py
+43 -2
View File
@@ -1,19 +1,60 @@
version: "2.1"
networks:
network:
services:
web_service:
container_name: web_service
build:
context: .
dockerfile: web.Dockerfile
ports:
- "8000:8000"
depends_on:
redis:
condition: service_started
rabbitmq:
condition: service_healthy
restart: always
networks:
- network
download_service:
container_name: download_service
build:
context: .
dockerfile: Dockerfile
depends_on:
redis:
condition: service_started
rabbitmq:
condition: service_healthy
restart: always
networks:
- network
rabbitmq:
container_name: rabbitmq
image: rabbitmq:3.10.7-management
hostname: rabbitmq
restart: always
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 30s
retries: 3
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
- ./rabbitmq:/var/lib/rabbitmq
ports:
- 15672:15672
- 5672:5672
- "15672:15672"
- "5672:5672"
networks:
- network
redis:
container_name: redis_video_downloader
image: redis:latest
ports:
- "6379:6379"
networks:
- network
+26 -2
View File
@@ -1,8 +1,32 @@
import asyncio
import json
from typing import Any
from multiprocessing import freeze_support
from src.core.master_service import MasterService
from loguru import logger
def json_logs(message: Any) -> None:
record = message.record
data = {
"timestamp": record["time"].strftime("%d.%m.%y %H.%M.%S %Z%z"),
"level": record["level"].name,
"message": record["message"],
"path": record["file"].path,
"function": record["function"],
"line": record["line"],
}
print(json.dumps(data))
logger.remove(0)
logger.add(json_logs)
if __name__ == '__main__':
freeze_support()
ms = MasterService()
ms.loop.run_until_complete(ms.run())
loop = asyncio.new_event_loop()
ms = MasterService(loop)
ms.loop.run_until_complete(ms.run())
Generated
+15 -1
View File
@@ -1680,6 +1680,20 @@ files = [
[package.dependencies]
six = ">=1.5"
[[package]]
name = "python-dotenv"
version = "1.0.0"
description = "Read key-value pairs from a .env file and set them as environment variables"
optional = false
python-versions = ">=3.8"
files = [
{file = "python-dotenv-1.0.0.tar.gz", hash = "sha256:a8df96034aae6d2d50a4ebe8216326c61c3eb64836776504fcca410e5937a3ba"},
{file = "python_dotenv-1.0.0-py3-none-any.whl", hash = "sha256:f5971a9226b701070a4bf2c38c89e5a3f0d64de8debda981d1db98583009122a"},
]
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "python-multipart"
version = "0.0.6"
@@ -2327,4 +2341,4 @@ websockets = "*"
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "272fe31fba150b0b0fcca1b7d60f706dc2a05ea730ef19e34ccb8e5524f47d66"
content-hash = "b7973dc522b312b75a798bc966c5001b24e134cccd332de7b978e5a1ec495b57"
+1
View File
@@ -36,6 +36,7 @@ ply = "3.11"
ruamel-yaml = "0.17.21"
flask-login = "0.6.2"
pycryptodome = "3.18.0"
python-dotenv = "^1.0.0"
[build-system]
+80
View File
@@ -0,0 +1,80 @@
aio-pika==9.2.2
aiofiles==23.1.0
aiogram==3.0.0
aiohttp==3.8.5
aiormq==6.7.7
aiosignal==1.3.1
annotated-types==0.5.0
anyio==3.7.1
async-timeout==4.0.3
attrs==23.1.0
beautifulsoup4==4.12.2
boto3==1.28.36
botocore==1.31.36
Brotli==1.0.9
certifi==2023.7.22
charset-normalizer==3.2.0
click==8.1.6
commonmark==0.9.1
fastapi==0.101.0
ffmpeg==1.4
ffprobe==0.5
Flask==2.2.2
Flask-Login==0.6.2
frozenlist==1.4.0
greenlet==2.0.2
h11==0.14.0
idna==3.4
iniconfig==2.0.0
itsdangerous==2.1.2
Jinja2==3.1.2
jmespath==1.0.1
loguru==0.6.0
lxml==4.9.3
magic-filter==1.0.11
MarkupSafe==2.1.3
minio==7.1.16
multidict==6.0.4
mutagen==1.46.0
packaging==23.1
pamqp==3.2.1
pika==1.3.2
playwright==1.37.0
pluggy==1.3.0
ply==3.11
pyaes==1.6.1
pycryptodome==3.18.0
pycryptodomex==3.18.0
pydantic==2.3.0
pydantic_core==2.6.3
pyee==9.0.4
Pygments==2.16.1
Pyrogram @ https://github.com/Dineshkarthik/pyrogram/archive/refs/heads/master.zip
PySocks==1.7.1
pytest==7.4.0
pytest-base-url==2.0.0
pytest-playwright==0.4.2
python-dateutil==2.8.2
python-multipart==0.0.6
python-slugify==8.0.1
PyYAML==5.3.1
redis==5.0.0
requests==2.31.0
rich==12.5.1
ruamel.yaml==0.17.21
s3transfer==0.6.2
setproctitle==1.3.2
six==1.16.0
sniffio==1.3.0
soupsieve==2.4.1
starlette==0.27.0
text-unidecode==1.3
TgCrypto==1.2.5
typing_extensions==4.7.1
urllib3==1.26.16
uvicorn==0.23.2
websockets==11.0.3
Werkzeug==2.2.2
yarl==1.9.2
youtube-dl @ git+https://github.com/ytdl-org/youtube-dl.git@86e3cf5e5849aefcc540c19bb5fa5ab7f470d1c1
yt-dlp==2023.7.6
+13
View File
@@ -0,0 +1,13 @@
import os
from dotenv import load_dotenv
load_dotenv()
S3_HOST = os.environ.get("S3_HOST", "s3-api.grfc.ru")
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "cl-i-oculus-dev1")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM")
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "clean-internet-oculus-integration-dev")
DEFAULT_DURATION = os.environ.get("DEFAULT_DURATION", 600)
+3 -1
View File
@@ -1,6 +1,7 @@
import asyncio
import json
from loguru import logger
from playwright.async_api import async_playwright
from playwright.async_api import Playwright
from aio_pika import Message, connect, DeliveryMode
@@ -38,12 +39,13 @@ async def run(playwright: Playwright):
routing_key='hello',
)
print(f" [x] Sent '{body}'")
logger.info(f" [x] Sent '{body}'")
await page.keyboard.press("ArrowDown")
while title == await page.title():
await page.title()
async def main():
async with async_playwright() as playwright:
await run(playwright)
+25 -35
View File
@@ -1,7 +1,8 @@
import asyncio
import concurrent.futures as pool
import json
import os
import subprocess
import pyrogram
import traceback
from functools import partial
@@ -12,9 +13,9 @@ from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages, publish_message_with_task_done
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.core.s3_client import S3Client
from src.exceptions.download_exceptions import FileAlreadyExistException, SiteNotImplementedException
from src.parsers.MyMail.my_mail_parser import MyMailParser
from src.parsers.Telegram.telegram_media_downloader.media_downloader import app, _check_config
from src.parsers.Yappy.yappy_parser import YappyParser
from src.parsers.base_parser import BaseParser
from src.parsers.parser_mapping import get_parser
@@ -22,8 +23,8 @@ from src.parsers.Telegram.telegram_media_downloader.telegram_parser import Teleg
class MasterService:
def __init__(self):
self.loop = asyncio.get_event_loop()
def __init__(self, loop):
self.loop = loop
self.MAX_EXECUTOR_WORKERS = 8
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
@@ -37,12 +38,24 @@ class MasterService:
"for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
shell=True, capture_output=True
)
# TODO Возможно бросать ошибку если упал мастер сервис.
redis = RedisClient()
messages = await redis.get_all_tasks_from_queue(redis.TASKS_NAME)
if messages:
messages = {k.decode("utf-8"): json.loads(v.decode("utf-8")) for k, v in
messages.items()}
for params in list(messages.values()):
await self.queue.put(params)
await redis.del_tasks_queue()
tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)]
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
async def result_processing(self, result: Result | list, redis: RedisClient, video_params: dict):
async def result_processing(self, result: Result | list, redis: RedisClient, s3_client: S3Client, video_params: dict):
file_path = os.path.join(os.getcwd() + "/downloads/")
links_to_download = s3_client.upload(file_name=result.value["result"], file_path=file_path)
result.value["result"] = links_to_download
await redis.del_task_from_tasks_and_add_to_task_done(task=result.value, link=video_params["link"])
await publish_message_with_task_done(task=result.value)
self.queue.task_done()
@@ -51,7 +64,7 @@ class MasterService:
while True:
video_params = await self.queue.get()
redis = RedisClient()
await redis.del_tasks_queue()
s3_client = S3Client()
await redis.del_task_from_queue_and_add_to_tasks(link=video_params["link"], task=video_params)
self.currently_underway[video_params['link']] = video_params
@@ -60,7 +73,7 @@ class MasterService:
))
result: Result = await download_task
await self.result_processing(result, redis, video_params)
await self.result_processing(result, redis, s3_client, video_params)
if video_params['link'] in self.currently_underway:
del self.currently_underway[video_params['link']]
@@ -70,44 +83,20 @@ class MasterService:
downloader: BaseParser | YappyParser | MyMailParser | TelegramParser = MasterService.get_parser(video_params)
match downloader:
case TelegramParser():
if _check_config():
tg_client = pyrogram.Client(
"media_downloader",
api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
tg_client.start()
result = downloader.video_download(client=tg_client)
return result
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(downloader.video_download())
return result
case _:
result = downloader.video_download()
return result
@staticmethod
def get_parser(params: dict):
try:
url_parse_result = urlparse(params["link"])
uri = f"{url_parse_result.netloc}{url_parse_result.path}"
logger.info(uri)
# # TODO: похоже нужно переделать на регулярки, т.к. добавлять каждую вариацию домена моветон, вероятно я сделаюне-
# parser_mapping = {
# "my.mail.ru": MyMailParser(params),
# "www.youtube.com": BaseParser(params),
# "youtube.com": BaseParser(params),
# "youtu.be": BaseParser(params),
# "vk.com": BaseParser(params),
# "ok.ru": BaseParser(params) if "topic" not in params["link"] else OkParser(params),
# "likee.video": BaseParser(params),
# "dzen.ru": BaseParser(params),
# "yappy.media": YappyParser(params),
# "yandex.ru": BaseParser(params),
# }
return get_parser(uri)(params)
except KeyError:
raise SiteNotImplementedException
@@ -140,6 +129,7 @@ class MasterService:
"status": "error"
})
except Exception as ex:
logger.error(traceback.format_exc())
return Result(result_type=ResultTypeEnum.EXCEPTION, value={
"link": video_params["link"],
"result": traceback.format_exc(),
+5 -5
View File
@@ -4,16 +4,17 @@ from functools import partial
from aio_pika import connect, Message, DeliveryMode
from aio_pika.abc import AbstractIncomingMessage
from loguru import logger
async def on_message(message: AbstractIncomingMessage, queue) -> None:
async with message.process():
await queue.put(json.loads(message.body))
print(f" Message body is: {message.body!r}")
logger.info(f" Message body is: {message.body!r}")
async def get_messages(inner_queue) -> None:
async with await connect("amqp://guest:guest@localhost/") as connection:
async with await connect("amqp://guest:guest@rabbitmq/") as connection:
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
@@ -23,14 +24,13 @@ async def get_messages(inner_queue) -> None:
)
await queue.consume(partial(on_message, queue=inner_queue))
print(" [*] Waiting for messages. To exit press CTRL+C")
logger.info("[*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
async def publish_message_with_task_done(task: dict | list) -> None:
queue_name = "tasks_done"
async with await connect("amqp://guest:guest@localhost/") as connection:
async with await connect("amqp://guest:guest@rabbitmq/") as connection:
# Creating channel
channel = await connection.channel()
+5 -1
View File
@@ -9,7 +9,7 @@ class RedisClient:
TASKS_DONE_NAME = "tasks_done"
def __init__(self):
self.connection = redis.Redis(host="localhost", port=6379, db=0)
self.connection = redis.Redis(host="redis_video_downloader", port=6379, db=0)
async def _set_task(self, queue_name: str, link: str, task: dict | list, ) -> int:
async with self.connection as connection:
@@ -46,3 +46,7 @@ class RedisClient:
async with self.connection as connection:
res = await connection.delete(self.TASKS_NAME)
return res
async def update_task_in_tasks_done(self, task: dict | list, link: str) -> int:
await self._del_task(self.TASKS_DONE_NAME, link)
return await self._set_task(self.TASKS_DONE_NAME, link, task)
+66
View File
@@ -0,0 +1,66 @@
from loguru import logger
from minio import Minio
from minio.commonconfig import CopySource
from src.core.config import S3_HOST, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET_NAME
class S3Client:
HOST = S3_HOST
ACCESS_KEY = S3_ACCESS_KEY
SECRET_KEY = S3_SECRET_KEY
BUCKET_NAME = S3_BUCKET_NAME
def __init__(self):
self.client = Minio(
self.HOST,
access_key=self.ACCESS_KEY,
secret_key=self.SECRET_KEY,
secure=True
)
def _make_sure_bucket_exist(self):
found = self.client.bucket_exists(self.BUCKET_NAME)
if not found:
self.client.make_bucket(self.BUCKET_NAME)
else:
logger.info(f"Bucket {self.BUCKET_NAME} already exists")
def upload(self, file_name: str, file_path: str | list[str]):
self._make_sure_bucket_exist()
if isinstance(file_name, str):
file_path = file_path + file_name
self.client.fput_object(self.BUCKET_NAME, file_name, file_path)
logger.info(f"{file_path} is successfully uploaded as object {file_name} to bucket {self.BUCKET_NAME}.")
link_to_download = self.get(file_name)
return link_to_download
else:
result = []
for file_name_part in file_name:
current_file_path = file_path + file_name_part
self.client.fput_object(self.BUCKET_NAME, file_name_part, current_file_path)
logger.info(f"{current_file_path} is successfully uploaded as object {file_name_part} to bucket {self.BUCKET_NAME}.")
link_to_download = self.get(file_name_part)
result.append(link_to_download)
return result
def get(self, file_name: str):
self._make_sure_bucket_exist()
result = self.client.get_presigned_url(
"GET",
self.BUCKET_NAME,
file_name,
)
return result
def delete(self, file_name: str):
result = self.client.remove_object(self.BUCKET_NAME, file_name)
return result
def copy_to_another_bucket(self, file_name: str, bucket_name):
result = self.client.copy_object(
bucket_name,
file_name,
CopySource(self.BUCKET_NAME, file_name),
)
return result
-32
View File
@@ -1,32 +0,0 @@
from minio import Minio
from minio.error import S3Error
def main():
client = Minio(
"grfc.ru",
access_key="cl-i-oculus-dev1",
secret_key="Nom8qKEU6IYtQSrNt5ZPN1XncQTZdtUM",
secure=True
)
found = client.bucket_exists("clean-internet-oculus-integration-dev")
if not found:
client.make_bucket("clean-internet-oculus-integration-dev")
else:
print("Bucket 'clean-internet-oculus-integration-dev' already exists")
client.fput_object(
"clean-internet-oculus-integration-dev", "4uv2GNc_ybc_1080p.mp4", "/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4",
)
print(
"'/Users/garickbadalov/PycharmProjects/video_downloader_service/downloads/Youtube/4uv2GNc_ybc_1080p.mp4' is successfully uploaded as "
"object '4uv2GNc_ybc_1080p.mp4' to bucket 'clean-internet-oculus-integration-dev'."
)
if __name__ == "__main__":
try:
main()
except S3Error as exc:
print("error occurred.", exc)
+1 -1
View File
@@ -36,4 +36,4 @@ class DzenParser(BaseParser):
self.params["outtmpl"] = f"downloads/ZenYandex/{title}_%(resolution)s.%(ext)s"
file_path = super().video_download()
self.params["link"] = base_link
return file_path
return file_path.replace("master", title) if "master" in file_path else file_path
+12 -3
View File
@@ -1,8 +1,10 @@
import os
import re
import requests
from bs4 import BeautifulSoup
from lxml import etree
from src.exceptions.download_exceptions import FileAlreadyExistException
from src.parsers.base_parser import BaseParser
@@ -16,9 +18,16 @@ class OkParser(BaseParser):
resp = requests.get(self.params["link"])
resp.encoding = self.BASE_ENCODING
soup = BeautifulSoup(resp.text, 'lxml')
required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
links = [video_tag.find('a').get("href") for video_tag in video_tags]
if "topic" in self.params["link"]:
dom = etree.HTML(str(soup))
elements_with_video_id = dom.xpath(
"//div[@class='mlr_cnt']/div[contains(@data-l, 'gA,VIDEO,mB,movie,ti,')]/div[@class='vid-card "
"vid-card__xl']/div[@class='video-card_n-w']/a[contains(@onclick, 'OK.VideoPlayer.openMovie')]")
links = ["https://ok.ru/video/" + re.findall('\d+', elem.get("onclick"))[0] for elem in elements_with_video_id]
else:
required_div = [div for div in soup.find_all('div', {'class': 'invisible'}) if len(div['class']) < 2][0]
video_tags = required_div.find('span').find_all_next('span', {'itemprop': "video"})
links = [video_tag.find('a').get("href") for video_tag in video_tags]
return links
except Exception as ex:
raise
@@ -1,9 +1,9 @@
api_hash: cb06da2bf01e15627434223242b6446d
api_id: 21648766
chat:
- chat_id: -1001966291562
download_filter: id == 2048
last_read_message_id: 2048
- chat_id: dvachannel
download_filter: id == 125493
last_read_message_id: 125493
file_formats:
video:
- all
@@ -1,3 +1,3 @@
from module.filter import Filter
from src.parsers.Telegram.telegram_media_downloader.module.filter import Filter
Filter()
@@ -11,12 +11,12 @@ from loguru import logger
from pyrogram.types import Audio, Document, Photo, Video, VideoNote, Voice
from rich.logging import RichHandler
from module.app import Application, ChatDownloadConfig, DownloadStatus, TaskNode
from module.bot import start_download_bot, stop_download_bot
from module.download_stat import update_download_status
from module.get_chat_history_v2 import get_chat_history_v2
from module.language import _t
from module.pyrogram_extension import (
from src.parsers.Telegram.telegram_media_downloader.module.app import Application, ChatDownloadConfig, DownloadStatus, TaskNode
from src.parsers.Telegram.telegram_media_downloader.module.bot import start_download_bot, stop_download_bot
from src.parsers.Telegram.telegram_media_downloader.module.download_stat import update_download_status
from src.parsers.Telegram.telegram_media_downloader.module.get_chat_history_v2 import get_chat_history_v2
from src.parsers.Telegram.telegram_media_downloader.module.language import _t
from src.parsers.Telegram.telegram_media_downloader.module.pyrogram_extension import (
fetch_message,
get_extension,
record_download_status,
@@ -25,12 +25,12 @@ from module.pyrogram_extension import (
set_meta_data,
upload_telegram_chat,
)
from module.web import init_web
from utils.format import truncate_filename, validate_title
from utils.log import LogFilter
from utils.meta import print_meta
from utils.meta_data import MetaData
from utils.updates import check_for_updates
from src.parsers.Telegram.telegram_media_downloader.module.web import init_web
from src.parsers.Telegram.telegram_media_downloader.utils.format import truncate_filename, validate_title
from src.parsers.Telegram.telegram_media_downloader.utils.log import LogFilter
from src.parsers.Telegram.telegram_media_downloader.utils.meta import print_meta
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
from src.parsers.Telegram.telegram_media_downloader.utils.updates import check_for_updates
logging.basicConfig(
level=logging.INFO,
@@ -478,6 +478,7 @@ def _check_config() -> bool:
async def worker(client: pyrogram.client.Client):
"""Work for download task"""
# TODO: mb replace with asyncio.Event
while app.is_running:
try:
item = await queue.get()
@@ -566,7 +567,7 @@ async def run_until_all_task_finish():
def _exec_loop():
"""Exec loop"""
# TODO: broken, no loop
if app.bot_token:
app.loop.run_forever()
else:
@@ -591,7 +592,7 @@ def main():
client.start()
logger.success(_t("Successfully started (Press Ctrl+C to stop)"))
# TODO: broken
app.loop.create_task(download_all_chat(client))
for _ in range(app.max_download_task):
task = app.loop.create_task(worker(client))
@@ -10,11 +10,11 @@ from typing import List, Optional, Union
import loguru
from ruamel import yaml
from module.cloud_drive import CloudDrive, CloudDriveConfig
from module.filter import Filter
from module.language import Language, set_language
from utils.format import replace_date_time, validate_title
from utils.meta_data import MetaData
from src.parsers.Telegram.telegram_media_downloader.module.cloud_drive import CloudDrive, CloudDriveConfig
from src.parsers.Telegram.telegram_media_downloader.module.filter import Filter
from src.parsers.Telegram.telegram_media_downloader.module.language import Language, set_language
from src.parsers.Telegram.telegram_media_downloader.utils.format import replace_date_time, validate_title
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
_yaml = yaml.YAML()
# pylint: disable = R0902
@@ -227,9 +227,6 @@ class Application:
self.web_login_secret: str = ""
self.debug_web: bool = False
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.executor = ThreadPoolExecutor(
min(32, (os.cpu_count() or 0) + 4), thread_name_prefix="multi_task"
)
@@ -447,7 +444,8 @@ class Application:
self.cloud_drive_config, self.save_path, local_file_path
)
elif self.cloud_drive_config.upload_adapter == "aligo":
ret = await self.loop.run_in_executor(
loop = asyncio.get_running_loop()
ret = await loop.run_in_executor(
self.executor,
CloudDrive.aligo_upload_file(
self.cloud_drive_config, self.save_path, local_file_path
@@ -11,25 +11,25 @@ from pyrogram import types
from pyrogram.handlers import MessageHandler
from ruamel import yaml
import utils
from module.app import (
from src.parsers.Telegram.telegram_media_downloader import utils
from src.parsers.Telegram.telegram_media_downloader.module.app import (
Application,
ChatDownloadConfig,
ForwardStatus,
TaskNode,
TaskType,
)
from module.filter import Filter
from module.language import Language, _t
from module.pyrogram_extension import (
from src.parsers.Telegram.telegram_media_downloader.module.filter import Filter
from src.parsers.Telegram.telegram_media_downloader.module.language import Language, _t
from src.parsers.Telegram.telegram_media_downloader.module.pyrogram_extension import (
check_user_permission,
get_message_with_retry,
report_bot_forward_status,
report_bot_status,
set_meta_data,
)
from utils.format import extract_info_from_link, replace_date_time, validate_title
from utils.meta_data import MetaData
from src.parsers.Telegram.telegram_media_downloader.utils.format import extract_info_from_link, replace_date_time, validate_title
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
# from pyrogram.types import (ReplyKeyboardMarkup, InlineKeyboardMarkup,
# InlineKeyboardButton)
@@ -232,8 +232,10 @@ class DownloadBot:
pass
# TODO: add admin
# self.bot.set_my_commands(commands, scope=types.BotCommandScopeChatAdministrators(self.app.))
_bot.app.loop.create_task(_bot.update_reply_message())
# TODO: check for correctness
loop = asyncio.get_running_loop()
loop.create_task(_bot.update_reply_message())
# _bot.app.loop.create_task(_bot.update_reply_message())
_bot = DownloadBot()
@@ -8,7 +8,7 @@ from zipfile import ZipFile
from loguru import logger
from utils import platform
from src.parsers.Telegram.telegram_media_downloader.utils import platforms as platform
# pylint: disable = R0902
@@ -6,8 +6,8 @@ from typing import Any, Optional, Tuple
from ply import lex, yacc
from utils.format import get_byte_from_str
from utils.meta_data import MetaData, NoneObj, ReString
from src.parsers.Telegram.telegram_media_downloader.utils.format import get_byte_from_str
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData, NoneObj, ReString
# pylint: disable = R0904
@@ -23,11 +23,11 @@ from pyrogram.file_id import (
)
from pyrogram.mime_types import mime_types
from module.app import Application, DownloadStatus, ForwardStatus, TaskNode
from module.download_stat import get_download_result
from module.language import Language, _t
from utils.format import create_progress_bar, format_byte, truncate_filename
from utils.meta_data import MetaData
from src.parsers.Telegram.telegram_media_downloader.module.app import Application, DownloadStatus, ForwardStatus, TaskNode
from src.parsers.Telegram.telegram_media_downloader.module.download_stat import get_download_result
from src.parsers.Telegram.telegram_media_downloader.module.language import Language, _t
from src.parsers.Telegram.telegram_media_downloader.utils.format import create_progress_bar, format_byte, truncate_filename
from src.parsers.Telegram.telegram_media_downloader.utils.meta_data import MetaData
_mimetypes = MimeTypes()
_mimetypes.readfp(StringIO(mime_types))
@@ -6,18 +6,18 @@ import threading
from flask import Flask, jsonify, render_template, request
import utils
from src.parsers.Telegram.telegram_media_downloader import utils
from flask_login import LoginManager, UserMixin, login_required, login_user
from module.app import Application
from module.download_stat import (
from src.parsers.Telegram.telegram_media_downloader.module.app import Application
from src.parsers.Telegram.telegram_media_downloader.module.download_stat import (
DownloadState,
get_download_result,
get_download_state,
get_total_download_speed,
set_download_state,
)
from utils.crypto import AesBase64
from utils.format import format_byte
from src.parsers.Telegram.telegram_media_downloader.utils.crypto import AesBase64
from src.parsers.Telegram.telegram_media_downloader.utils.format import format_byte
log = logging.getLogger("werkzeug")
log.setLevel(logging.ERROR)
@@ -2,7 +2,7 @@ import os
from urllib.parse import urlparse
from loguru import logger
from pyrogram import Client
import pyrogram
from ruamel.yaml import YAML
from src.exceptions.download_exceptions import FileAlreadyExistException
@@ -12,7 +12,7 @@ from src.parsers.base_parser import BaseParser
class TelegramParser(BaseParser):
def video_download(self, client: Client = None):
async def video_download(self):
url_parse_result = urlparse(self.params["link"])
channel, message_id = url_parse_result.path[1:].split('/') if "/c/" not in url_parse_result.path else \
url_parse_result.path[3:].split('/')
@@ -29,9 +29,19 @@ class TelegramParser(BaseParser):
mode="w+", encoding="utf-8") as f:
YAML().dump(config, f)
if _check_config():
app.loop.run_until_complete(download_all_chat(client))
app.loop.run_until_complete(worker(client))
client.stop()
tg_client = pyrogram.Client(
"media_downloader",
api_id=app.api_id,
api_hash=app.api_hash,
proxy=app.proxy,
workdir=app.session_file_path,
)
app.pre_run()
app.is_running = True
await tg_client.start()
await download_all_chat(tg_client)
await worker(tg_client)
await tg_client.stop()
app.is_running = False
logger.info("Stopped!")
return f"Telegram/{message_id}.mp4"
+9 -5
View File
@@ -2,7 +2,9 @@ import errno
import os
from loguru import logger
from yt_dlp import download_range_func
from src.core.config import DEFAULT_DURATION
from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import FileAlreadyExistException
@@ -20,6 +22,7 @@ class BaseParser:
"logger": logger,
"merge_output_format": self.params["merge_output_format"],
'outtmpl': self.params["outtmpl"],
'download_ranges': download_range_func(None, [(0, int(DEFAULT_DURATION))]),
# "quiet": True
}
downloader = VideoDownloader(link=self.params["link"], ydl_opts=ydl_opts)
@@ -29,18 +32,19 @@ class BaseParser:
resolution = downloader.info['resolution']
else:
resolution = "NA"
base_file_name = f"{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
if "Yahoo" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"Yahoo/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
path_to_video = f"Yahoo/{base_file_name}"
elif "ZenYandex" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"ZenYandex/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
path_to_video = f"ZenYandex/{base_file_name}"
elif "Bing" in ydl_opts["outtmpl"]["default"]:
path_to_video = f"Bing/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
path_to_video = f"Bing/{base_file_name}"
else:
path_to_video = f"{downloader.info['extractor_key']}/{downloader.info['id']}_{resolution}.{downloader.info['ext']}"
path_to_video = f"{downloader.info['extractor_key']}/{base_file_name}"
if os.path.exists(os.path.join(os.getcwd() + "/downloads/" + path_to_video)):
raise FileAlreadyExistException(message=path_to_video)
downloader.ydl_opts["quiet"] = False
downloader.ydl_opts["quiet"] = False
downloader.download()
return path_to_video
+43 -19
View File
@@ -6,17 +6,17 @@ import logging
from aio_pika import connect, Message, DeliveryMode
from fastapi import FastAPI, Request, Depends
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse, FileResponse, StreamingResponse
from starlette.responses import JSONResponse, FileResponse, Response
from starlette.templating import Jinja2Templates
from src.core.redis_client import RedisClient
from src.web.schemes.submit import SubmitIn, CheckIn
from src.core.s3_client import S3Client
from src.web.schemes.submit import SubmitIn, CheckIn, DeleteFromS3, CopyToAnotherBucketS3
app = FastAPI(
title="video_downloader", openapi_url=f"/api/v1/openapi.json"
)
templates = Jinja2Templates(directory="templates")
templates = Jinja2Templates(directory=f"{os.path.dirname(os.path.dirname(os.path.abspath(__file__)))}/web/templates")
app.add_middleware(
CORSMiddleware,
@@ -82,6 +82,7 @@ async def index(request: Request):
@app.post('/submit')
async def get_url_for_download_video(request: Request, data: SubmitIn = Depends()):
red = RedisClient()
s3_client = S3Client()
task_done = await is_task_already_done_or_exist(red, data.link)
# TODO: где-то не обновился статус после выполнения\провала задачи
task_in_process = await is_task_in_process(red, data.link)
@@ -89,13 +90,19 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
return JSONResponse(status_code=202, content={"result": "Задача в работе. Ожидайте"})
if task_done:
if isinstance(task_done["result"], str):
links_to_download_video = [str(request.base_url) + "get/?file_path=" + task_done["result"]]
file_name = task_done["result"][task_done["result"].index("dev/") + 4:task_done["result"].index("?")]
links_to_download_video = [s3_client.get(file_name)]
task_done["result"] = links_to_download_video[0]
else:
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in task_done["result"]]
file_names = [task_done_part[task_done_part.index("dev/") + 4:task_done_part.index("?")] for
task_done_part in task_done["result"]]
links_to_download_video = [s3_client.get(file_name) for file_name in file_names]
task_done["result"] = links_to_download_video
await red.update_task_in_tasks_done(task_done, task_done["link"])
return JSONResponse({"result": links_to_download_video})
# TODO: учесть, что если делать запрос CURL\urllib3\etc, в теле может быть несколько ссылок -> должно быть создано несколько задач
async with await connect("amqp://guest:guest@localhost/") as connection:
async with await connect("amqp://guest:guest@rabbitmq/") as connection:
# Creating a channel
channel = await connection.channel()
body = [
@@ -104,6 +111,7 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
"format": f"bv[width={data.resolution.value}][ext={data.video_format.value}]+ba[ext={data.audio_format.value}]/"
f"bv[width={data.resolution.value}][ext=mp4]+ba[ext=m4a]/"
f"bv[width={data.resolution.value}][ext=webm]+ba[ext=webm]/"
f"best[width={data.resolution.value}]/"
f"best[ext={data.video_format.value}]/"
f"best[ext!=unknown_video]",
"merge_output_format": data.merge_output_format.value,
@@ -132,15 +140,10 @@ async def get_url_for_download_video(request: Request, data: SubmitIn = Depends(
@app.get('/get/', response_class=FileResponse, status_code=200)
async def download_video(file_path):
base = os.path.dirname(os.path.dirname(os.path.abspath(file_path)))
base_download_dir = os.path.join(base, os.pardir, os.pardir, "downloads")
def iterfile():
with open(base_download_dir + f'/{file_path}', mode="rb") as file_like:
yield from file_like
return StreamingResponse(iterfile(), headers={'Content-Disposition': f'inline; filename="{file_path}"'},
media_type="video")
s3_client = S3Client()
file_response = s3_client.get(file_path)
return Response(content=file_response.data, headers={'Content-Disposition': f'inline; filename="{file_path}"'},
media_type="video")
@app.post('/check/', response_class=FileResponse, status_code=200)
@@ -168,9 +171,9 @@ async def download_video(data: CheckIn, request: Request):
content={"result": f"Задача выполнена с ошибкой, попробуйте загрузить еще раз"})
if tasks_done and data.link in tasks_done:
if isinstance(tasks_done[data.link]["result"], str):
links_to_download_video = [str(request.base_url) + "get/?file_path=" + tasks_done[data.link]["result"]]
links_to_download_video = [tasks_done[data.link]["result"]]
else:
links_to_download_video = [str(request.base_url) + "get/?file_path=" + path for path in
links_to_download_video = [link for link in
tasks_done[data.link]["result"]]
return JSONResponse({"result": links_to_download_video})
return JSONResponse(status_code=404, content={"result": "Задача не найдена"})
@@ -180,4 +183,25 @@ async def download_video(data: CheckIn, request: Request):
except Exception as ex:
print(ex)
uvicorn.run("src.web.main:app", host="0.0.0.0", log_level="info")
@app.delete('/from-s3/', status_code=200)
async def delete_video_from_s3(delete_data: DeleteFromS3):
s3_client = S3Client()
s3_client.delete(delete_data.file_name)
return JSONResponse(
status_code=200,
content={"result": f"Файл {delete_data.file_name} успешно удален из корзины {s3_client.BUCKET_NAME}"}
)
@app.post('/copy-to-another-bucket/', status_code=200)
async def delete_video_from_s3(data: CopyToAnotherBucketS3):
s3_client = S3Client()
s3_client.copy_to_another_bucket(data.file_name, data.bucket_name)
return JSONResponse(
status_code=200,
content={"result": f"Файл {data.file_name} успешно скопирован в корзину {data.bucket_name}"}
)
uvicorn.run("src.web.main:app", host="0.0.0.0", port=8000, log_level="info")
+12 -3
View File
@@ -50,11 +50,20 @@ class MergeOutputFormatEnum(Enum):
@dataclass
class SubmitIn:
link: str = Form(...)
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_webm)
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_webm)
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_1080)
video_format: VideoFormatEnum = Form(default=MergeOutputFormatEnum.format_mp4)
audio_format: AudioFormatEnum = Form(default=AudioFormatEnum.format_m4a)
resolution: ResolutionEnum = Form(default=ResolutionEnum.resolution_720)
merge_output_format: MergeOutputFormatEnum = Form(default=MergeOutputFormatEnum.format_mkv)
class CheckIn(BaseModel):
link: str
class DeleteFromS3(BaseModel):
file_name: str
class CopyToAnotherBucketS3(BaseModel):
file_name: str
bucket_name: str
+50 -5
View File
@@ -72,10 +72,51 @@
<body>
<form method="post" action="/submit" id="download">
<input type="text" name="link" id="link" placeholder="link">
<input type="text" name="video_format" placeholder="video_format">
<input type="text" name="audio_format" placeholder="audio_format">
<input type="text" name="resolution" placeholder="resolution">
<input type="text" name="merge_output_format" placeholder="merge_output_format">
<p>Формат видео
<select id="video_format" name="video_format">
<option hidden>Выбор формата видео</option>
<option value="3gp">3gp</option>
<option value="flv">flv</option>
<option value="mp4" selected>mp4</option>
<option value="mov">mov</option>
<option value="webm">webm</option>
</select >
</p>
<p>Формат аудио
<select id="audio_format" name="audio_format">
<option hidden>Выбор формата аудио</option>
<option value="mp3">mp3</option>
<option value="ogg">ogg</option>
<option value="m4a" selected>m4a</option>
<option value="opus">opus</option>
<option value="webm">webm</option>
<option value="wav">wav</option>
<option value="aac">aac</option>
</select >
</p>
<p>Качество видео
<select id="resolution" name="resolution">
<option hidden>Выбор формата аудио</option>
<option value="240">240</option>
<option value="360">360</option>
<option value="480" >480</option>
<option value="720" selected>720</option>
<option value="1080">1080</option>
<option value="2048">2048</option>
<option value="3840">3840</option>
</select >
</p>
<p>Выходной формат видео
<select id="merge_output_format" name="merge_output_format">
<option hidden>Выбор формата аудио</option>
<option value="avi">avi</option>
<option value="flv">flv</option>
<option value="mp4">mp4</option>
<option value="mkv" selected>mkv</option>
<option value="mov">mov</option>
<option value="webm">webm</option>
</select>
</p>
<button type="submit" class="custom-btn btn-1"><span class="submit-spinner submit-spinner_hide"></span> Download</button>
</form>
<div id="linksList" class="col">
@@ -170,7 +211,11 @@
document.forms.download.querySelector('.submit-spinner').classList.add('submit-spinner_hide');
console.log(xhr.status);
if (xhr.status !== 200 && xhr.status !== 201) {
return;
if ('response' in xhr && xhr.response !== null) {
sendReq()
} else {
return;
};
};
const response = xhr.response;
+13
View File
@@ -0,0 +1,13 @@
FROM python:3.11.4
WORKDIR /app
COPY poetry.lock pyproject.toml /app/
RUN pip install poetry
RUN poetry install --no-root
COPY .. /app
CMD poetry run python main_web.py