Третий коммит, добавление share, share_kb, а также ADMIN_ID

This commit is contained in:
2025-07-22 13:50:14 +03:00
parent 849feb7beb
commit b98123f4dc
1479 changed files with 323549 additions and 11 deletions

View File

@@ -0,0 +1,34 @@
import hashlib
import hmac
from typing import Any, Dict
def check_signature(token: str, hash: str, **kwargs: Any) -> bool:
"""
Generate hexadecimal representation
of the HMAC-SHA-256 signature of the data-check-string
with the SHA256 hash of the bot's token used as a secret key
:param token:
:param hash:
:param kwargs: all params received on auth
:return:
"""
secret = hashlib.sha256(token.encode("utf-8"))
check_string = "\n".join(f"{k}={kwargs[k]}" for k in sorted(kwargs))
hmac_string = hmac.new(
secret.digest(), check_string.encode("utf-8"), digestmod=hashlib.sha256
).hexdigest()
return hmac_string == hash
def check_integrity(token: str, data: Dict[str, Any]) -> bool:
"""
Verify the authentication and the integrity
of the data received on user's auth
:param token: Bot's token
:param data: all data that came on auth
:return:
"""
return check_signature(token, **data)

View File

@@ -0,0 +1,83 @@
import asyncio
import time
from dataclasses import dataclass
from random import normalvariate
@dataclass(frozen=True)
class BackoffConfig:
min_delay: float
max_delay: float
factor: float
jitter: float
def __post_init__(self) -> None:
if self.max_delay <= self.min_delay:
raise ValueError("`max_delay` should be greater than `min_delay`")
if self.factor <= 1:
raise ValueError("`factor` should be greater than 1")
class Backoff:
def __init__(self, config: BackoffConfig) -> None:
self.config = config
self._next_delay = config.min_delay
self._current_delay = 0.0
self._counter = 0
def __iter__(self) -> "Backoff":
return self
@property
def min_delay(self) -> float:
return self.config.min_delay
@property
def max_delay(self) -> float:
return self.config.max_delay
@property
def factor(self) -> float:
return self.config.factor
@property
def jitter(self) -> float:
return self.config.jitter
@property
def next_delay(self) -> float:
return self._next_delay
@property
def current_delay(self) -> float:
return self._current_delay
@property
def counter(self) -> int:
return self._counter
def sleep(self) -> None:
time.sleep(next(self))
async def asleep(self) -> None:
await asyncio.sleep(next(self))
def _calculate_next(self, value: float) -> float:
return normalvariate(min(value * self.factor, self.max_delay), self.jitter)
def __next__(self) -> float:
self._current_delay = self._next_delay
self._next_delay = self._calculate_next(self._next_delay)
self._counter += 1
return self._current_delay
def reset(self) -> None:
self._current_delay = 0.0
self._counter = 0
self._next_delay = self.min_delay
def __str__(self) -> str:
return (
f"Backoff(tryings={self._counter}, current_delay={self._current_delay}, "
f"next_delay={self._next_delay})"
)

View File

@@ -0,0 +1,211 @@
from typing import Any, Awaitable, Callable, Dict, Optional, Union
from aiogram import BaseMiddleware, loggers
from aiogram.dispatcher.flags import get_flag
from aiogram.exceptions import CallbackAnswerException
from aiogram.methods import AnswerCallbackQuery
from aiogram.types import CallbackQuery, TelegramObject
class CallbackAnswer:
def __init__(
self,
answered: bool,
disabled: bool = False,
text: Optional[str] = None,
show_alert: Optional[bool] = None,
url: Optional[str] = None,
cache_time: Optional[int] = None,
) -> None:
"""
Callback answer configuration
:param answered: this request is already answered by middleware
:param disabled: answer will not be performed
:param text: answer with text
:param show_alert: show alert
:param url: game url
:param cache_time: cache answer for some time
"""
self._answered = answered
self._disabled = disabled
self._text = text
self._show_alert = show_alert
self._url = url
self._cache_time = cache_time
def disable(self) -> None:
"""
Deactivate answering for this handler
"""
self.disabled = True
@property
def disabled(self) -> bool:
"""Indicates that automatic answer is disabled in this handler"""
return self._disabled
@disabled.setter
def disabled(self, value: bool) -> None:
if self._answered:
raise CallbackAnswerException("Can't change disabled state after answer")
self._disabled = value
@property
def answered(self) -> bool:
"""
Indicates that request is already answered by middleware
"""
return self._answered
@property
def text(self) -> Optional[str]:
"""
Response text
:return:
"""
return self._text
@text.setter
def text(self, value: Optional[str]) -> None:
if self._answered:
raise CallbackAnswerException("Can't change text after answer")
self._text = value
@property
def show_alert(self) -> Optional[bool]:
"""
Whether to display an alert
"""
return self._show_alert
@show_alert.setter
def show_alert(self, value: Optional[bool]) -> None:
if self._answered:
raise CallbackAnswerException("Can't change show_alert after answer")
self._show_alert = value
@property
def url(self) -> Optional[str]:
"""
Game url
"""
return self._url
@url.setter
def url(self, value: Optional[str]) -> None:
if self._answered:
raise CallbackAnswerException("Can't change url after answer")
self._url = value
@property
def cache_time(self) -> Optional[int]:
"""
Response cache time
"""
return self._cache_time
@cache_time.setter
def cache_time(self, value: Optional[int]) -> None:
if self._answered:
raise CallbackAnswerException("Can't change cache_time after answer")
self._cache_time = value
def __str__(self) -> str:
args = ", ".join(
f"{k}={v!r}"
for k, v in {
"answered": self.answered,
"disabled": self.disabled,
"text": self.text,
"show_alert": self.show_alert,
"url": self.url,
"cache_time": self.cache_time,
}.items()
if v is not None
)
return f"{type(self).__name__}({args})"
class CallbackAnswerMiddleware(BaseMiddleware):
def __init__(
self,
pre: bool = False,
text: Optional[str] = None,
show_alert: Optional[bool] = None,
url: Optional[str] = None,
cache_time: Optional[int] = None,
) -> None:
"""
Inner middleware for callback query handlers, can be useful in bots with a lot of callback
handlers to automatically take answer to all requests
:param pre: send answer before execute handler
:param text: answer with text
:param show_alert: show alert
:param url: game url
:param cache_time: cache answer for some time
"""
self.pre = pre
self.text = text
self.show_alert = show_alert
self.url = url
self.cache_time = cache_time
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
) -> Any:
if not isinstance(event, CallbackQuery):
return await handler(event, data)
callback_answer = data["callback_answer"] = self.construct_callback_answer(
properties=get_flag(data, "callback_answer")
)
if not callback_answer.disabled and callback_answer.answered:
await self.answer(event, callback_answer)
try:
return await handler(event, data)
finally:
if not callback_answer.disabled and not callback_answer.answered:
await self.answer(event, callback_answer)
def construct_callback_answer(
self, properties: Optional[Union[Dict[str, Any], bool]]
) -> CallbackAnswer:
pre, disabled, text, show_alert, url, cache_time = (
self.pre,
False,
self.text,
self.show_alert,
self.url,
self.cache_time,
)
if isinstance(properties, dict):
pre = properties.get("pre", pre)
disabled = properties.get("disabled", disabled)
text = properties.get("text", text)
show_alert = properties.get("show_alert", show_alert)
url = properties.get("url", url)
cache_time = properties.get("cache_time", cache_time)
return CallbackAnswer(
answered=pre,
disabled=disabled,
text=text,
show_alert=show_alert,
url=url,
cache_time=cache_time,
)
def answer(self, event: CallbackQuery, callback_answer: CallbackAnswer) -> AnswerCallbackQuery:
loggers.middlewares.info("Answer to callback query id=%s", event.id)
return event.answer(
text=callback_answer.text,
show_alert=callback_answer.show_alert,
url=callback_answer.url,
cache_time=callback_answer.cache_time,
)

View File

