Compare commits

..

2 Commits

10 changed files with 331 additions and 73 deletions

19
docker-compose.yml Normal file
View File

@ -0,0 +1,19 @@
version: "2.1"
services:
rabbitmq:
image: rabbitmq:3.10.7-management
hostname: rabbitmq
restart: always
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
- ./rabbitmq:/var/lib/rabbitmq
ports:
- 15672:15672
- 5672:5672
redis:
container_name: redis_video_downloader
image: redis:latest
ports:
- "6379:6379"

8
main.py Normal file
View File

@ -0,0 +1,8 @@
from multiprocessing import freeze_support
from src.core.master_service import MasterService
if __name__ == '__main__':
freeze_support()
ms = MasterService()
ms.loop.run_until_complete(ms.run())

115
poetry.lock generated
View File

@ -61,6 +61,17 @@ doc = ["Sphinx", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-
test = ["anyio[trio]", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] test = ["anyio[trio]", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"]
trio = ["trio (<0.22)"] trio = ["trio (<0.22)"]
[[package]]
name = "async-timeout"
version = "4.0.3"
description = "Timeout context manager for asyncio programs"
optional = false
python-versions = ">=3.7"
files = [
{file = "async-timeout-4.0.3.tar.gz", hash = "sha256:4640d96be84d82d02ed59ea2b7105a0f7b33abe8703703cd0ab0bf87c427522f"},
{file = "async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028"},
]
[[package]] [[package]]
name = "click" name = "click"
version = "8.1.6" version = "8.1.6"
@ -463,6 +474,108 @@ files = [
[package.extras] [package.extras]
dev = ["atomicwrites (==1.2.1)", "attrs (==19.2.0)", "coverage (==6.5.0)", "hatch", "invoke (==1.7.3)", "more-itertools (==4.3.0)", "pbr (==4.3.0)", "pluggy (==1.0.0)", "py (==1.11.0)", "pytest (==7.2.0)", "pytest-cov (==4.0.0)", "pytest-timeout (==2.1.0)", "pyyaml (==5.1)"] dev = ["atomicwrites (==1.2.1)", "attrs (==19.2.0)", "coverage (==6.5.0)", "hatch", "invoke (==1.7.3)", "more-itertools (==4.3.0)", "pbr (==4.3.0)", "pluggy (==1.0.0)", "py (==1.11.0)", "pytest (==7.2.0)", "pytest-cov (==4.0.0)", "pytest-timeout (==2.1.0)", "pyyaml (==5.1)"]
[[package]]
name = "redis"
version = "5.0.0"
description = "Python client for Redis database and key-value store"
optional = false
python-versions = ">=3.7"
files = [
{file = "redis-5.0.0-py3-none-any.whl", hash = "sha256:06570d0b2d84d46c21defc550afbaada381af82f5b83e5b3777600e05d8e2ed0"},
{file = "redis-5.0.0.tar.gz", hash = "sha256:5cea6c0d335c9a7332a460ed8729ceabb4d0c489c7285b0a86dbbf8a017bd120"},
]
[package.dependencies]
async-timeout = {version = ">=4.0.2", markers = "python_full_version <= \"3.11.2\""}
[package.extras]
hiredis = ["hiredis (>=1.0.0)"]
ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)"]
[[package]]
name = "setproctitle"
version = "1.3.2"
description = "A Python module to customize the process title"
optional = false
python-versions = ">=3.7"
files = [
{file = "setproctitle-1.3.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:288943dec88e178bb2fd868adf491197cc0fc8b6810416b1c6775e686bab87fe"},
{file = "setproctitle-1.3.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:630f6fe5e24a619ccf970c78e084319ee8be5be253ecc9b5b216b0f474f5ef18"},
{file = "setproctitle-1.3.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6c877691b90026670e5a70adfbcc735460a9f4c274d35ec5e8a43ce3f8443005"},
{file = "setproctitle-1.3.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:7a55fe05f15c10e8c705038777656fe45e3bd676d49ad9ac8370b75c66dd7cd7"},
{file = "setproctitle-1.3.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ab45146c71ca6592c9cc8b354a2cc9cc4843c33efcbe1d245d7d37ce9696552d"},
{file = "setproctitle-1.3.2-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e00c9d5c541a2713ba0e657e0303bf96ddddc412ef4761676adc35df35d7c246"},
{file = "setproctitle-1.3.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:265ecbe2c6eafe82e104f994ddd7c811520acdd0647b73f65c24f51374cf9494"},
{file = "setproctitle-1.3.2-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:c2c46200656280a064073447ebd363937562debef329482fd7e570c8d498f806"},
{file = "setproctitle-1.3.2-cp310-cp310-musllinux_1_1_ppc64le.whl", hash = "sha256:fa2f50678f04fda7a75d0fe5dd02bbdd3b13cbe6ed4cf626e4472a7ccf47ae94"},
{file = "setproctitle-1.3.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:7f2719a398e1a2c01c2a63bf30377a34d0b6ef61946ab9cf4d550733af8f1ef1"},
{file = "setproctitle-1.3.2-cp310-cp310-win32.whl", hash = "sha256:e425be62524dc0c593985da794ee73eb8a17abb10fe692ee43bb39e201d7a099"},
{file = "setproctitle-1.3.2-cp310-cp310-win_amd64.whl", hash = "sha256:e85e50b9c67854f89635a86247412f3ad66b132a4d8534ac017547197c88f27d"},
{file = "setproctitle-1.3.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:2a97d51c17d438cf5be284775a322d57b7ca9505bb7e118c28b1824ecaf8aeaa"},
{file = "setproctitle-1.3.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:587c7d6780109fbd8a627758063d08ab0421377c0853780e5c356873cdf0f077"},
{file = "setproctitle-1.3.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7d17c8bd073cbf8d141993db45145a70b307385b69171d6b54bcf23e5d644de"},
{file = "setproctitle-1.3.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e932089c35a396dc31a5a1fc49889dd559548d14cb2237adae260382a090382e"},
{file = "setproctitle-1.3.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8e4f8f12258a8739c565292a551c3db62cca4ed4f6b6126664e2381acb4931bf"},
{file = "setproctitle-1.3.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:570d255fd99c7f14d8f91363c3ea96bd54f8742275796bca67e1414aeca7d8c3"},
{file = "setproctitle-1.3.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:a8e0881568c5e6beff91ef73c0ec8ac2a9d3ecc9edd6bd83c31ca34f770910c4"},
{file = "setproctitle-1.3.2-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:4bba3be4c1fabf170595b71f3af46c6d482fbe7d9e0563999b49999a31876f77"},
{file = "setproctitle-1.3.2-cp311-cp311-musllinux_1_1_ppc64le.whl", hash = "sha256:37ece938110cab2bb3957e3910af8152ca15f2b6efdf4f2612e3f6b7e5459b80"},
{file = "setproctitle-1.3.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:db684d6bbb735a80bcbc3737856385b55d53f8a44ce9b46e9a5682c5133a9bf7"},
{file = "setproctitle-1.3.2-cp311-cp311-win32.whl", hash = "sha256:ca58cd260ea02759238d994cfae844fc8b1e206c684beb8f38877dcab8451dfc"},
{file = "setproctitle-1.3.2-cp311-cp311-win_amd64.whl", hash = "sha256:88486e6cce2a18a033013d17b30a594f1c5cb42520c49c19e6ade40b864bb7ff"},
{file = "setproctitle-1.3.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:92c626edc66169a1b09e9541b9c0c9f10488447d8a2b1d87c8f0672e771bc927"},
{file = "setproctitle-1.3.2-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:710e16fa3bade3b026907e4a5e841124983620046166f355bbb84be364bf2a02"},
{file = "setproctitle-1.3.2-cp37-cp37m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1f29b75e86260b0ab59adb12661ef9f113d2f93a59951373eb6d68a852b13e83"},
{file = "setproctitle-1.3.2-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1c8d9650154afaa86a44ff195b7b10d683c73509d085339d174e394a22cccbb9"},
{file = "setproctitle-1.3.2-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f0452282258dfcc01697026a8841258dd2057c4438b43914b611bccbcd048f10"},
{file = "setproctitle-1.3.2-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:e49ae693306d7624015f31cb3e82708916759d592c2e5f72a35c8f4cc8aef258"},
{file = "setproctitle-1.3.2-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:1ff863a20d1ff6ba2c24e22436a3daa3cd80be1dfb26891aae73f61b54b04aca"},
{file = "setproctitle-1.3.2-cp37-cp37m-musllinux_1_1_ppc64le.whl", hash = "sha256:55ce1e9925ce1765865442ede9dca0ba9bde10593fcd570b1f0fa25d3ec6b31c"},
{file = "setproctitle-1.3.2-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:7fe9df7aeb8c64db6c34fc3b13271a363475d77bc157d3f00275a53910cb1989"},
{file = "setproctitle-1.3.2-cp37-cp37m-win32.whl", hash = "sha256:e5c50e164cd2459bc5137c15288a9ef57160fd5cbf293265ea3c45efe7870865"},
{file = "setproctitle-1.3.2-cp37-cp37m-win_amd64.whl", hash = "sha256:a499fff50387c1520c085a07578a000123f519e5f3eee61dd68e1d301659651f"},
{file = "setproctitle-1.3.2-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:5b932c3041aa924163f4aab970c2f0e6b4d9d773f4d50326e0ea1cd69240e5c5"},
{file = "setproctitle-1.3.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:f4bfc89bd33ebb8e4c0e9846a09b1f5a4a86f5cb7a317e75cc42fee1131b4f4f"},
{file = "setproctitle-1.3.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fcd3cf4286a60fdc95451d8d14e0389a6b4f5cebe02c7f2609325eb016535963"},
{file = "setproctitle-1.3.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:5fb4f769c02f63fac90989711a3fee83919f47ae9afd4758ced5d86596318c65"},
{file = "setproctitle-1.3.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5194b4969f82ea842a4f6af2f82cd16ebdc3f1771fb2771796e6add9835c1973"},
{file = "setproctitle-1.3.2-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1f0cde41857a644b7353a0060b5f94f7ba7cf593ebde5a1094da1be581ac9a31"},
{file = "setproctitle-1.3.2-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:9124bedd8006b0e04d4e8a71a0945da9b67e7a4ab88fdad7b1440dc5b6122c42"},
{file = "setproctitle-1.3.2-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:c8a09d570b39517de10ee5b718730e171251ce63bbb890c430c725c8c53d4484"},
{file = "setproctitle-1.3.2-cp38-cp38-musllinux_1_1_ppc64le.whl", hash = "sha256:8ff3c8cb26afaed25e8bca7b9dd0c1e36de71f35a3a0706b5c0d5172587a3827"},
{file = "setproctitle-1.3.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:589be87172b238f839e19f146b9ea47c71e413e951ef0dc6db4218ddacf3c202"},
{file = "setproctitle-1.3.2-cp38-cp38-win32.whl", hash = "sha256:4749a2b0c9ac52f864d13cee94546606f92b981b50e46226f7f830a56a9dc8e1"},
{file = "setproctitle-1.3.2-cp38-cp38-win_amd64.whl", hash = "sha256:e43f315c68aa61cbdef522a2272c5a5b9b8fd03c301d3167b5e1343ef50c676c"},
{file = "setproctitle-1.3.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:de3a540cd1817ede31f530d20e6a4935bbc1b145fd8f8cf393903b1e02f1ae76"},
{file = "setproctitle-1.3.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:4058564195b975ddc3f0462375c533cce310ccdd41b80ac9aed641c296c3eff4"},
{file = "setproctitle-1.3.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1c5d5dad7c28bdd1ec4187d818e43796f58a845aa892bb4481587010dc4d362b"},
{file = "setproctitle-1.3.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:ffc61a388a5834a97953d6444a2888c24a05f2e333f9ed49f977a87bb1ad4761"},
{file = "setproctitle-1.3.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fa1a0fbee72b47dc339c87c890d3c03a72ea65c061ade3204f285582f2da30f"},
{file = "setproctitle-1.3.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fe8a988c7220c002c45347430993830666e55bc350179d91fcee0feafe64e1d4"},
{file = "setproctitle-1.3.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:bae283e85fc084b18ffeb92e061ff7ac5af9e183c9d1345c93e178c3e5069cbe"},
{file = "setproctitle-1.3.2-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:fed18e44711c5af4b681c2b3b18f85e6f0f1b2370a28854c645d636d5305ccd8"},
{file = "setproctitle-1.3.2-cp39-cp39-musllinux_1_1_ppc64le.whl", hash = "sha256:b34baef93bfb20a8ecb930e395ccd2ae3268050d8cf4fe187de5e2bd806fd796"},
{file = "setproctitle-1.3.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:7f0bed90a216ef28b9d227d8d73e28a8c9b88c0f48a082d13ab3fa83c581488f"},
{file = "setproctitle-1.3.2-cp39-cp39-win32.whl", hash = "sha256:4d8938249a7cea45ab7e1e48b77685d0f2bab1ebfa9dde23e94ab97968996a7c"},
{file = "setproctitle-1.3.2-cp39-cp39-win_amd64.whl", hash = "sha256:a47d97a75fd2d10c37410b180f67a5835cb1d8fdea2648fd7f359d4277f180b9"},
{file = "setproctitle-1.3.2-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:dad42e676c5261eb50fdb16bdf3e2771cf8f99a79ef69ba88729aeb3472d8575"},
{file = "setproctitle-1.3.2-pp37-pypy37_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c91b9bc8985d00239f7dc08a49927a7ca1ca8a6af2c3890feec3ed9665b6f91e"},
{file = "setproctitle-1.3.2-pp37-pypy37_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e8579a43eafd246e285eb3a5b939e7158073d5087aacdd2308f23200eac2458b"},
{file = "setproctitle-1.3.2-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:2fbd8187948284293f43533c150cd69a0e4192c83c377da837dbcd29f6b83084"},
{file = "setproctitle-1.3.2-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:faec934cfe5fd6ac1151c02e67156c3f526e82f96b24d550b5d51efa4a5527c6"},
{file = "setproctitle-1.3.2-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e1aafc91cbdacc9e5fe712c52077369168e6b6c346f3a9d51bf600b53eae56bb"},
{file = "setproctitle-1.3.2-pp38-pypy38_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b617f12c9be61e8f4b2857be4a4319754756845dbbbd9c3718f468bbb1e17bcb"},
{file = "setproctitle-1.3.2-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:b2c9cb2705fc84cb8798f1ba74194f4c080aaef19d9dae843591c09b97678e98"},
{file = "setproctitle-1.3.2-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:a149a5f7f2c5a065d4e63cb0d7a4b6d3b66e6e80f12e3f8827c4f63974cbf122"},
{file = "setproctitle-1.3.2-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2e3ac25bfc4a0f29d2409650c7532d5ddfdbf29f16f8a256fc31c47d0dc05172"},
{file = "setproctitle-1.3.2-pp39-pypy39_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:65d884e22037b23fa25b2baf1a3316602ed5c5971eb3e9d771a38c3a69ce6e13"},
{file = "setproctitle-1.3.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:7aa0aac1711fadffc1d51e9d00a3bea61f68443d6ac0241a224e4d622489d665"},
{file = "setproctitle-1.3.2.tar.gz", hash = "sha256:b9fb97907c830d260fa0658ed58afd48a86b2b88aac521135c352ff7fd3477fd"},
]
[package.extras]
test = ["pytest"]
[[package]] [[package]]
name = "sniffio" name = "sniffio"
version = "1.3.0" version = "1.3.0"
@ -625,4 +738,4 @@ resolved_reference = "86e3cf5e5849aefcc540c19bb5fa5ab7f470d1c1"
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.11" python-versions = "^3.11"
content-hash = "705938d942d3a13faa2f467967fa4f4ebd04d5ee0d58927ea2347360aa9e9981" content-hash = "efcb5800a420dd066a767d8a7a2765c047e58a76e3472b6cf11dd2e64abb6c8d"

View File

@ -14,6 +14,8 @@ jinja2 = "^3.1.2"
python-multipart = "^0.0.6" python-multipart = "^0.0.6"
pika = "^1.3.2" pika = "^1.3.2"
aio-pika = "^9.2.2" aio-pika = "^9.2.2"
setproctitle = "^1.3.2"
redis = "^5.0.0"
[build-system] [build-system]

12
src/core/async_queue.py Normal file
View File

@ -0,0 +1,12 @@
import asyncio
from src.core.redis_client import RedisClient
redis = RedisClient()
class AsyncQueue(asyncio.Queue):
async def put(self, item):
await redis.set_task_to_queue(item)
return await super().put(item)

View File

@ -1,85 +1,93 @@
import asyncio import asyncio
import json
import concurrent.futures as pool import concurrent.futures as pool
import multiprocessing import os.path
import subprocess
from functools import partial from functools import partial
from multiprocessing import freeze_support
from collections import deque
from fastapi import HTTPException from fastapi import HTTPException
from aio_pika import connect
from aio_pika.abc import AbstractIncomingMessage
from src.core.send import body from src.core.async_queue import AsyncQueue
from src.core.rabbitmq import get_messages
from src.core.redis_client import RedisClient
from src.core.result import Result, ResultTypeEnum
from src.core.ydl import VideoDownloader from src.core.ydl import VideoDownloader
from src.exceptions.download_exceptions import SiteNotImplementedException from src.exceptions.download_exceptions import SiteNotImplementedException
async def on_message(message: AbstractIncomingMessage, queue) -> None:
async with message.process():
# print(f" [x] Received message {message!r}")
await queue.put(json.loads(message.body))
print(f" Message body is: {message.body!r}")
async def get_messages(inner_queue) -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
# Declaring queue
queue = await channel.declare_queue(
"hello",
durable=True,
)
# Start listening the queue with name 'task_queue'
await queue.consume(partial(on_message, queue=inner_queue))
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
def get_url_for_download_video(data: str):
file_name = VideoDownloader.get_unique_video_filename()
ydl_opts = {
"format": data["format"],
"merge_output_format": data["merge_output_format"],
'outtmpl': data["outtmpl"],
}
downloader = VideoDownloader(link=data["link"], ydl_opts=ydl_opts)
try:
result = downloader.download()
except SiteNotImplementedException as ex:
raise HTTPException(
status_code=400,
detail=ex.message
)
async def create_workers(queue):
link = await queue.get()
get_url_for_download_video(link)
class MasterService: class MasterService:
def __init__(self): def __init__(self):
self.loop = asyncio.get_event_loop()
self.MAX_EXECUTOR_WORKERS = 8 self.MAX_EXECUTOR_WORKERS = 8
self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS) self.executor = pool.ProcessPoolExecutor(max_workers=self.MAX_EXECUTOR_WORKERS,
self.queue = asyncio.Queue() initializer=executor_initializer)
self.queue = AsyncQueue()
self.rabbit_consumer = get_messages self.rabbit_consumer = get_messages
self.currently_underway = {} # contains currently in progress videos
async def run(self): async def run(self):
loop = asyncio.get_event_loop() subprocess.run(
tasks = [loop.run_in_executor(self.executor, create_workers, self.queue) for i in range(self.MAX_EXECUTOR_WORKERS)] "for pid in $(ps -ef | grep video_downloader_executor_process | awk '{print $2}'); do kill -9 $pid; done",
shell=True, capture_output=True
)
tasks = [self.loop.create_task(self.create_workers()) for i in range(self.MAX_EXECUTOR_WORKERS + 1)]
await asyncio.gather(self.rabbit_consumer(self.queue), *tasks) await asyncio.gather(self.rabbit_consumer(self.queue), *tasks)
async def create_workers(self):
while True:
video_params = await self.queue.get()
redis = RedisClient()
await redis.del_task_from_queue_and_add_to_tasks(task=video_params)
self.currently_underway[video_params['link']] = video_params
download_task = self.loop.run_in_executor(self.executor, partial(
MasterService.video_processing_executor, video_params=video_params
)
)
result = await download_task
await redis.del_task_from_tasks_and_add_to_task_done(task=video_params)
# TODO process result
self.queue.task_done()
@staticmethod
def video_download(video_params: dict):
ydl_opts = {
"format": video_params["format"],
"merge_output_format": video_params["merge_output_format"],
'outtmpl': video_params["outtmpl"],
"quiet": True
}
downloader = VideoDownloader(link=video_params["link"], ydl_opts=ydl_opts)
video_info = downloader.get_info()
if os.path.exists(
os.path.join(os.getcwd() + f"Youtube/{video_info['id']}_{video_info['width']}.{video_info['ext']}")
):
return Result(result_type=ResultTypeEnum.EXIST)
try:
downloader.ydl_opts["quiet"] = False
result = downloader.download()
return result
except SiteNotImplementedException as ex:
raise HTTPException(
status_code=400,
detail=ex.message
)
@staticmethod
def video_processing_executor(video_params: dict):
try:
MasterService.video_download(video_params=video_params)
return Result(result_type=ResultTypeEnum.DONE)
except Exception as ex:
return Result(result_type=ResultTypeEnum.EXCEPTION, value=ex)
# TODO upload to server
def executor_initializer():
import setproctitle
setproctitle.setproctitle(f'video_downloader_executor_process')
return True
if __name__ == '__main__':
freeze_support()
ms = MasterService()
asyncio.run(ms.run())

28
src/core/rabbitmq.py Normal file
View File

@ -0,0 +1,28 @@
import asyncio
import json
from functools import partial
from aio_pika import connect
from aio_pika.abc import AbstractIncomingMessage
async def on_message(message: AbstractIncomingMessage, queue) -> None:
async with message.process():
await queue.put(json.loads(message.body))
print(f" Message body is: {message.body!r}")
async def get_messages(inner_queue) -> None:
async with await connect("amqp://guest:guest@localhost/") as connection:
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue(
"hello",
durable=True,
)
await queue.consume(partial(on_message, queue=inner_queue))
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()

55
src/core/redis_client.py Normal file
View File

@ -0,0 +1,55 @@
import json
import redis.asyncio as redis
class RedisClient:
SET_NAME = "queue"
TASKS_NAME = "tasks_working"
TASKS_DONE_NAME = "tasks_done"
def __init__(self):
self.connection = redis.Redis(host="localhost", port=6379, db=0)
async def _set_task(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.set(f'{self.TASKS_NAME}:{task["link"]}', json.dumps(task, indent=4).encode('utf-8'))
return res
async def _set_task_done(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.set(
f'{self.TASKS_DONE_NAME}:1:{task["link"]}',
json.dumps(task, indent=4).encode('utf-8')
)
return res
async def _del_task(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.delete(f'{self.TASKS_NAME}:{task["link"]}')
return res
async def set_task_to_queue(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.sadd(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8'))
return res
async def get_queue(self) -> set:
async with self.connection as connection:
res = await connection.smembers(self.SET_NAME)
return res
async def del_task_from_queue(self, task: dict) -> int:
async with self.connection as connection:
res = await connection.srem(self.SET_NAME, json.dumps(task, indent=4).encode('utf-8'))
return res
async def del_task_from_queue_and_add_to_tasks(self, task: dict) -> int:
await self.del_task_from_queue(task)
return await self._set_task(task)
async def del_task_from_tasks_and_add_to_task_done(self, task: dict) -> int:
await self._del_task(task)
return await self._set_task_done(task)

16
src/core/result.py Normal file
View File

@ -0,0 +1,16 @@
from enum import Enum
class ResultTypeEnum(Enum):
EXCEPTION = "Error"
DONE = "Done"
EXIST = "Exist"
class Result:
def __init__(self, result_type: ResultTypeEnum, value: Exception | bool = None):
self.result_type = result_type
self.value = value
def __repr__(self):
return f'Result: {self.result_type.value}. Traceback: {self.value if self.value else None}'

View File

@ -24,12 +24,9 @@ class VideoDownloader:
self.username = username self.username = username
self.password = password self.password = password
@staticmethod def get_info(self):
def get_unique_video_filename(): with youtube_dl.YoutubeDL(self.ydl_opts if self.ydl_opts else {}) as ydl:
prefix = f'vid_{datetime.now().strftime("%Y%m%d%H%M%S")}' return ydl.extract_info(self.link, download=False)
random_uuid4 = uuid.uuid4().hex[:8]
filename = f"{prefix}_{random_uuid4}"
return filename
def download(self): def download(self):
domain = urlparse(self.link).netloc domain = urlparse(self.link).netloc