@@ -0,0 +1,379 @@
import asyncio
import logging
import time
from asyncio import Event, Lock
from contextlib import suppress
from types import TracebackType
from typing import Any, Awaitable, Callable, Dict, Optional, Type, Union
from aiogram import BaseMiddleware, Bot
from aiogram.dispatcher.flags import get_flag
from aiogram.types import Message, TelegramObject
logger = logging.getLogger(__name__)
DEFAULT_INTERVAL = 5.0
DEFAULT_INITIAL_SLEEP = 0.0
class ChatActionSender:
"""
This utility helps to automatically send chat action until long actions is done
to take acknowledge bot users the bot is doing something and not crashed.
Provides simply to use context manager.
Technically sender start background task with infinity loop which works
until action will be finished and sends the
`chat action <https://core.telegram.org/bots/api#sendchataction>`_
every 5 seconds.
"""
def __init__(
self,
*,
bot: Bot,
chat_id: Union[str, int],
message_thread_id: Optional[int] = None,
action: str = "typing",
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> None:
"""
:param bot: instance of the bot
:param chat_id: target chat id
:param message_thread_id: unique identifier for the target message thread; supergroups only
:param action: chat action type
:param interval: interval between iterations
:param initial_sleep: sleep before first sending of the action
"""
self.chat_id = chat_id
self.message_thread_id = message_thread_id
self.action = action
self.interval = interval
self.initial_sleep = initial_sleep
self.bot = bot
self._lock = Lock()
self._close_event = Event()
self._closed_event = Event()
self._task: Optional[asyncio.Task[Any]] = None
@property
def running(self) -> bool:
return bool(self._task)
async def _wait(self, interval: float) -> None:
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(self._close_event.wait(), interval)
async def _worker(self) -> None:
logger.debug(
"Started chat action %r sender in chat_id=%s via bot id=%d",
self.action,
self.chat_id,
self.bot.id,
)
try:
counter = 0
await self._wait(self.initial_sleep)
while not self._close_event.is_set():
start = time.monotonic()
logger.debug(
"Sent chat action %r to chat_id=%s via bot %d (already sent actions %d)",
self.action,
self.chat_id,
self.bot.id,
counter,
)
await self.bot.send_chat_action(
chat_id=self.chat_id,
action=self.action,
message_thread_id=self.message_thread_id,
)
counter += 1
interval = self.interval - (time.monotonic() - start)
await self._wait(interval)
finally:
logger.debug(
"Finished chat action %r sender in chat_id=%s via bot id=%d",
self.action,
self.chat_id,
self.bot.id,
)
self._closed_event.set()
async def _run(self) -> None:
async with self._lock:
self._close_event.clear()
self._closed_event.clear()
if self.running:
raise RuntimeError("Already running")
self._task = asyncio.create_task(self._worker())
async def _stop(self) -> None:
async with self._lock:
if not self.running:
return
if not self._close_event.is_set(): # pragma: no branches
self._close_event.set()
await self._closed_event.wait()
self._task = None
async def __aenter__(self) -> "ChatActionSender":
await self._run()
return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Any:
await self._stop()
@classmethod
def typing(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `typing` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="typing",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def upload_photo(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `upload_photo` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="upload_photo",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def record_video(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `record_video` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="record_video",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def upload_video(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `upload_video` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="upload_video",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def record_voice(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `record_voice` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="record_voice",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def upload_voice(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `upload_voice` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="upload_voice",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def upload_document(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `upload_document` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="upload_document",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def choose_sticker(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `choose_sticker` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="choose_sticker",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def find_location(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `find_location` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="find_location",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def record_video_note(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `record_video_note` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="record_video_note",
interval=interval,
initial_sleep=initial_sleep,
)
@classmethod
def upload_video_note(
cls,
chat_id: Union[int, str],
bot: Bot,
message_thread_id: Optional[int] = None,
interval: float = DEFAULT_INTERVAL,
initial_sleep: float = DEFAULT_INITIAL_SLEEP,
) -> "ChatActionSender":
"""Create instance of the sender with `upload_video_note` action"""
return cls(
bot=bot,
chat_id=chat_id,
message_thread_id=message_thread_id,
action="upload_video_note",
interval=interval,
initial_sleep=initial_sleep,
)
class ChatActionMiddleware(BaseMiddleware):
"""
Helps to automatically use chat action sender for all message handlers
"""
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
) -> Any:
if not isinstance(event, Message):
return await handler(event, data)
bot = data["bot"]
chat_action = get_flag(data, "chat_action") or "typing"
kwargs = {}
if isinstance(chat_action, dict):
if initial_sleep := chat_action.get("initial_sleep"):
kwargs["initial_sleep"] = initial_sleep
if interval := chat_action.get("interval"):
kwargs["interval"] = interval
if action := chat_action.get("action"):
kwargs["action"] = action
elif isinstance(chat_action, bool):
kwargs["action"] = "typing"
else:
kwargs["action"] = chat_action
kwargs["message_thread_id"] = (
event.message_thread_id
if isinstance(event, Message) and event.is_topic_message
else None
)
async with ChatActionSender(bot=bot, chat_id=event.chat.id, **kwargs):
return await handler(event, data)

View File

@@ -0,0 +1,37 @@
from typing import Tuple, Type, Union
from pydantic import Field, TypeAdapter
from typing_extensions import Annotated
from aiogram.types import (
ChatMember,
ChatMemberAdministrator,
ChatMemberBanned,
ChatMemberLeft,
ChatMemberMember,
ChatMemberOwner,
ChatMemberRestricted,
)
ChatMemberUnion = Union[
ChatMemberOwner,
ChatMemberAdministrator,
ChatMemberMember,
ChatMemberRestricted,
ChatMemberLeft,
ChatMemberBanned,
]
ChatMemberCollection = Tuple[Type[ChatMember], ...]
ChatMemberAdapter: TypeAdapter[ChatMemberUnion] = TypeAdapter(
Annotated[
ChatMemberUnion,
Field(discriminator="status"),
]
)
ADMINS: ChatMemberCollection = (ChatMemberOwner, ChatMemberAdministrator)
USERS: ChatMemberCollection = (ChatMemberMember, ChatMemberRestricted)
MEMBERS: ChatMemberCollection = ADMINS + USERS
NOT_MEMBERS: ChatMemberCollection = (ChatMemberLeft, ChatMemberBanned)

View File

@@ -0,0 +1,86 @@
import inspect
from dataclasses import dataclass
from operator import itemgetter
from typing import Any, Generator, NamedTuple, Protocol
from aiogram.utils.dataclass import dataclass_kwargs
class ClassAttrsResolver(Protocol):
def __call__(self, cls: type) -> Generator[tuple[str, Any], None, None]: ...
def inspect_members_resolver(cls: type) -> Generator[tuple[str, Any], None, None]:
"""
Inspects and resolves attributes of a given class.
This function uses the `inspect.getmembers` utility to yield all attributes of
a provided class. The output is a generator that produces tuples containing
attribute names and their corresponding values. This function is suitable for
analyzing class attributes dynamically. However, it guarantees alphabetical
order of attributes.
:param cls: The class for which the attributes will be resolved.
:return: A generator yielding tuples containing attribute names and their values.
"""
yield from inspect.getmembers(cls)
def get_reversed_mro_unique_attrs_resolver(cls: type) -> Generator[tuple[str, Any], None, None]:
"""
Resolve and yield attributes from the reversed method resolution order (MRO) of a given class.
This function iterates through the reversed MRO of a class and yields attributes
that have not yet been encountered. It avoids duplicates by keeping track of
attribute names that have already been processed.
:param cls: The class for which the attributes will be resolved.
:return: A generator yielding tuples containing attribute names and their values.
"""
known_attrs = set()
for base in reversed(inspect.getmro(cls)):
for name, value in base.__dict__.items():
if name in known_attrs:
continue
yield name, value
known_attrs.add(name)
class _Position(NamedTuple):
in_mro: int
in_class: int
@dataclass(**dataclass_kwargs(slots=True))
class _AttributeContainer:
position: _Position
value: Any
def __lt__(self, other: "_AttributeContainer") -> bool:
return self.position < other.position
def get_sorted_mro_attrs_resolver(cls: type) -> Generator[tuple[str, Any], None, None]:
"""
Resolve and yield attributes from the method resolution order (MRO) of a given class.
Iterates through a class's method resolution order (MRO) and collects its attributes
along with their respective positions in the MRO and the class hierarchy. This generator
yields a tuple containing the name of each attribute and its associated value.
:param cls: The class for which the attributes will be resolved.
:return: A generator yielding tuples containing attribute names and their values.
"""
attributes: dict[str, _AttributeContainer] = {}
for position_in_mro, base in enumerate(inspect.getmro(cls)):
for position_in_class, (name, value) in enumerate(vars(base).items()):
position = _Position(position_in_mro, position_in_class)
if attribute := attributes.get(name):
attribute.position = position
continue
attributes[name] = _AttributeContainer(value=value, position=position)
for name, attribute in sorted(attributes.items(), key=itemgetter(1)):
yield name, attribute.value

View File

@@ -0,0 +1,64 @@
"""
This module contains utility functions for working with dataclasses in Python.
DO NOT USE THIS MODULE DIRECTLY. IT IS INTENDED FOR INTERNAL USE ONLY.
"""
import sys
from typing import Any, Union
def dataclass_kwargs(
init: Union[bool, None] = None,
repr: Union[bool, None] = None,
eq: Union[bool, None] = None,
order: Union[bool, None] = None,
unsafe_hash: Union[bool, None] = None,
frozen: Union[bool, None] = None,
match_args: Union[bool, None] = None,
kw_only: Union[bool, None] = None,
slots: Union[bool, None] = None,
weakref_slot: Union[bool, None] = None,
) -> dict[str, Any]:
"""
Generates a dictionary of keyword arguments that can be passed to a Python
dataclass. This function allows specifying attributes related to the behavior
and configuration of dataclasses, including attributes added in specific
Python versions. This abstraction improves compatibility across different
Python versions by ensuring only the parameters supported by the current
version are included.
:return: A dictionary containing the specified dataclass configuration that
dynamically adapts to the current Python version.
"""
params = {}
# All versions
if init is not None:
params["init"] = init
if repr is not None:
params["repr"] = repr
if eq is not None:
params["eq"] = eq
if order is not None:
params["order"] = order
if unsafe_hash is not None:
params["unsafe_hash"] = unsafe_hash
if frozen is not None:
params["frozen"] = frozen
# Added in 3.10
if sys.version_info >= (3, 10):
if match_args is not None:
params["match_args"] = match_args
if kw_only is not None:
params["kw_only"] = kw_only
if slots is not None:
params["slots"] = slots
# Added in 3.11
if sys.version_info >= (3, 11):
if weakref_slot is not None:
params["weakref_slot"] = weakref_slot
return params

View File

@@ -0,0 +1,153 @@
from __future__ import annotations
__all__ = [
"create_start_link",
"create_startgroup_link",
"create_startapp_link",
"create_deep_link",
"create_telegram_link",
"encode_payload",
"decode_payload",
]
import re
from typing import TYPE_CHECKING, Callable, Literal, Optional, cast
from aiogram.utils.link import create_telegram_link
from aiogram.utils.payload import decode_payload, encode_payload
if TYPE_CHECKING:
from aiogram import Bot
BAD_PATTERN = re.compile(r"[^a-zA-Z0-9-_]")
async def create_start_link(
bot: Bot,
payload: str,
encode: bool = False,
encoder: Optional[Callable[[bytes], bytes]] = None,
) -> str:
"""
Create 'start' deep link with your payload.
If you need to encode payload or pass special characters - set encode as True
:param bot: bot instance
:param payload: args passed with /start
:param encode: encode payload with base64url or custom encoder
:param encoder: custom encoder callable
:return: link
"""
username = (await bot.me()).username
return create_deep_link(
username=cast(str, username),
link_type="start",
payload=payload,
encode=encode,
encoder=encoder,
)
async def create_startgroup_link(
bot: Bot,
payload: str,
encode: bool = False,
encoder: Optional[Callable[[bytes], bytes]] = None,
) -> str:
"""
Create 'startgroup' deep link with your payload.
If you need to encode payload or pass special characters - set encode as True
:param bot: bot instance
:param payload: args passed with /start
:param encode: encode payload with base64url or custom encoder
:param encoder: custom encoder callable
:return: link
"""
username = (await bot.me()).username
return create_deep_link(
username=cast(str, username),
link_type="startgroup",
payload=payload,
encode=encode,
encoder=encoder,
)
async def create_startapp_link(
bot: Bot,
payload: str,
encode: bool = False,
app_name: Optional[str] = None,
encoder: Optional[Callable[[bytes], bytes]] = None,
) -> str:
"""
Create 'startapp' deep link with your payload.
If you need to encode payload or pass special characters - set encode as True
**Read more**:
- `Main Mini App links <https://core.telegram.org/api/links#main-mini-app-links>`_
- `Direct mini app links <https://core.telegram.org/api/links#direct-mini-app-links>`_
:param bot: bot instance
:param payload: args passed with /start
:param encode: encode payload with base64url or custom encoder
:param app_name: if you want direct mini app link
:param encoder: custom encoder callable
:return: link
"""
username = (await bot.me()).username
return create_deep_link(
username=cast(str, username),
link_type="startapp",
payload=payload,
app_name=app_name,
encode=encode,
encoder=encoder,
)
def create_deep_link(
username: str,
link_type: Literal["start", "startgroup", "startapp"],
payload: str,
app_name: Optional[str] = None,
encode: bool = False,
encoder: Optional[Callable[[bytes], bytes]] = None,
) -> str:
"""
Create deep link.
:param username:
:param link_type: `start`, `startgroup` or `startapp`
:param payload: any string-convertible data
:param app_name: if you want direct mini app link
:param encode: encode payload with base64url or custom encoder
:param encoder: custom encoder callable
:return: deeplink
"""
if not isinstance(payload, str):
payload = str(payload)
if encode or encoder:
payload = encode_payload(payload, encoder=encoder)
if re.search(BAD_PATTERN, payload):
raise ValueError(
"Wrong payload! Only A-Z, a-z, 0-9, _ and - are allowed. "
"Pass `encode=True` or encode payload manually."
)
if len(payload) > 64:
raise ValueError("Payload must be up to 64 characters long.")
if not app_name:
deep_link = create_telegram_link(username, **{cast(str, link_type): payload})
else:
deep_link = create_telegram_link(username, app_name, **{cast(str, link_type): payload})
return deep_link

View File

@@ -0,0 +1,716 @@
import textwrap
from typing import (
Any,
ClassVar,
Dict,
Generator,
Iterable,
Iterator,
List,
Optional,
Tuple,
Type,
)
from typing_extensions import Self
from aiogram.enums import MessageEntityType
from aiogram.types import MessageEntity, User
from aiogram.utils.text_decorations import (
add_surrogates,
html_decoration,
markdown_decoration,
remove_surrogates,
)
NodeType = Any
def sizeof(value: str) -> int:
return len(value.encode("utf-16-le")) // 2
class Text(Iterable[NodeType]):
"""
Simple text element
"""
type: ClassVar[Optional[str]] = None
__slots__ = ("_body", "_params")
def __init__(
self,
*body: NodeType,
**params: Any,
) -> None:
self._body: Tuple[NodeType, ...] = body
self._params: Dict[str, Any] = params
@classmethod
def from_entities(cls, text: str, entities: List[MessageEntity]) -> "Text":
return cls(
*_unparse_entities(
text=add_surrogates(text),
entities=sorted(entities, key=lambda item: item.offset) if entities else [],
)
)
def render(
self,
*,
_offset: int = 0,
_sort: bool = True,
_collect_entities: bool = True,
) -> Tuple[str, List[MessageEntity]]:
"""
Render elements tree as text with entities list
:return:
"""
text = ""
entities = []
offset = _offset
for node in self._body:
if not isinstance(node, Text):
node = str(node)
text += node
offset += sizeof(node)
else:
node_text, node_entities = node.render(
_offset=offset,
_sort=False,
_collect_entities=_collect_entities,
)
text += node_text
offset += sizeof(node_text)
if _collect_entities:
entities.extend(node_entities)
if _collect_entities and self.type:
entities.append(self._render_entity(offset=_offset, length=offset - _offset))
if _collect_entities and _sort:
entities.sort(key=lambda entity: entity.offset)
return text, entities
def _render_entity(self, *, offset: int, length: int) -> MessageEntity:
assert self.type is not None, "Node without type can't be rendered as entity"
return MessageEntity(type=self.type, offset=offset, length=length, **self._params)
def as_kwargs(
self,
*,
text_key: str = "text",
entities_key: str = "entities",
replace_parse_mode: bool = True,
parse_mode_key: str = "parse_mode",
) -> Dict[str, Any]:
"""
Render element tree as keyword arguments for usage in an API call, for example:
.. code-block:: python
entities = Text(...)
await message.answer(**entities.as_kwargs())
:param text_key:
:param entities_key:
:param replace_parse_mode:
:param parse_mode_key:
:return:
"""
text_value, entities_value = self.render()
result: Dict[str, Any] = {
text_key: text_value,
entities_key: entities_value,
}
if replace_parse_mode:
result[parse_mode_key] = None
return result
def as_caption_kwargs(self, *, replace_parse_mode: bool = True) -> Dict[str, Any]:
"""
Shortcut for :meth:`as_kwargs` for usage with API calls that take
``caption`` as a parameter.
.. code-block:: python
entities = Text(...)
await message.answer_photo(**entities.as_caption_kwargs(), photo=phot)
:param replace_parse_mode: Will be passed to :meth:`as_kwargs`.
:return:
"""
return self.as_kwargs(
text_key="caption",
entities_key="caption_entities",
replace_parse_mode=replace_parse_mode,
)
def as_poll_question_kwargs(self, *, replace_parse_mode: bool = True) -> Dict[str, Any]:
"""
Shortcut for :meth:`as_kwargs` for usage with
method :class:`aiogram.methods.send_poll.SendPoll`.
.. code-block:: python
entities = Text(...)
await message.answer_poll(**entities.as_poll_question_kwargs(), options=options)
:param replace_parse_mode: Will be passed to :meth:`as_kwargs`.
:return:
"""
return self.as_kwargs(
text_key="question",
entities_key="question_entities",
parse_mode_key="question_parse_mode",
replace_parse_mode=replace_parse_mode,
)
def as_poll_explanation_kwargs(self, *, replace_parse_mode: bool = True) -> Dict[str, Any]:
"""
Shortcut for :meth:`as_kwargs` for usage with
method :class:`aiogram.methods.send_poll.SendPoll`.
.. code-block:: python
question_entities = Text(...)
explanation_entities = Text(...)
await message.answer_poll(
**question_entities.as_poll_question_kwargs(),
options=options,
**explanation_entities.as_poll_explanation_kwargs(),
)
:param replace_parse_mode: Will be passed to :meth:`as_kwargs`.
:return:
"""
return self.as_kwargs(
text_key="explanation",
entities_key="explanation_entities",
parse_mode_key="explanation_parse_mode",
replace_parse_mode=replace_parse_mode,
)
def as_gift_text_kwargs(self, *, replace_parse_mode: bool = True) -> Dict[str, Any]:
"""
Shortcut for :meth:`as_kwargs` for usage with
method :class:`aiogram.methods.send_gift.SendGift`.
.. code-block:: python
entities = Text(...)
await bot.send_gift(gift_id=gift_id, user_id=user_id, **entities.as_gift_text_kwargs())
:param replace_parse_mode: Will be passed to :meth:`as_kwargs`.
:return:
"""
return self.as_kwargs(
text_key="text",
entities_key="text_entities",
parse_mode_key="text_parse_mode",
replace_parse_mode=replace_parse_mode,
)
def as_html(self) -> str:
"""
Render elements tree as HTML markup
"""
text, entities = self.render()
return html_decoration.unparse(text, entities)
def as_markdown(self) -> str:
"""
Render elements tree as MarkdownV2 markup
"""
text, entities = self.render()
return markdown_decoration.unparse(text, entities)
def replace(self: Self, *args: Any, **kwargs: Any) -> Self:
return type(self)(*args, **{**self._params, **kwargs})
def as_pretty_string(self, indent: bool = False) -> str:
sep = ",\n" if indent else ", "
body = sep.join(
item.as_pretty_string(indent=indent) if isinstance(item, Text) else repr(item)
for item in self._body
)
params = sep.join(f"{k}={v!r}" for k, v in self._params.items() if v is not None)
args = []
if body:
args.append(body)
if params:
args.append(params)
args_str = sep.join(args)
if indent:
args_str = textwrap.indent("\n" + args_str + "\n", " ")
return f"{type(self).__name__}({args_str})"
def __add__(self, other: NodeType) -> "Text":
if isinstance(other, Text) and other.type == self.type and self._params == other._params:
return type(self)(*self, *other, **self._params)
if type(self) is Text and isinstance(other, str):
return type(self)(*self, other, **self._params)
return Text(self, other)
def __iter__(self) -> Iterator[NodeType]:
yield from self._body
def __len__(self) -> int:
text, _ = self.render(_collect_entities=False)
return sizeof(text)
def __getitem__(self, item: slice) -> "Text":
if not isinstance(item, slice):
raise TypeError("Can only be sliced")
if (item.start is None or item.start == 0) and item.stop is None:
return self.replace(*self._body)
start = 0 if item.start is None else item.start
stop = len(self) if item.stop is None else item.stop
if start == stop:
return self.replace()
nodes = []
position = 0
for node in self._body:
node_size = len(node)
current_position = position
position += node_size
if position < start:
continue
if current_position > stop:
break
a = max((0, start - current_position))
b = min((node_size, stop - current_position))
new_node = node[a:b]
if not new_node:
continue
nodes.append(new_node)
return self.replace(*nodes)
class HashTag(Text):
"""
Hashtag element.
.. warning::
The value should always start with '#' symbol
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.HASHTAG`
"""
type = MessageEntityType.HASHTAG
def __init__(self, *body: NodeType, **params: Any) -> None:
if len(body) != 1:
raise ValueError("Hashtag can contain only one element")
if not isinstance(body[0], str):
raise ValueError("Hashtag can contain only string")
if not body[0].startswith("#"):
body = ("#" + body[0],)
super().__init__(*body, **params)
class CashTag(Text):
"""
Cashtag element.
.. warning::
The value should always start with '$' symbol
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.CASHTAG`
"""
type = MessageEntityType.CASHTAG
def __init__(self, *body: NodeType, **params: Any) -> None:
if len(body) != 1:
raise ValueError("Cashtag can contain only one element")
if not isinstance(body[0], str):
raise ValueError("Cashtag can contain only string")
if not body[0].startswith("$"):
body = ("$" + body[0],)
super().__init__(*body, **params)
class BotCommand(Text):
"""
Bot command element.
.. warning::
The value should always start with '/' symbol
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.BOT_COMMAND`
"""
type = MessageEntityType.BOT_COMMAND
class Url(Text):
"""
Url element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.URL`
"""
type = MessageEntityType.URL
class Email(Text):
"""
Email element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.EMAIL`
"""
type = MessageEntityType.EMAIL
class PhoneNumber(Text):
"""
Phone number element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.PHONE_NUMBER`
"""
type = MessageEntityType.PHONE_NUMBER
class Bold(Text):
"""
Bold element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.BOLD`
"""
type = MessageEntityType.BOLD
class Italic(Text):
"""
Italic element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.ITALIC`
"""
type = MessageEntityType.ITALIC
class Underline(Text):
"""
Underline element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.UNDERLINE`
"""
type = MessageEntityType.UNDERLINE
class Strikethrough(Text):
"""
Strikethrough element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.STRIKETHROUGH`
"""
type = MessageEntityType.STRIKETHROUGH
class Spoiler(Text):
"""
Spoiler element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.SPOILER`
"""
type = MessageEntityType.SPOILER
class Code(Text):
"""
Code element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.CODE`
"""
type = MessageEntityType.CODE
class Pre(Text):
"""
Pre element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.PRE`
"""
type = MessageEntityType.PRE
def __init__(self, *body: NodeType, language: Optional[str] = None, **params: Any) -> None:
super().__init__(*body, language=language, **params)
class TextLink(Text):
"""
Text link element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.TEXT_LINK`
"""
type = MessageEntityType.TEXT_LINK
def __init__(self, *body: NodeType, url: str, **params: Any) -> None:
super().__init__(*body, url=url, **params)
class TextMention(Text):
"""
Text mention element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.TEXT_MENTION`
"""
type = MessageEntityType.TEXT_MENTION
def __init__(self, *body: NodeType, user: User, **params: Any) -> None:
super().__init__(*body, user=user, **params)
class CustomEmoji(Text):
"""
Custom emoji element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.CUSTOM_EMOJI`
"""
type = MessageEntityType.CUSTOM_EMOJI
def __init__(self, *body: NodeType, custom_emoji_id: str, **params: Any) -> None:
super().__init__(*body, custom_emoji_id=custom_emoji_id, **params)
class BlockQuote(Text):
"""
Block quote element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.BLOCKQUOTE`
"""
type = MessageEntityType.BLOCKQUOTE
class ExpandableBlockQuote(Text):
"""
Expandable block quote element.
Will be wrapped into :obj:`aiogram.types.message_entity.MessageEntity`
with type :obj:`aiogram.enums.message_entity_type.MessageEntityType.EXPANDABLE_BLOCKQUOTE`
"""
type = MessageEntityType.EXPANDABLE_BLOCKQUOTE
NODE_TYPES: Dict[Optional[str], Type[Text]] = {
Text.type: Text,
HashTag.type: HashTag,
CashTag.type: CashTag,
BotCommand.type: BotCommand,
Url.type: Url,
Email.type: Email,
PhoneNumber.type: PhoneNumber,
Bold.type: Bold,
Italic.type: Italic,
Underline.type: Underline,
Strikethrough.type: Strikethrough,
Spoiler.type: Spoiler,
Code.type: Code,
Pre.type: Pre,
TextLink.type: TextLink,
TextMention.type: TextMention,
CustomEmoji.type: CustomEmoji,
BlockQuote.type: BlockQuote,
ExpandableBlockQuote.type: ExpandableBlockQuote,
}
def _apply_entity(entity: MessageEntity, *nodes: NodeType) -> NodeType:
"""
Apply single entity to text
:param entity:
:param text:
:return:
"""
node_type = NODE_TYPES.get(entity.type, Text)
return node_type(
*nodes, **entity.model_dump(exclude={"type", "offset", "length"}, warnings=False)
)
def _unparse_entities(
text: bytes,
entities: List[MessageEntity],
offset: Optional[int] = None,
length: Optional[int] = None,
) -> Generator[NodeType, None, None]:
if offset is None:
offset = 0
length = length or len(text)
for index, entity in enumerate(entities):
if entity.offset * 2 < offset:
continue
if entity.offset * 2 > offset:
yield remove_surrogates(text[offset : entity.offset * 2])
start = entity.offset * 2
offset = entity.offset * 2 + entity.length * 2
sub_entities = list(filter(lambda e: e.offset * 2 < (offset or 0), entities[index + 1 :]))
yield _apply_entity(
entity,
*_unparse_entities(text, sub_entities, offset=start, length=offset),
)
if offset < length:
yield remove_surrogates(text[offset:length])
def as_line(*items: NodeType, end: str = "\n", sep: str = "") -> Text:
"""
Wrap multiple nodes into line with :code:`\\\\n` at the end of line.
:param items: Text or Any
:param end: ending of the line, by default is :code:`\\\\n`
:param sep: separator between items, by default is empty string
:return: Text
"""
if sep:
nodes = []
for item in items[:-1]:
nodes.extend([item, sep])
nodes.append(items[-1])
nodes.append(end)
else:
nodes = [*items, end]
return Text(*nodes)
def as_list(*items: NodeType, sep: str = "\n") -> Text:
"""
Wrap each element to separated lines
:param items:
:param sep:
:return:
"""
nodes = []
for item in items[:-1]:
nodes.extend([item, sep])
nodes.append(items[-1])
return Text(*nodes)
def as_marked_list(*items: NodeType, marker: str = "- ") -> Text:
"""
Wrap elements as marked list
:param items:
:param marker: line marker, by default is '- '
:return: Text
"""
return as_list(*(Text(marker, item) for item in items))
def as_numbered_list(*items: NodeType, start: int = 1, fmt: str = "{}. ") -> Text:
"""
Wrap elements as numbered list
:param items:
:param start: initial number, by default 1
:param fmt: number format, by default '{}. '
:return: Text
"""
return as_list(*(Text(fmt.format(index), item) for index, item in enumerate(items, start)))
def as_section(title: NodeType, *body: NodeType) -> Text:
"""
Wrap elements as simple section, section has title and body
:param title:
:param body:
:return: Text
"""
return Text(title, "\n", *body)
def as_marked_section(
title: NodeType,
*body: NodeType,
marker: str = "- ",
) -> Text:
"""
Wrap elements as section with marked list
:param title:
:param body:
:param marker:
:return:
"""
return as_section(title, as_marked_list(*body, marker=marker))
def as_numbered_section(
title: NodeType,
*body: NodeType,
start: int = 1,
fmt: str = "{}. ",
) -> Text:
"""
Wrap elements as section with numbered list
:param title:
:param body:
:param start:
:param fmt:
:return:
"""
return as_section(title, as_numbered_list(*body, start=start, fmt=fmt))
def as_key_value(key: NodeType, value: NodeType) -> Text:
"""
Wrap elements pair as key-value line. (:code:`<b>{key}:</b> {value}`)
:param key:
:param value:
:return: Text
"""
return Text(Bold(key, ":"), " ", value)

View File

@@ -0,0 +1,21 @@
from .context import get_i18n, gettext, lazy_gettext, lazy_ngettext, ngettext
from .core import I18n
from .middleware import (
ConstI18nMiddleware,
FSMI18nMiddleware,
I18nMiddleware,
SimpleI18nMiddleware,
)
__all__ = (
"I18n",
"I18nMiddleware",
"SimpleI18nMiddleware",
"ConstI18nMiddleware",
"FSMI18nMiddleware",
"gettext",
"lazy_gettext",
"ngettext",
"lazy_ngettext",
"get_i18n",
)

View File

@@ -0,0 +1,23 @@
from typing import Any
from aiogram.utils.i18n.core import I18n
from aiogram.utils.i18n.lazy_proxy import LazyProxy
def get_i18n() -> I18n:
i18n = I18n.get_current(no_error=True)
if i18n is None:
raise LookupError("I18n context is not set")
return i18n
def gettext(*args: Any, **kwargs: Any) -> str:
return get_i18n().gettext(*args, **kwargs)
def lazy_gettext(*args: Any, **kwargs: Any) -> LazyProxy:
return LazyProxy(gettext, *args, **kwargs, enable_cache=False)
ngettext = gettext
lazy_ngettext = lazy_gettext

View File

@@ -0,0 +1,123 @@
import gettext
import os
from contextlib import contextmanager
from contextvars import ContextVar
from pathlib import Path
from typing import Dict, Generator, Optional, Tuple, Union
from aiogram.utils.i18n.lazy_proxy import LazyProxy
from aiogram.utils.mixins import ContextInstanceMixin
class I18n(ContextInstanceMixin["I18n"]):
def __init__(
self,
*,
path: Union[str, Path],
default_locale: str = "en",
domain: str = "messages",
) -> None:
self.path = path
self.default_locale = default_locale
self.domain = domain
self.ctx_locale = ContextVar("aiogram_ctx_locale", default=default_locale)
self.locales = self.find_locales()
@property
def current_locale(self) -> str:
return self.ctx_locale.get()
@current_locale.setter
def current_locale(self, value: str) -> None:
self.ctx_locale.set(value)
@contextmanager
def use_locale(self, locale: str) -> Generator[None, None, None]:
"""
Create context with specified locale
"""
ctx_token = self.ctx_locale.set(locale)
try:
yield
finally:
self.ctx_locale.reset(ctx_token)
@contextmanager
def context(self) -> Generator["I18n", None, None]:
"""
Use I18n context
"""
token = self.set_current(self)
try:
yield self
finally:
self.reset_current(token)
def find_locales(self) -> Dict[str, gettext.GNUTranslations]:
"""
Load all compiled locales from path
:return: dict with locales
"""
translations: Dict[str, gettext.GNUTranslations] = {}
for name in os.listdir(self.path):
if not os.path.isdir(os.path.join(self.path, name)):
continue
mo_path = os.path.join(self.path, name, "LC_MESSAGES", self.domain + ".mo")
if os.path.exists(mo_path):
with open(mo_path, "rb") as fp:
translations[name] = gettext.GNUTranslations(fp)
elif os.path.exists(mo_path[:-2] + "po"): # pragma: no cover
raise RuntimeError(f"Found locale '{name}' but this language is not compiled!")
return translations
def reload(self) -> None:
"""
Hot reload locales
"""
self.locales = self.find_locales()
@property
def available_locales(self) -> Tuple[str, ...]:
"""
list of loaded locales
:return:
"""
return tuple(self.locales.keys())
def gettext(
self, singular: str, plural: Optional[str] = None, n: int = 1, locale: Optional[str] = None
) -> str:
"""
Get text
:param singular:
:param plural:
:param n:
:param locale:
:return:
"""
if locale is None:
locale = self.current_locale
if locale not in self.locales:
if n == 1:
return singular
return plural if plural else singular
translator = self.locales[locale]
if plural is None:
return translator.gettext(singular)
return translator.ngettext(singular, plural, n)
def lazy_gettext(
self, singular: str, plural: Optional[str] = None, n: int = 1, locale: Optional[str] = None
) -> LazyProxy:
return LazyProxy(
self.gettext, singular=singular, plural=plural, n=n, locale=locale, enable_cache=False
)

View File

@@ -0,0 +1,13 @@
from typing import Any
try:
from babel.support import LazyProxy
except ImportError: # pragma: no cover
class LazyProxy: # type: ignore
def __init__(self, func: Any, *args: Any, **kwargs: Any) -> None:
raise RuntimeError(
"LazyProxy can be used only when Babel installed\n"
"Just install Babel (`pip install Babel`) "
"or aiogram with i18n support (`pip install aiogram[i18n]`)"
)

View File

@@ -0,0 +1,187 @@
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, Dict, Optional, Set
try:
from babel import Locale, UnknownLocaleError
except ImportError: # pragma: no cover
Locale = None # type: ignore
class UnknownLocaleError(Exception): # type: ignore
pass
from aiogram import BaseMiddleware, Router
from aiogram.fsm.context import FSMContext
from aiogram.types import TelegramObject, User
from aiogram.utils.i18n.core import I18n
class I18nMiddleware(BaseMiddleware, ABC):
"""
Abstract I18n middleware.
"""
def __init__(
self,
i18n: I18n,
i18n_key: Optional[str] = "i18n",
middleware_key: str = "i18n_middleware",
) -> None:
"""
Create an instance of middleware
:param i18n: instance of I18n
:param i18n_key: context key for I18n instance
:param middleware_key: context key for this middleware
"""
self.i18n = i18n
self.i18n_key = i18n_key
self.middleware_key = middleware_key
def setup(
self: BaseMiddleware, router: Router, exclude: Optional[Set[str]] = None
) -> BaseMiddleware:
"""
Register middleware for all events in the Router
:param router:
:param exclude:
:return:
"""
if exclude is None:
exclude = set()
exclude_events = {"update", *exclude}
for event_name, observer in router.observers.items():
if event_name in exclude_events:
continue
observer.outer_middleware(self)
return self
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
) -> Any:
current_locale = await self.get_locale(event=event, data=data) or self.i18n.default_locale
if self.i18n_key:
data[self.i18n_key] = self.i18n
if self.middleware_key:
data[self.middleware_key] = self
with self.i18n.context(), self.i18n.use_locale(current_locale):
return await handler(event, data)
@abstractmethod
async def get_locale(self, event: TelegramObject, data: Dict[str, Any]) -> str:
"""
Detect current user locale based on event and context.
**This method must be defined in child classes**
:param event:
:param data:
:return:
"""
pass
class SimpleI18nMiddleware(I18nMiddleware):
"""
Simple I18n middleware.
Chooses language code from the User object received in event
"""
def __init__(
self,
i18n: I18n,
i18n_key: Optional[str] = "i18n",
middleware_key: str = "i18n_middleware",
) -> None:
super().__init__(i18n=i18n, i18n_key=i18n_key, middleware_key=middleware_key)
if Locale is None: # pragma: no cover
raise RuntimeError(
f"{type(self).__name__} can be used only when Babel installed\n"
"Just install Babel (`pip install Babel`) "
"or aiogram with i18n support (`pip install aiogram[i18n]`)"
)
async def get_locale(self, event: TelegramObject, data: Dict[str, Any]) -> str:
if Locale is None: # pragma: no cover
raise RuntimeError(
f"{type(self).__name__} can be used only when Babel installed\n"
"Just install Babel (`pip install Babel`) "
"or aiogram with i18n support (`pip install aiogram[i18n]`)"
)
event_from_user: Optional[User] = data.get("event_from_user", None)
if event_from_user is None or event_from_user.language_code is None:
return self.i18n.default_locale
try:
locale = Locale.parse(event_from_user.language_code, sep="-")
except UnknownLocaleError:
return self.i18n.default_locale
if locale.language not in self.i18n.available_locales:
return self.i18n.default_locale
return locale.language
class ConstI18nMiddleware(I18nMiddleware):
"""
Const middleware chooses statically defined locale
"""
def __init__(
self,
locale: str,
i18n: I18n,
i18n_key: Optional[str] = "i18n",
middleware_key: str = "i18n_middleware",
) -> None:
super().__init__(i18n=i18n, i18n_key=i18n_key, middleware_key=middleware_key)
self.locale = locale
async def get_locale(self, event: TelegramObject, data: Dict[str, Any]) -> str:
return self.locale
class FSMI18nMiddleware(SimpleI18nMiddleware):
"""
This middleware stores locale in the FSM storage
"""
def __init__(
self,
i18n: I18n,
key: str = "locale",
i18n_key: Optional[str] = "i18n",
middleware_key: str = "i18n_middleware",
) -> None:
super().__init__(i18n=i18n, i18n_key=i18n_key, middleware_key=middleware_key)
self.key = key
async def get_locale(self, event: TelegramObject, data: Dict[str, Any]) -> str:
fsm_context: Optional[FSMContext] = data.get("state")
locale = None
if fsm_context:
fsm_data = await fsm_context.get_data()
locale = fsm_data.get(self.key, None)
if not locale:
locale = await super().get_locale(event=event, data=data)
if fsm_context:
await fsm_context.update_data(data={self.key: locale})
return locale
async def set_locale(self, state: FSMContext, locale: str) -> None:
"""
Write new locale to the storage
:param state: instance of FSMContext
:param locale: new locale
"""
await state.update_data(data={self.key: locale})
self.i18n.current_locale = locale

View File

@@ -0,0 +1,427 @@
from __future__ import annotations
from abc import ABC
from copy import deepcopy
from itertools import chain
from itertools import cycle as repeat_all
from typing import (
TYPE_CHECKING,
Any,
Generator,
Generic,
Iterable,
List,
Optional,
Type,
TypeVar,
Union,
cast,
)
from aiogram.filters.callback_data import CallbackData
from aiogram.types import (
CallbackGame,
CopyTextButton,
InlineKeyboardButton,
InlineKeyboardMarkup,
KeyboardButton,
KeyboardButtonPollType,
KeyboardButtonRequestChat,
KeyboardButtonRequestUsers,
LoginUrl,
ReplyKeyboardMarkup,
SwitchInlineQueryChosenChat,
WebAppInfo,
)
ButtonType = TypeVar("ButtonType", InlineKeyboardButton, KeyboardButton)
T = TypeVar("T")
class KeyboardBuilder(Generic[ButtonType], ABC):
"""
Generic keyboard builder that helps to adjust your markup with defined shape of lines.
Works both of InlineKeyboardMarkup and ReplyKeyboardMarkup.
"""
max_width: int = 0
min_width: int = 0
max_buttons: int = 0
def __init__(
self, button_type: Type[ButtonType], markup: Optional[List[List[ButtonType]]] = None
) -> None:
if not issubclass(button_type, (InlineKeyboardButton, KeyboardButton)):
raise ValueError(f"Button type {button_type} are not allowed here")
self._button_type: Type[ButtonType] = button_type
if markup:
self._validate_markup(markup)
else:
markup = []
self._markup: List[List[ButtonType]] = markup
@property
def buttons(self) -> Generator[ButtonType, None, None]:
"""
Get flatten set of all buttons
:return:
"""
yield from chain.from_iterable(self.export())
def _validate_button(self, button: ButtonType) -> bool:
"""
Check that button item has correct type
:param button:
:return:
"""
allowed = self._button_type
if not isinstance(button, allowed):
raise ValueError(
f"{button!r} should be type {allowed.__name__!r} not {type(button).__name__!r}"
)
return True
def _validate_buttons(self, *buttons: ButtonType) -> bool:
"""
Check that all passed button has correct type
:param buttons:
:return:
"""
return all(map(self._validate_button, buttons))
def _validate_row(self, row: List[ButtonType]) -> bool:
"""
Check that row of buttons are correct
Row can be only list of allowed button types and has length 0 <= n <= 8
:param row:
:return:
"""
if not isinstance(row, list):
raise ValueError(
f"Row {row!r} should be type 'List[{self._button_type.__name__}]' "
f"not type {type(row).__name__}"
)
if len(row) > self.max_width:
raise ValueError(f"Row {row!r} is too long (max width: {self.max_width})")
self._validate_buttons(*row)
return True
def _validate_markup(self, markup: List[List[ButtonType]]) -> bool:
"""
Check that passed markup has correct data structure
Markup is list of lists of buttons
:param markup:
:return:
"""
count = 0
if not isinstance(markup, list):
raise ValueError(
f"Markup should be type 'List[List[{self._button_type.__name__}]]' "
f"not type {type(markup).__name__!r}"
)
for row in markup:
self._validate_row(row)
count += len(row)
if count > self.max_buttons:
raise ValueError(f"Too much buttons detected Max allowed count - {self.max_buttons}")
return True
def _validate_size(self, size: Any) -> int:
"""
Validate that passed size is legit
:param size:
:return:
"""
if not isinstance(size, int):
raise ValueError("Only int sizes are allowed")
if size not in range(self.min_width, self.max_width + 1):
raise ValueError(
f"Row size {size} is not allowed, range: [{self.min_width}, {self.max_width}]"
)
return size
def export(self) -> List[List[ButtonType]]:
"""
Export configured markup as list of lists of buttons
.. code-block:: python
>>> builder = KeyboardBuilder(button_type=InlineKeyboardButton)
>>> ... # Add buttons to builder
>>> markup = InlineKeyboardMarkup(inline_keyboard=builder.export())
:return:
"""
return deepcopy(self._markup)
def add(self, *buttons: ButtonType) -> "KeyboardBuilder[ButtonType]":
"""
Add one or many buttons to markup.
:param buttons:
:return:
"""
self._validate_buttons(*buttons)
markup = self.export()
# Try to add new buttons to the end of last row if it possible
if markup and len(markup[-1]) < self.max_width:
last_row = markup[-1]
pos = self.max_width - len(last_row)
head, buttons = buttons[:pos], buttons[pos:]
last_row.extend(head)
# Separate buttons to exclusive rows with max possible row width
if self.max_width > 0:
while buttons:
row, buttons = buttons[: self.max_width], buttons[self.max_width :]
markup.append(list(row))
else:
markup.append(list(buttons))
self._markup = markup
return self
def row(
self, *buttons: ButtonType, width: Optional[int] = None
) -> "KeyboardBuilder[ButtonType]":
"""
Add row to markup
When too much buttons is passed it will be separated to many rows
:param buttons:
:param width:
:return:
"""
if width is None:
width = self.max_width
self._validate_size(width)
self._validate_buttons(*buttons)
self._markup.extend(
list(buttons[pos : pos + width]) for pos in range(0, len(buttons), width)
)
return self
def adjust(self, *sizes: int, repeat: bool = False) -> "KeyboardBuilder[ButtonType]":
"""
Adjust previously added buttons to specific row sizes.
By default, when the sum of passed sizes is lower than buttons count the last
one size will be used for tail of the markup.
If repeat=True is passed - all sizes will be cycled when available more buttons
count than all sizes
:param sizes:
:param repeat:
:return:
"""
if not sizes:
sizes = (self.max_width,)
validated_sizes = map(self._validate_size, sizes)
sizes_iter = repeat_all(validated_sizes) if repeat else repeat_last(validated_sizes)
size = next(sizes_iter)
markup = []
row: List[ButtonType] = []
for button in self.buttons:
if len(row) >= size:
markup.append(row)
size = next(sizes_iter)
row = []
row.append(button)
if row:
markup.append(row)
self._markup = markup
return self
def _button(self, **kwargs: Any) -> "KeyboardBuilder[ButtonType]":
"""
Add button to markup
:param kwargs:
:return:
"""
if isinstance(callback_data := kwargs.get("callback_data", None), CallbackData):
kwargs["callback_data"] = callback_data.pack()
button = self._button_type(**kwargs)
return self.add(button)
def as_markup(self, **kwargs: Any) -> Union[InlineKeyboardMarkup, ReplyKeyboardMarkup]:
if self._button_type is KeyboardButton:
keyboard = cast(List[List[KeyboardButton]], self.export()) # type: ignore
return ReplyKeyboardMarkup(keyboard=keyboard, **kwargs)
inline_keyboard = cast(List[List[InlineKeyboardButton]], self.export()) # type: ignore
return InlineKeyboardMarkup(inline_keyboard=inline_keyboard)
def attach(self, builder: "KeyboardBuilder[ButtonType]") -> "KeyboardBuilder[ButtonType]":
if not isinstance(builder, KeyboardBuilder):
raise ValueError(f"Only KeyboardBuilder can be attached, not {type(builder).__name__}")
if builder._button_type is not self._button_type:
raise ValueError(
f"Only builders with same button type can be attached, "
f"not {self._button_type.__name__} and {builder._button_type.__name__}"
)
self._markup.extend(builder.export())
return self
def repeat_last(items: Iterable[T]) -> Generator[T, None, None]:
items_iter = iter(items)
try:
value = next(items_iter)
except StopIteration: # pragma: no cover
# Possible case but not in place where this function is used
return
yield value
finished = False
while True:
if not finished:
try:
value = next(items_iter)
except StopIteration:
finished = True
yield value
class InlineKeyboardBuilder(KeyboardBuilder[InlineKeyboardButton]):
"""
Inline keyboard builder inherits all methods from generic builder
"""
max_width: int = 8
min_width: int = 1
max_buttons: int = 100
def button(
self,
*,
text: str,
url: Optional[str] = None,
callback_data: Optional[Union[str, CallbackData]] = None,
web_app: Optional[WebAppInfo] = None,
login_url: Optional[LoginUrl] = None,
switch_inline_query: Optional[str] = None,
switch_inline_query_current_chat: Optional[str] = None,
switch_inline_query_chosen_chat: Optional[SwitchInlineQueryChosenChat] = None,
copy_text: Optional[CopyTextButton] = None,
callback_game: Optional[CallbackGame] = None,
pay: Optional[bool] = None,
**kwargs: Any,
) -> "InlineKeyboardBuilder":
return cast(
InlineKeyboardBuilder,
self._button(
text=text,
url=url,
callback_data=callback_data,
web_app=web_app,
login_url=login_url,
switch_inline_query=switch_inline_query,
switch_inline_query_current_chat=switch_inline_query_current_chat,
switch_inline_query_chosen_chat=switch_inline_query_chosen_chat,
copy_text=copy_text,
callback_game=callback_game,
pay=pay,
**kwargs,
),
)
if TYPE_CHECKING:
def as_markup(self, **kwargs: Any) -> InlineKeyboardMarkup:
"""Construct an InlineKeyboardMarkup"""
...
def __init__(self, markup: Optional[List[List[InlineKeyboardButton]]] = None) -> None:
super().__init__(button_type=InlineKeyboardButton, markup=markup)
def copy(self: "InlineKeyboardBuilder") -> "InlineKeyboardBuilder":
"""
Make full copy of current builder with markup
:return:
"""
return InlineKeyboardBuilder(markup=self.export())
@classmethod
def from_markup(
cls: Type["InlineKeyboardBuilder"], markup: InlineKeyboardMarkup
) -> "InlineKeyboardBuilder":
"""
Create builder from existing markup
:param markup:
:return:
"""
return cls(markup=markup.inline_keyboard)
class ReplyKeyboardBuilder(KeyboardBuilder[KeyboardButton]):
"""
Reply keyboard builder inherits all methods from generic builder
"""
max_width: int = 10
min_width: int = 1
max_buttons: int = 300
def button(
self,
*,
text: str,
request_users: Optional[KeyboardButtonRequestUsers] = None,
request_chat: Optional[KeyboardButtonRequestChat] = None,
request_contact: Optional[bool] = None,
request_location: Optional[bool] = None,
request_poll: Optional[KeyboardButtonPollType] = None,
web_app: Optional[WebAppInfo] = None,
**kwargs: Any,
) -> "ReplyKeyboardBuilder":
return cast(
ReplyKeyboardBuilder,
self._button(
text=text,
request_users=request_users,
request_chat=request_chat,
request_contact=request_contact,
request_location=request_location,
request_poll=request_poll,
web_app=web_app,
**kwargs,
),
)
if TYPE_CHECKING:
def as_markup(self, **kwargs: Any) -> ReplyKeyboardMarkup: ...
def __init__(self, markup: Optional[List[List[KeyboardButton]]] = None) -> None:
super().__init__(button_type=KeyboardButton, markup=markup)
def copy(self: "ReplyKeyboardBuilder") -> "ReplyKeyboardBuilder":
"""
Make full copy of current builder with markup
:return:
"""
return ReplyKeyboardBuilder(markup=self.export())
@classmethod
def from_markup(cls, markup: ReplyKeyboardMarkup) -> "ReplyKeyboardBuilder":
"""
Create builder from existing markup
:param markup:
:return:
"""
return cls(markup=markup.keyboard)

View File

@@ -0,0 +1,74 @@
from typing import Any, Optional
from urllib.parse import urlencode, urljoin
BASE_DOCS_URL = "https://docs.aiogram.dev/"
BRANCH = "dev-3.x"
BASE_PAGE_URL = f"{BASE_DOCS_URL}/en/{BRANCH}/"
def _format_url(url: str, *path: str, fragment_: Optional[str] = None, **query: Any) -> str:
url = urljoin(url, "/".join(path), allow_fragments=True)
if query:
url += "?" + urlencode(query)
if fragment_:
url += "#" + fragment_
return url
def docs_url(*path: str, fragment_: Optional[str] = None, **query: Any) -> str:
return _format_url(BASE_PAGE_URL, *path, fragment_=fragment_, **query)
def create_tg_link(link: str, **kwargs: Any) -> str:
return _format_url(f"tg://{link}", **kwargs)
def create_telegram_link(*path: str, **kwargs: Any) -> str:
return _format_url("https://t.me", *path, **kwargs)
def create_channel_bot_link(
username: str,
parameter: Optional[str] = None,
change_info: bool = False,
post_messages: bool = False,
edit_messages: bool = False,
delete_messages: bool = False,
restrict_members: bool = False,
invite_users: bool = False,
pin_messages: bool = False,
promote_members: bool = False,
manage_video_chats: bool = False,
anonymous: bool = False,
manage_chat: bool = False,
) -> str:
params = {}
if parameter is not None:
params["startgroup"] = parameter
permissions = []
if change_info:
permissions.append("change_info")
if post_messages:
permissions.append("post_messages")
if edit_messages:
permissions.append("edit_messages")
if delete_messages:
permissions.append("delete_messages")
if restrict_members:
permissions.append("restrict_members")
if invite_users:
permissions.append("invite_users")
if pin_messages:
permissions.append("pin_messages")
if promote_members:
permissions.append("promote_members")
if manage_video_chats:
permissions.append("manage_video_chats")
if anonymous:
permissions.append("anonymous")
if manage_chat:
permissions.append("manage_chat")
if permissions:
params["admin"] = "+".join(permissions)
return create_telegram_link(username, **params)

View File

@@ -0,0 +1,22 @@
from typing import Any, Iterable
from magic_filter import MagicFilter as _MagicFilter
from magic_filter import MagicT as _MagicT
from magic_filter.operations import BaseOperation
class AsFilterResultOperation(BaseOperation):
__slots__ = ("name",)
def __init__(self, name: str) -> None:
self.name = name
def resolve(self, value: Any, initial_value: Any) -> Any:
if value is None or (isinstance(value, Iterable) and not value):
return None
return {self.name: value}
class MagicFilter(_MagicFilter):
def as_(self: _MagicT, name: str) -> _MagicT:
return self._extend(AsFilterResultOperation(name=name))

View File

@@ -0,0 +1,209 @@
from typing import Any
from .text_decorations import html_decoration, markdown_decoration
def _join(*content: Any, sep: str = " ") -> str:
return sep.join(map(str, content))
def text(*content: Any, sep: str = " ") -> str:
"""
Join all elements with a separator
:param content:
:param sep:
:return:
"""
return _join(*content, sep=sep)
def bold(*content: Any, sep: str = " ") -> str:
"""
Make bold text (Markdown)
:param content:
:param sep:
:return:
"""
return markdown_decoration.bold(value=markdown_decoration.quote(_join(*content, sep=sep)))
def hbold(*content: Any, sep: str = " ") -> str:
"""
Make bold text (HTML)
:param content:
:param sep:
:return:
"""
return html_decoration.bold(value=html_decoration.quote(_join(*content, sep=sep)))
def italic(*content: Any, sep: str = " ") -> str:
"""
Make italic text (Markdown)
:param content:
:param sep:
:return:
"""
return markdown_decoration.italic(value=markdown_decoration.quote(_join(*content, sep=sep)))
def hitalic(*content: Any, sep: str = " ") -> str:
"""
Make italic text (HTML)
:param content:
:param sep:
:return:
"""
return html_decoration.italic(value=html_decoration.quote(_join(*content, sep=sep)))
def code(*content: Any, sep: str = " ") -> str:
"""
Make mono-width text (Markdown)
:param content:
:param sep:
:return:
"""
return markdown_decoration.code(value=markdown_decoration.quote(_join(*content, sep=sep)))
def hcode(*content: Any, sep: str = " ") -> str:
"""
Make mono-width text (HTML)
:param content:
:param sep:
:return:
"""
return html_decoration.code(value=html_decoration.quote(_join(*content, sep=sep)))
def pre(*content: Any, sep: str = "\n") -> str:
"""
Make mono-width text block (Markdown)
:param content:
:param sep:
:return:
"""
return markdown_decoration.pre(value=markdown_decoration.quote(_join(*content, sep=sep)))
def hpre(*content: Any, sep: str = "\n") -> str:
"""
Make mono-width text block (HTML)
:param content:
:param sep:
:return:
"""
return html_decoration.pre(value=html_decoration.quote(_join(*content, sep=sep)))
def underline(*content: Any, sep: str = " ") -> str:
"""
Make underlined text (Markdown)
:param content:
:param sep:
:return:
"""
return markdown_decoration.underline(value=markdown_decoration.quote(_join(*content, sep=sep)))
def hunderline(*content: Any, sep: str = " ") -> str:
"""
Make underlined text (HTML)
:param content:
:param sep:
:return:
"""
return html_decoration.underline(value=html_decoration.quote(_join(*content, sep=sep)))
def strikethrough(*content: Any, sep: str = " ") -> str:
"""
Make strikethrough text (Markdown)
:param content:
:param sep:
:return:
"""
return markdown_decoration.strikethrough(
value=markdown_decoration.quote(_join(*content, sep=sep))
)
def hstrikethrough(*content: Any, sep: str = " ") -> str:
"""
Make strikethrough text (HTML)
:param content:
:param sep:
:return:
"""
return html_decoration.strikethrough(value=html_decoration.quote(_join(*content, sep=sep)))
def link(title: str, url: str) -> str:
"""
Format URL (Markdown)
:param title:
:param url:
:return:
"""
return markdown_decoration.link(value=markdown_decoration.quote(title), link=url)
def hlink(title: str, url: str) -> str:
"""
Format URL (HTML)
:param title:
:param url:
:return:
"""
return html_decoration.link(value=html_decoration.quote(title), link=url)
def blockquote(*content: Any, sep: str = "\n") -> str:
"""
Make blockquote (Markdown)
:param content:
:param sep:
:return:
"""
return markdown_decoration.blockquote(
value=markdown_decoration.quote(_join(*content, sep=sep))
)
def hblockquote(*content: Any, sep: str = "\n") -> str:
"""
Make blockquote (HTML)
:param content:
:param sep:
:return:
"""
return html_decoration.blockquote(value=html_decoration.quote(_join(*content, sep=sep)))
def hide_link(url: str) -> str:
"""
Hide URL (HTML only)
Can be used for adding an image to a text message
:param url:
:return:
"""
return f'<a href="{url}">&#8203;</a>'

View File

@@ -0,0 +1,368 @@
from typing import Any, Dict, List, Literal, Optional, Union, overload
from aiogram.enums import InputMediaType
from aiogram.types import (
UNSET_PARSE_MODE,
InputFile,
InputMedia,
InputMediaAudio,
InputMediaDocument,
InputMediaPhoto,
InputMediaVideo,
MessageEntity,
)
MediaType = Union[
InputMediaAudio,
InputMediaPhoto,
InputMediaVideo,
InputMediaDocument,
]
MAX_MEDIA_GROUP_SIZE = 10
class MediaGroupBuilder:
# Animated media is not supported yet in Bot API to send as a media group
def __init__(
self,
media: Optional[List[MediaType]] = None,
caption: Optional[str] = None,
caption_entities: Optional[List[MessageEntity]] = None,
) -> None:
"""
Helper class for building media groups.
:param media: A list of media elements to add to the media group. (optional)
:param caption: Caption for the media group. (optional)
:param caption_entities: List of special entities in the caption,
like usernames, URLs, etc. (optional)
"""
self._media: List[MediaType] = []
self.caption = caption
self.caption_entities = caption_entities
self._extend(media or [])
def _add(self, media: MediaType) -> None:
if not isinstance(media, InputMedia):
raise ValueError("Media must be instance of InputMedia")
if len(self._media) >= MAX_MEDIA_GROUP_SIZE:
raise ValueError("Media group can't contain more than 10 elements")
self._media.append(media)
def _extend(self, media: List[MediaType]) -> None:
for m in media:
self._add(m)
@overload
def add(
self,
*,
type: Literal[InputMediaType.AUDIO],
media: Union[str, InputFile],
caption: Optional[str] = None,
parse_mode: Optional[str] = UNSET_PARSE_MODE,
caption_entities: Optional[List[MessageEntity]] = None,
duration: Optional[int] = None,
performer: Optional[str] = None,
title: Optional[str] = None,
**kwargs: Any,
) -> None:
pass
@overload
def add(
self,
*,
type: Literal[InputMediaType.PHOTO],
media: Union[str, InputFile],
caption: Optional[str] = None,
parse_mode: Optional[str] = UNSET_PARSE_MODE,
caption_entities: Optional[List[MessageEntity]] = None,
has_spoiler: Optional[bool] = None,
**kwargs: Any,
) -> None:
pass
@overload
def add(
self,
*,
type: Literal[InputMediaType.VIDEO],
media: Union[str, InputFile],
thumbnail: Optional[Union[InputFile, str]] = None,
caption: Optional[str] = None,
parse_mode: Optional[str] = UNSET_PARSE_MODE,
caption_entities: Optional[List[MessageEntity]] = None,
width: Optional[int] = None,
height: Optional[int] = None,
duration: Optional[int] = None,
supports_streaming: Optional[bool] = None,
has_spoiler: Optional[bool] = None,
**kwargs: Any,
) -> None:
pass
@overload
def add(
self,
*,
type: Literal[InputMediaType.DOCUMENT],
media: Union[str, InputFile],
thumbnail: Optional[Union[InputFile, str]] = None,
caption: Optional[str] = None,
parse_mode: Optional[str] = UNSET_PARSE_MODE,
caption_entities: Optional[List[MessageEntity]] = None,
disable_content_type_detection: Optional[bool] = None,
**kwargs: Any,
) -> None:
pass
def add(self, **kwargs: Any) -> None:
"""
Add a media object to the media group.
:param kwargs: Keyword arguments for the media object.
The available keyword arguments depend on the media type.
:return: None
"""
type_ = kwargs.pop("type", None)
if type_ == InputMediaType.AUDIO:
self.add_audio(**kwargs)
elif type_ == InputMediaType.PHOTO:
self.add_photo(**kwargs)
elif type_ == InputMediaType.VIDEO:
self.add_video(**kwargs)
elif type_ == InputMediaType.DOCUMENT:
self.add_document(**kwargs)
else:
raise ValueError(f"Unknown media type: {type_!r}")
def add_audio(
self,
media: Union[str, InputFile],
thumbnail: Optional[InputFile] = None,
caption: Optional[str] = None,
parse_mode: Optional[str] = UNSET_PARSE_MODE,
caption_entities: Optional[List[MessageEntity]] = None,
duration: Optional[int] = None,
performer: Optional[str] = None,
title: Optional[str] = None,
**kwargs: Any,
) -> None:
"""
Add an audio file to the media group.
:param media: File to send. Pass a file_id to send a file that exists on the
Telegram servers (recommended), pass an HTTP URL for Telegram to get a file from
the Internet, or pass 'attach://<file_attach_name>' to upload a new one using
multipart/form-data under <file_attach_name> name.
:ref:`More information on Sending Files » <sending-files>`
:param thumbnail: *Optional*. Thumbnail of the file sent; can be ignored if
thumbnail generation for the file is supported server-side. The thumbnail should
be in JPEG format and less than 200 kB in size. A thumbnail's width and height
should not exceed 320.
:param caption: *Optional*. Caption of the audio to be sent, 0-1024 characters
after entities parsing
:param parse_mode: *Optional*. Mode for parsing entities in the audio caption.
See `formatting options <https://core.telegram.org/bots/api#formatting-options>`_
for more details.
:param caption_entities: *Optional*. List of special entities that appear in the caption,
which can be specified instead of *parse_mode*
:param duration: *Optional*. Duration of the audio in seconds
:param performer: *Optional*. Performer of the audio
:param title: *Optional*. Title of the audio
:return: None
"""
self._add(
InputMediaAudio(
media=media,
thumbnail=thumbnail,
caption=caption,
parse_mode=parse_mode,
caption_entities=caption_entities,
duration=duration,
performer=performer,
title=title,
**kwargs,
)
)
def add_photo(
self,
media: Union[str, InputFile],
caption: Optional[str] = None,
parse_mode: Optional[str] = UNSET_PARSE_MODE,
caption_entities: Optional[List[MessageEntity]] = None,
has_spoiler: Optional[bool] = None,
**kwargs: Any,
) -> None:
"""
Add a photo to the media group.
:param media: File to send. Pass a file_id to send a file that exists on the
Telegram servers (recommended), pass an HTTP URL for Telegram to get a file
from the Internet, or pass 'attach://<file_attach_name>' to upload a new
one using multipart/form-data under <file_attach_name> name.
:ref:`More information on Sending Files » <sending-files>`
:param caption: *Optional*. Caption of the photo to be sent, 0-1024 characters
after entities parsing
:param parse_mode: *Optional*. Mode for parsing entities in the photo caption.
See `formatting options <https://core.telegram.org/bots/api#formatting-options>`_
for more details.
:param caption_entities: *Optional*. List of special entities that appear in the caption,
which can be specified instead of *parse_mode*
:param has_spoiler: *Optional*. Pass :code:`True` if the photo needs to be covered
with a spoiler animation
:return: None
"""
self._add(
InputMediaPhoto(
media=media,
caption=caption,
parse_mode=parse_mode,
caption_entities=caption_entities,
has_spoiler=has_spoiler,
**kwargs,
)
)
def add_video(
self,
media: Union[str, InputFile],
thumbnail: Optional[InputFile] = None,
caption: Optional[str] = None,
parse_mode: Optional[str] = UNSET_PARSE_MODE,
caption_entities: Optional[List[MessageEntity]] = None,
width: Optional[int] = None,
height: Optional[int] = None,
duration: Optional[int] = None,
supports_streaming: Optional[bool] = None,
has_spoiler: Optional[bool] = None,
**kwargs: Any,
) -> None:
"""
Add a video to the media group.
:param media: File to send. Pass a file_id to send a file that exists on the
Telegram servers (recommended), pass an HTTP URL for Telegram to get a file
from the Internet, or pass 'attach://<file_attach_name>' to upload a new one
using multipart/form-data under <file_attach_name> name.
:ref:`More information on Sending Files » <sending-files>`
:param thumbnail: *Optional*. Thumbnail of the file sent; can be ignored if thumbnail
generation for the file is supported server-side. The thumbnail should be in JPEG
format and less than 200 kB in size. A thumbnail's width and height should
not exceed 320. Ignored if the file is not uploaded using multipart/form-data.
Thumbnails can't be reused and can be only uploaded as a new file, so you
can pass 'attach://<file_attach_name>' if the thumbnail was uploaded using
multipart/form-data under <file_attach_name>.
:ref:`More information on Sending Files » <sending-files>`
:param caption: *Optional*. Caption of the video to be sent,
0-1024 characters after entities parsing
:param parse_mode: *Optional*. Mode for parsing entities in the video caption.
See `formatting options <https://core.telegram.org/bots/api#formatting-options>`_
for more details.
:param caption_entities: *Optional*. List of special entities that appear in the caption,
which can be specified instead of *parse_mode*
:param width: *Optional*. Video width
:param height: *Optional*. Video height
:param duration: *Optional*. Video duration in seconds
:param supports_streaming: *Optional*. Pass :code:`True` if the uploaded video is
suitable for streaming
:param has_spoiler: *Optional*. Pass :code:`True` if the video needs to be covered
with a spoiler animation
:return: None
"""
self._add(
InputMediaVideo(
media=media,
thumbnail=thumbnail,
caption=caption,
parse_mode=parse_mode,
caption_entities=caption_entities,
width=width,
height=height,
duration=duration,
supports_streaming=supports_streaming,
has_spoiler=has_spoiler,
**kwargs,
)
)
def add_document(
self,
media: Union[str, InputFile],
thumbnail: Optional[InputFile] = None,
caption: Optional[str] = None,
parse_mode: Optional[str] = UNSET_PARSE_MODE,
caption_entities: Optional[List[MessageEntity]] = None,
disable_content_type_detection: Optional[bool] = None,
**kwargs: Any,
) -> None:
"""
Add a document to the media group.
:param media: File to send. Pass a file_id to send a file that exists on the
Telegram servers (recommended), pass an HTTP URL for Telegram to get a file
from the Internet, or pass 'attach://<file_attach_name>' to upload a new one using
multipart/form-data under <file_attach_name> name.
:ref:`More information on Sending Files » <sending-files>`
:param thumbnail: *Optional*. Thumbnail of the file sent; can be ignored
if thumbnail generation for the file is supported server-side.
The thumbnail should be in JPEG format and less than 200 kB in size.
A thumbnail's width and height should not exceed 320.
Ignored if the file is not uploaded using multipart/form-data.
Thumbnails can't be reused and can be only uploaded as a new file,
so you can pass 'attach://<file_attach_name>' if the thumbnail was uploaded
using multipart/form-data under <file_attach_name>.
:ref:`More information on Sending Files » <sending-files>`
:param caption: *Optional*. Caption of the document to be sent,
0-1024 characters after entities parsing
:param parse_mode: *Optional*. Mode for parsing entities in the document caption.
See `formatting options <https://core.telegram.org/bots/api#formatting-options>`_
for more details.
:param caption_entities: *Optional*. List of special entities that appear
in the caption, which can be specified instead of *parse_mode*
:param disable_content_type_detection: *Optional*. Disables automatic server-side
content type detection for files uploaded using multipart/form-data.
Always :code:`True`, if the document is sent as part of an album.
:return: None
"""
self._add(
InputMediaDocument(
media=media,
thumbnail=thumbnail,
caption=caption,
parse_mode=parse_mode,
caption_entities=caption_entities,
disable_content_type_detection=disable_content_type_detection,
**kwargs,
)
)
def build(self) -> List[MediaType]:
"""
Builds a list of media objects for a media group.
Adds the caption to the first media object if it is present.
:return: List of media objects.
"""
update_first_media: Dict[str, Any] = {"caption": self.caption}
if self.caption_entities is not None:
update_first_media["caption_entities"] = self.caption_entities
update_first_media["parse_mode"] = None
return [
(
media.model_copy(update=update_first_media)
if index == 0 and self.caption is not None
else media
)
for index, media in enumerate(self._media)
]

View File

@@ -0,0 +1,95 @@
from __future__ import annotations
import contextvars
from typing import TYPE_CHECKING, Any, Dict, Generic, Optional, TypeVar, cast, overload
if TYPE_CHECKING:
from typing_extensions import Literal
__all__ = ("ContextInstanceMixin", "DataMixin")
class DataMixin:
@property
def data(self) -> Dict[str, Any]:
data: Optional[Dict[str, Any]] = getattr(self, "_data", None)
if data is None:
data = {}
setattr(self, "_data", data)
return data
def __getitem__(self, key: str) -> Any:
return self.data[key]
def __setitem__(self, key: str, value: Any) -> None:
self.data[key] = value
def __delitem__(self, key: str) -> None:
del self.data[key]
def __contains__(self, key: str) -> bool:
return key in self.data
def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
return self.data.get(key, default)
ContextInstance = TypeVar("ContextInstance")
class ContextInstanceMixin(Generic[ContextInstance]):
__context_instance: contextvars.ContextVar[ContextInstance]
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__()
cls.__context_instance = contextvars.ContextVar(f"instance_{cls.__name__}")
@overload # noqa: F811
@classmethod
def get_current(cls) -> Optional[ContextInstance]: # pragma: no cover # noqa: F811
...
@overload # noqa: F811
@classmethod
def get_current( # noqa: F811
cls, no_error: Literal[True]
) -> Optional[ContextInstance]: # pragma: no cover # noqa: F811
...
@overload # noqa: F811
@classmethod
def get_current( # noqa: F811
cls, no_error: Literal[False]
) -> ContextInstance: # pragma: no cover # noqa: F811
...
@classmethod # noqa: F811
def get_current( # noqa: F811
cls, no_error: bool = True
) -> Optional[ContextInstance]: # pragma: no cover # noqa: F811
# on mypy 0.770 I catch that contextvars.ContextVar always contextvars.ContextVar[Any]
cls.__context_instance = cast(
contextvars.ContextVar[ContextInstance], cls.__context_instance
)
try:
current: Optional[ContextInstance] = cls.__context_instance.get()
except LookupError:
if no_error:
current = None
else:
raise
return current
@classmethod
def set_current(cls, value: ContextInstance) -> contextvars.Token[ContextInstance]:
if not isinstance(value, cls):
raise TypeError(
f"Value should be instance of {cls.__name__!r} not {type(value).__name__!r}"
)
return cls.__context_instance.set(value)
@classmethod
def reset_current(cls, token: contextvars.Token[ContextInstance]) -> None:
cls.__context_instance.reset(token)

View File

@@ -0,0 +1,16 @@
import functools
from typing import Callable, TypeVar
T = TypeVar("T")
def lru_cache(maxsize: int = 128, typed: bool = False) -> Callable[[T], T]:
"""
fix: lru_cache annotation doesn't work with a property
this hack is only needed for the property, so type annotations are as they are
"""
def wrapper(func: T) -> T:
return functools.lru_cache(maxsize, typed)(func) # type: ignore
return wrapper

View File

@@ -0,0 +1,109 @@
"""
Payload preparing
We have added some utils to make work with payload easier.
Basic encode example:
.. code-block:: python
from aiogram.utils.payload import encode_payload
encoded = encode_payload("foo")
# result: "Zm9v"
Basic decode it back example:
.. code-block:: python
from aiogram.utils.payload import decode_payload
encoded = "Zm9v"
decoded = decode_payload(encoded)
# result: "foo"
Encoding and decoding with your own methods:
1. Create your own cryptor
.. code-block:: python
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad, unpad
class Cryptor:
def __init__(self, key: str):
self.key = key.encode("utf-8")
self.mode = AES.MODE_ECB # never use ECB in strong systems obviously
self.size = 32
@property
def cipher(self):
return AES.new(self.key, self.mode)
def encrypt(self, data: bytes) -> bytes:
return self.cipher.encrypt(pad(data, self.size))
def decrypt(self, data: bytes) -> bytes:
decrypted_data = self.cipher.decrypt(data)
return unpad(decrypted_data, self.size)
2. Pass cryptor callable methods to aiogram payload tools
.. code-block:: python
cryptor = Cryptor("abcdefghijklmnop")
encoded = encode_payload("foo", encoder=cryptor.encrypt)
decoded = decode_payload(encoded_payload, decoder=cryptor.decrypt)
# result: decoded == "foo"
"""
from base64 import urlsafe_b64decode, urlsafe_b64encode
from typing import Callable, Optional
def encode_payload(
payload: str,
encoder: Optional[Callable[[bytes], bytes]] = None,
) -> str:
"""Encode payload with encoder.
Result also will be encoded with URL-safe base64url.
"""
if not isinstance(payload, str):
payload = str(payload)
payload_bytes = payload.encode("utf-8")
if encoder is not None:
payload_bytes = encoder(payload_bytes)
return _encode_b64(payload_bytes)
def decode_payload(
payload: str,
decoder: Optional[Callable[[bytes], bytes]] = None,
) -> str:
"""Decode URL-safe base64url payload with decoder."""
original_payload = _decode_b64(payload)
if decoder is None:
return original_payload.decode()
return decoder(original_payload).decode()
def _encode_b64(payload: bytes) -> str:
"""Encode with URL-safe base64url."""
bytes_payload: bytes = urlsafe_b64encode(payload)
str_payload = bytes_payload.decode()
return str_payload.replace("=", "")
def _decode_b64(payload: str) -> bytes:
"""Decode with URL-safe base64url."""
payload += "=" * (4 - len(payload) % 4)
return urlsafe_b64decode(payload.encode())

View File

@@ -0,0 +1,89 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional
from pydantic import BaseModel
from aiogram import Bot
from aiogram.client.default import DefaultBotProperties
from aiogram.methods import TelegramMethod
from aiogram.types import InputFile
def _get_fake_bot(default: Optional[DefaultBotProperties] = None) -> Bot:
if default is None:
default = DefaultBotProperties()
return Bot(token="42:Fake", default=default)
@dataclass
class DeserializedTelegramObject:
"""
Represents a dumped Telegram object.
:param data: The dumped data of the Telegram object.
:type data: Any
:param files: The dictionary containing the file names as keys
and the corresponding `InputFile` objects as values.
:type files: Dict[str, InputFile]
"""
data: Any
files: Dict[str, InputFile]
def deserialize_telegram_object(
obj: Any,
default: Optional[DefaultBotProperties] = None,
include_api_method_name: bool = True,
) -> DeserializedTelegramObject:
"""
Deserialize Telegram Object to JSON compatible Python object.
:param obj: The object to be deserialized.
:param default: Default bot properties
should be passed only if you want to use custom defaults.
:param include_api_method_name: Whether to include the API method name in the result.
:return: The deserialized Telegram object.
"""
extends = {}
if include_api_method_name and isinstance(obj, TelegramMethod):
extends["method"] = obj.__api_method__
if isinstance(obj, BaseModel):
obj = obj.model_dump(mode="python", warnings=False)
# Fake bot is needed to exclude global defaults from the object.
fake_bot = _get_fake_bot(default=default)
files: Dict[str, InputFile] = {}
prepared = fake_bot.session.prepare_value(
obj,
bot=fake_bot,
files=files,
_dumps_json=False,
)
if isinstance(prepared, dict):
prepared.update(extends)
return DeserializedTelegramObject(data=prepared, files=files)
def deserialize_telegram_object_to_python(
obj: Any,
default: Optional[DefaultBotProperties] = None,
include_api_method_name: bool = True,
) -> Any:
"""
Deserialize telegram object to JSON compatible Python object excluding files.
:param obj: The telegram object to be deserialized.
:param default: Default bot properties
should be passed only if you want to use custom defaults.
:param include_api_method_name: Whether to include the API method name in the result.
:return: The deserialized telegram object.
"""
return deserialize_telegram_object(
obj,
default=default,
include_api_method_name=include_api_method_name,
).data

View File

@@ -0,0 +1,274 @@
from __future__ import annotations
import html
import re
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Generator, List, Optional, Pattern, cast
from aiogram.enums import MessageEntityType
if TYPE_CHECKING:
from aiogram.types import MessageEntity
__all__ = (
"HtmlDecoration",
"MarkdownDecoration",
"TextDecoration",
"html_decoration",
"markdown_decoration",
"add_surrogates",
"remove_surrogates",
)
def add_surrogates(text: str) -> bytes:
return text.encode("utf-16-le")
def remove_surrogates(text: bytes) -> str:
return text.decode("utf-16-le")
class TextDecoration(ABC):
def apply_entity(self, entity: MessageEntity, text: str) -> str:
"""
Apply single entity to text
:param entity:
:param text:
:return:
"""
if entity.type in {
MessageEntityType.BOT_COMMAND,
MessageEntityType.URL,
MessageEntityType.MENTION,
MessageEntityType.PHONE_NUMBER,
MessageEntityType.HASHTAG,
MessageEntityType.CASHTAG,
MessageEntityType.EMAIL,
}:
# These entities should not be changed
return text
if entity.type in {
MessageEntityType.BOLD,
MessageEntityType.ITALIC,
MessageEntityType.CODE,
MessageEntityType.UNDERLINE,
MessageEntityType.STRIKETHROUGH,
MessageEntityType.SPOILER,
MessageEntityType.BLOCKQUOTE,
MessageEntityType.EXPANDABLE_BLOCKQUOTE,
}:
return cast(str, getattr(self, entity.type)(value=text))
if entity.type == MessageEntityType.PRE:
return (
self.pre_language(value=text, language=entity.language)
if entity.language
else self.pre(value=text)
)
if entity.type == MessageEntityType.TEXT_MENTION:
from aiogram.types import User
user = cast(User, entity.user)
return self.link(value=text, link=f"tg://user?id={user.id}")
if entity.type == MessageEntityType.TEXT_LINK:
return self.link(value=text, link=cast(str, entity.url))
if entity.type == MessageEntityType.CUSTOM_EMOJI:
return self.custom_emoji(value=text, custom_emoji_id=cast(str, entity.custom_emoji_id))
# This case is not possible because of `if` above, but if any new entity is added to
# API it will be here too
return self.quote(text)
def unparse(self, text: str, entities: Optional[List[MessageEntity]] = None) -> str:
"""
Unparse message entities
:param text: raw text
:param entities: Array of MessageEntities
:return:
"""
return "".join(
self._unparse_entities(
add_surrogates(text),
sorted(entities, key=lambda item: item.offset) if entities else [],
)
)
def _unparse_entities(
self,
text: bytes,
entities: List[MessageEntity],
offset: Optional[int] = None,
length: Optional[int] = None,
) -> Generator[str, None, None]:
if offset is None:
offset = 0
length = length or len(text)
for index, entity in enumerate(entities):
if entity.offset * 2 < offset:
continue
if entity.offset * 2 > offset:
yield self.quote(remove_surrogates(text[offset : entity.offset * 2]))
start = entity.offset * 2
offset = entity.offset * 2 + entity.length * 2
sub_entities = list(
filter(lambda e: e.offset * 2 < (offset or 0), entities[index + 1 :])
)
yield self.apply_entity(
entity,
"".join(self._unparse_entities(text, sub_entities, offset=start, length=offset)),
)
if offset < length:
yield self.quote(remove_surrogates(text[offset:length]))
@abstractmethod
def link(self, value: str, link: str) -> str:
pass
@abstractmethod
def bold(self, value: str) -> str:
pass
@abstractmethod
def italic(self, value: str) -> str:
pass
@abstractmethod
def code(self, value: str) -> str:
pass
@abstractmethod
def pre(self, value: str) -> str:
pass
@abstractmethod
def pre_language(self, value: str, language: str) -> str:
pass
@abstractmethod
def underline(self, value: str) -> str:
pass
@abstractmethod
def strikethrough(self, value: str) -> str:
pass
@abstractmethod
def spoiler(self, value: str) -> str:
pass
@abstractmethod
def quote(self, value: str) -> str:
pass
@abstractmethod
def custom_emoji(self, value: str, custom_emoji_id: str) -> str:
pass
@abstractmethod
def blockquote(self, value: str) -> str:
pass
@abstractmethod
def expandable_blockquote(self, value: str) -> str:
pass
class HtmlDecoration(TextDecoration):
BOLD_TAG = "b"
ITALIC_TAG = "i"
UNDERLINE_TAG = "u"
STRIKETHROUGH_TAG = "s"
SPOILER_TAG = "tg-spoiler"
EMOJI_TAG = "tg-emoji"
BLOCKQUOTE_TAG = "blockquote"
def link(self, value: str, link: str) -> str:
return f'<a href="{link}">{value}</a>'
def bold(self, value: str) -> str:
return f"<{self.BOLD_TAG}>{value}</{self.BOLD_TAG}>"
def italic(self, value: str) -> str:
return f"<{self.ITALIC_TAG}>{value}</{self.ITALIC_TAG}>"
def code(self, value: str) -> str:
return f"<code>{value}</code>"
def pre(self, value: str) -> str:
return f"<pre>{value}</pre>"
def pre_language(self, value: str, language: str) -> str:
return f'<pre><code class="language-{language}">{value}</code></pre>'
def underline(self, value: str) -> str:
return f"<{self.UNDERLINE_TAG}>{value}</{self.UNDERLINE_TAG}>"
def strikethrough(self, value: str) -> str:
return f"<{self.STRIKETHROUGH_TAG}>{value}</{self.STRIKETHROUGH_TAG}>"
def spoiler(self, value: str) -> str:
return f"<{self.SPOILER_TAG}>{value}</{self.SPOILER_TAG}>"
def quote(self, value: str) -> str:
return html.escape(value, quote=False)
def custom_emoji(self, value: str, custom_emoji_id: str) -> str:
return f'<{self.EMOJI_TAG} emoji-id="{custom_emoji_id}">{value}</{self.EMOJI_TAG}>'
def blockquote(self, value: str) -> str:
return f"<{self.BLOCKQUOTE_TAG}>{value}</{self.BLOCKQUOTE_TAG}>"
def expandable_blockquote(self, value: str) -> str:
return f"<{self.BLOCKQUOTE_TAG} expandable>{value}</{self.BLOCKQUOTE_TAG}>"
class MarkdownDecoration(TextDecoration):
MARKDOWN_QUOTE_PATTERN: Pattern[str] = re.compile(r"([_*\[\]()~`>#+\-=|{}.!\\])")
def link(self, value: str, link: str) -> str:
return f"[{value}]({link})"
def bold(self, value: str) -> str:
return f"*{value}*"
def italic(self, value: str) -> str:
return f"_\r{value}_\r"
def code(self, value: str) -> str:
return f"`{value}`"
def pre(self, value: str) -> str:
return f"```\n{value}\n```"
def pre_language(self, value: str, language: str) -> str:
return f"```{language}\n{value}\n```"
def underline(self, value: str) -> str:
return f"__\r{value}__\r"
def strikethrough(self, value: str) -> str:
return f"~{value}~"
def spoiler(self, value: str) -> str:
return f"||{value}||"
def quote(self, value: str) -> str:
return re.sub(pattern=self.MARKDOWN_QUOTE_PATTERN, repl=r"\\\1", string=value)
def custom_emoji(self, value: str, custom_emoji_id: str) -> str:
return f'!{self.link(value=value, link=f"tg://emoji?id={custom_emoji_id}")}'
def blockquote(self, value: str) -> str:
return "\n".join(f">{line}" for line in value.splitlines())
def expandable_blockquote(self, value: str) -> str:
return "\n".join(f">{line}" for line in value.splitlines()) + "||"
html_decoration = HtmlDecoration()
markdown_decoration = MarkdownDecoration()

View File

@@ -0,0 +1,42 @@
from functools import lru_cache
class TokenValidationError(Exception):
pass
@lru_cache()
def validate_token(token: str) -> bool:
"""
Validate Telegram token
:param token:
:return:
"""
if not isinstance(token, str):
raise TokenValidationError(
f"Token is invalid! It must be 'str' type instead of {type(token)} type."
)
if any(x.isspace() for x in token):
message = "Token is invalid! It can't contains spaces."
raise TokenValidationError(message)
left, sep, right = token.partition(":")
if (not sep) or (not left.isdigit()) or (not right):
raise TokenValidationError("Token is invalid!")
return True
@lru_cache()
def extract_bot_id(token: str) -> int:
"""
Extract bot ID from Telegram token
:param token:
:return:
"""
validate_token(token)
raw_bot_id, *_ = token.split(":")
return int(raw_bot_id)

View File

@@ -0,0 +1,6 @@
class AiogramWarning(Warning):
pass
class Recommendation(AiogramWarning):
pass

View File

@@ -0,0 +1,183 @@
import hashlib
import hmac
import json
from datetime import datetime
from operator import itemgetter
from typing import Any, Callable, Optional
from urllib.parse import parse_qsl
from aiogram.types import TelegramObject
class WebAppChat(TelegramObject):
"""
This object represents a chat.
Source: https://core.telegram.org/bots/webapps#webappchat
"""
id: int
"""Unique identifier for this chat. This number may have more than 32 significant bits
and some programming languages may have difficulty/silent defects in interpreting it.
But it has at most 52 significant bits, so a signed 64-bit integer or double-precision
float type are safe for storing this identifier."""
type: str
"""Type of chat, can be either “group”, “supergroup” or “channel”"""
title: str
"""Title of the chat"""
username: Optional[str] = None
"""Username of the chat"""
photo_url: Optional[str] = None
"""URL of the chats photo. The photo can be in .jpeg or .svg formats.
Only returned for Web Apps launched from the attachment menu."""
class WebAppUser(TelegramObject):
"""
This object contains the data of the Web App user.
Source: https://core.telegram.org/bots/webapps#webappuser
"""
id: int
"""A unique identifier for the user or bot. This number may have more than 32 significant bits
and some programming languages may have difficulty/silent defects in interpreting it.
It has at most 52 significant bits, so a 64-bit integer or a double-precision float type
is safe for storing this identifier."""
is_bot: Optional[bool] = None
"""True, if this user is a bot. Returns in the receiver field only."""
first_name: str
"""First name of the user or bot."""
last_name: Optional[str] = None
"""Last name of the user or bot."""
username: Optional[str] = None
"""Username of the user or bot."""
language_code: Optional[str] = None
"""IETF language tag of the user's language. Returns in user field only."""
is_premium: Optional[bool] = None
"""True, if this user is a Telegram Premium user."""
added_to_attachment_menu: Optional[bool] = None
"""True, if this user added the bot to the attachment menu."""
allows_write_to_pm: Optional[bool] = None
"""True, if this user allowed the bot to message them."""
photo_url: Optional[str] = None
"""URL of the users profile photo. The photo can be in .jpeg or .svg formats.
Only returned for Web Apps launched from the attachment menu."""
class WebAppInitData(TelegramObject):
"""
This object contains data that is transferred to the Web App when it is opened.
It is empty if the Web App was launched from a keyboard button.
Source: https://core.telegram.org/bots/webapps#webappinitdata
"""
query_id: Optional[str] = None
"""A unique identifier for the Web App session, required for sending messages
via the answerWebAppQuery method."""
user: Optional[WebAppUser] = None
"""An object containing data about the current user."""
receiver: Optional[WebAppUser] = None
"""An object containing data about the chat partner of the current user in the chat where
the bot was launched via the attachment menu.
Returned only for Web Apps launched via the attachment menu."""
chat: Optional[WebAppChat] = None
"""An object containing data about the chat where the bot was launched via the attachment menu.
Returned for supergroups, channels, and group chats only for Web Apps launched via the
attachment menu."""
chat_type: Optional[str] = None
"""Type of the chat from which the Web App was opened.
Can be either “sender” for a private chat with the user opening the link,
“private”, “group”, “supergroup”, or “channel”.
Returned only for Web Apps launched from direct links."""
chat_instance: Optional[str] = None
"""Global identifier, uniquely corresponding to the chat from which the Web App was opened.
Returned only for Web Apps launched from a direct link."""
start_param: Optional[str] = None
"""The value of the startattach parameter, passed via link.
Only returned for Web Apps when launched from the attachment menu via link.
The value of the start_param parameter will also be passed in the GET-parameter
tgWebAppStartParam, so the Web App can load the correct interface right away."""
can_send_after: Optional[int] = None
"""Time in seconds, after which a message can be sent via the answerWebAppQuery method."""
auth_date: datetime
"""Unix time when the form was opened."""
hash: str
"""A hash of all passed parameters, which the bot server can use to check their validity."""
def check_webapp_signature(token: str, init_data: str) -> bool:
"""
Check incoming WebApp init data signature
Source: https://core.telegram.org/bots/webapps#validating-data-received-via-the-web-app
:param token: bot Token
:param init_data: data from frontend to be validated
:return:
"""
try:
parsed_data = dict(parse_qsl(init_data, strict_parsing=True))
except ValueError: # pragma: no cover
# Init data is not a valid query string
return False
if "hash" not in parsed_data:
# Hash is not present in init data
return False
hash_ = parsed_data.pop("hash")
data_check_string = "\n".join(
f"{k}={v}" for k, v in sorted(parsed_data.items(), key=itemgetter(0))
)
secret_key = hmac.new(key=b"WebAppData", msg=token.encode(), digestmod=hashlib.sha256)
calculated_hash = hmac.new(
key=secret_key.digest(), msg=data_check_string.encode(), digestmod=hashlib.sha256
).hexdigest()
return calculated_hash == hash_
def parse_webapp_init_data(
init_data: str,
*,
loads: Callable[..., Any] = json.loads,
) -> WebAppInitData:
"""
Parse WebApp init data and return it as WebAppInitData object
This method doesn't make any security check, so you shall not trust to this data,
use :code:`safe_parse_webapp_init_data` instead.
:param init_data: data from frontend to be parsed
:param loads:
:return:
"""
result = {}
for key, value in parse_qsl(init_data):
if (value.startswith("[") and value.endswith("]")) or (
value.startswith("{") and value.endswith("}")
):
value = loads(value)
result[key] = value
return WebAppInitData(**result)
def safe_parse_webapp_init_data(
token: str,
init_data: str,
*,
loads: Callable[..., Any] = json.loads,
) -> WebAppInitData:
"""
Validate raw WebApp init data and return it as WebAppInitData object
Raise :obj:`ValueError` when data is invalid
:param token: bot token
:param init_data: data from frontend to be parsed and validated
:param loads:
:return:
"""
if check_webapp_signature(token, init_data):
return parse_webapp_init_data(init_data, loads=loads)
raise ValueError("Invalid init data signature")