354 lines
12 KiB
Python
354 lines
12 KiB
Python
"""
|
||
Utility functions for VNDB Telegram Bot
|
||
"""
|
||
from typing import Dict, List, Any, Optional, Tuple
|
||
import json
|
||
from telegram import Update
|
||
|
||
|
||
class Formatter:
    """Format API item dictionaries as short Markdown text snippets.

    Every ``format_*_item`` method takes one item dictionary and returns a
    multi-line string whose first line is ``**<name>** (`<id>`)``.  A key
    that is missing, or present with an explicit ``None`` value (e.g. a JSON
    ``null``), falls back to the method's placeholder instead of raising or
    printing the word ``None``.
    """

    @staticmethod
    def _get(item: Dict[str, Any], key: str, default: Any = "Unknown") -> Any:
        """Return ``item[key]``, or ``default`` if the key is absent or ``None``."""
        value = item.get(key)
        return default if value is None else value

    @staticmethod
    def _header(item: Dict[str, Any], name_key: str) -> str:
        """Build the shared title line plus the optional original-name line."""
        name = Formatter._get(item, name_key)
        item_id = Formatter._get(item, "id")
        original = Formatter._get(item, "original", "")

        text = f"**{name}** (`{item_id}`)\n"
        if original:
            text += f"Оригинал: {original}\n"
        return text

    @staticmethod
    def truncate_text(text: str, max_length: int = 200) -> str:
        """Shorten ``text`` to at most ``max_length`` characters.

        Args:
            text: Text to shorten; ``None`` or an empty string yields ``""``.
            max_length: Maximum length of the result, ellipsis included.

        Returns:
            ``text`` unchanged when it already fits, otherwise a prefix
            followed by ``"..."``.
        """
        if not text:
            return ""
        if len(text) <= max_length:
            return text
        if max_length < 3:
            # Not enough room for the ellipsis; a negative slice bound here
            # would otherwise keep almost the whole string.
            return text[:max(max_length, 0)]
        return text[:max_length - 3] + "..."

    @staticmethod
    def format_vn_item(vn: Dict[str, Any]) -> str:
        """Format a VN item for display.

        Reads ``id``, ``title``, ``original``, ``released``, ``rating`` and
        ``votecount``.  The rating line is emitted only for a rating above
        zero, and the stored rating is divided by 10 for display.
        """
        text = Formatter._header(vn, "title")
        text += f"Дата: {Formatter._get(vn, 'released')}\n"

        # A None rating must not reach the numeric comparison below.
        rating = Formatter._get(vn, "rating", 0)
        if rating > 0:
            votecount = Formatter._get(vn, "votecount", 0)
            text += f"Рейтинг: {rating / 10:.1f}/10 ({votecount} голосов)\n"

        return text

    @staticmethod
    def format_character_item(char: Dict[str, Any]) -> str:
        """Format a character item for display.

        Reads ``id``, ``name``, ``original``, ``gender`` and ``vn``; the
        length of ``vn`` is reported when it is non-empty.
        """
        text = Formatter._header(char, "name")
        text += f"Пол: {Formatter._get(char, 'gender')}\n"

        vns = Formatter._get(char, "vn", [])
        if vns:
            text += f"Появляется в: {len(vns)} VN\n"

        return text

    @staticmethod
    def format_release_item(release: Dict[str, Any]) -> str:
        """Format a release item for display.

        Reads ``id``, ``title``, ``original``, ``released``, ``platform``
        and ``type``.
        """
        text = Formatter._header(release, "title")
        text += f"Дата: {Formatter._get(release, 'released')}\n"

        platform = Formatter._get(release, "platform")
        release_type = Formatter._get(release, "type")
        text += f"Платформа: {platform} | Тип: {release_type}\n"

        return text

    @staticmethod
    def format_staff_item(staff: Dict[str, Any]) -> str:
        """Format a staff member item for display.

        Reads ``id``, ``name``, ``original``, ``gender`` and ``role``; the
        role line is emitted only when a role is present.
        """
        text = Formatter._header(staff, "name")
        text += f"Пол: {Formatter._get(staff, 'gender')}\n"

        role = Formatter._get(staff, "role", "")
        if role:
            text += f"Роль: {role}\n"

        return text

    @staticmethod
    def format_producer_item(producer: Dict[str, Any]) -> str:
        """Format a producer item for display.

        Reads ``id``, ``name``, ``original`` and ``type``.
        """
        text = Formatter._header(producer, "name")

        producer_type = Formatter._get(producer, "type")
        if producer_type:
            text += f"Тип: {producer_type}\n"

        return text

    @staticmethod
    def format_tag_item(tag: Dict[str, Any]) -> str:
        """Format a tag item for display.

        Reads ``id``, ``name``, ``description`` and ``vns``.  The
        description is cut to 100 characters and rendered in italics.
        """
        text = f"**{Formatter._get(tag, 'name')}** (`{Formatter._get(tag, 'id')}`)\n"

        description = Formatter._get(tag, "description", "")
        if description:
            truncated = Formatter.truncate_text(description, 100)
            text += f"_{truncated}_\n"

        vns = Formatter._get(tag, "vns", 0)
        if vns:
            text += f"ВН: {vns}\n"

        return text

    @staticmethod
    def format_trait_item(trait: Dict[str, Any]) -> str:
        """Format a trait item for display.

        Reads ``id``, ``name``, ``description`` and ``chars``.  The
        description is cut to 100 characters and rendered in italics.
        """
        text = f"**{Formatter._get(trait, 'name')}** (`{Formatter._get(trait, 'id')}`)\n"

        description = Formatter._get(trait, "description", "")
        if description:
            truncated = Formatter.truncate_text(description, 100)
            text += f"_{truncated}_\n"

        chars = Formatter._get(trait, "chars", 0)
        if chars:
            text += f"Персонажей: {chars}\n"

        return text
|
||
|
||
|
||
class ErrorHandler:
    """Error handling utilities."""

    @staticmethod
    def format_error(error: Exception) -> str:
        """Turn an exception into a user-facing message.

        The exception's class name is looked up in a table of known error
        names.  The lookup walks the class hierarchy from most to least
        specific, so a subclass of a known error (for example a read timeout
        deriving from a ``TimeoutException`` base) gets the friendly message
        of its nearest known ancestor.

        Args:
            error: The exception to describe.

        Returns:
            The mapped message, or ``"❌ <str(error)>"`` when neither the
            class nor any of its bases is in the table.
        """
        error_msg = str(error)
        timeout_msg = "Запрос истек. Сервер слишком долго отвечает."

        # Map common errors to user-friendly messages.
        # NOTE(review): "HTTPStatusError", "ConnectError" and
        # "TimeoutException" look like httpx class names — confirm which
        # HTTP client the bot actually uses.
        error_messages = {
            "HTTPStatusError": "Ошибка сервера API. Попробуйте позже.",
            "ConnectError": "Ошибка подключения. Проверьте интернет соединение.",
            "Timeout": timeout_msg,
            "TimeoutException": timeout_msg,
            "TimeoutError": timeout_msg,
            "ValueError": f"Ошибка данных: {error_msg}",
            "JSONDecodeError": "Ошибка парсинга ответа сервера.",
        }

        # The MRO starts with the exception's own class, so an exact name
        # match always wins over a match on a base class.
        for klass in type(error).__mro__:
            message = error_messages.get(klass.__name__)
            if message is not None:
                return message

        return f"❌ {error_msg}"
|
||
|
||
|
||
class PaginationHelper:
    """Helper for pagination."""

    @staticmethod
    def get_page_info(page: int, results_count: int, total: Optional[int] = None) -> str:
        """Build the one-line pagination summary.

        Args:
            page: Page number to display.
            results_count: Number of results on this page; the segment is
                omitted when it is not positive.
            total: Overall number of matches.  ``None`` means "not known"
                and omits the segment; ``0`` is a real value and is shown.

        Returns:
            The segments joined with ``" | "``.
        """
        parts = [f"📄 Страница {page}"]

        # Compare against None so a genuine total of zero is still reported.
        if total is not None:
            parts.append(f"Всего: {total}")

        if results_count > 0:
            parts.append(f"Результатов: {results_count}")

        return " | ".join(parts)
|
||
|
||
|
||
class FieldValidator:
    """Validate and clean field lists."""

    @staticmethod
    def validate_fields(fields: List[str], allowed_fields: List[str]) -> List[str]:
        """Keep only the requested fields that appear in ``allowed_fields``.

        Order and duplicates of ``fields`` are preserved.

        Args:
            fields: Requested field names.
            allowed_fields: Field names that may be requested.

        Returns:
            The entries of ``fields`` that are allowed.
        """
        # Build the lookup set once instead of scanning the list per field.
        allowed = frozenset(allowed_fields)
        return [field for field in fields if field in allowed]

    @staticmethod
    def clean_field_list(fields_str: str) -> List[str]:
        """Split a comma-separated string into trimmed, non-empty names.

        Args:
            fields_str: Comma-separated field names; ``None`` or an empty
                string yields an empty list.

        Returns:
            The field names with surrounding whitespace removed and empty
            entries dropped.
        """
        if not fields_str:
            return []
        stripped = (part.strip() for part in fields_str.split(","))
        return [part for part in stripped if part]
|
||
|
||
|
||
class QueryBuilder:
    """Helper for building API queries."""

    @staticmethod
    def build_search_filter(search_term: str) -> List[str]:
        """Return the filter ``["search", "=", search_term]``."""
        return ["search", "=", search_term]

    @staticmethod
    def build_id_filter(vn_id: str) -> List[str]:
        """Return the filter ``["id", "=", vn_id]``."""
        return ["id", "=", vn_id]

    @staticmethod
    def build_tag_filter(tag_id: str, depth: int = 0) -> Dict[str, Any]:
        """Build a tag filter dictionary.

        Args:
            tag_id: Tag identifier stored under ``"tag"``.
            depth: Stored under ``"depth"`` only when greater than zero.

        Returns:
            ``{"tag": tag_id}``, plus ``"depth"`` when requested.
        """
        filter_dict: Dict[str, Any] = {"tag": tag_id}
        if depth > 0:
            filter_dict["depth"] = depth
        return filter_dict

    @staticmethod
    def build_complex_filter(*filters: List[Any]) -> List[Any]:
        """Combine simple filters into a single AND filter.

        Args:
            *filters: Filters to combine.

        Returns:
            ``[]`` when no filter is given, the filter itself when exactly
            one is given, otherwise ``["and", f1, f2, ...]``.
        """
        if not filters:
            # A bare ["and"] with no operands is not a meaningful filter.
            # NOTE(review): [] is assumed to be accepted by the API as
            # "no filtering" — confirm against the API's query format.
            return []
        if len(filters) == 1:
            return filters[0]
        return ["and", *filters]
|
||
|
||
|
||
class ImageHandler:
|
||
"""Handle image processing and sending"""
|
||
|
||
# VNDB Image CDN
|
||
VNDB_CDN = "https://t.vndb.org"
|
||
|
||
@staticmethod
|
||
def get_image_url(image_data: Dict[str, Any]) -> Optional[str]:
    """
    Extract image URL from image data

    Args:
        image_data: Image data from API; its "url" entry is read

    Returns:
        Full image URL, or None when image_data is not a dict or has no
        usable string under "url"
    """
    if not isinstance(image_data, dict):
        return None

    image_path = image_data.get("url")
    # A non-string value would fail on the prefix check below.
    if not image_path or not isinstance(image_path, str):
        return None

    # Absolute URLs are returned untouched.
    if image_path.startswith(("http://", "https://")):
        return image_path

    # Join with exactly one slash so a path without a leading "/" still
    # produces a valid URL.
    return f"{ImageHandler.VNDB_CDN}/{image_path.lstrip('/')}"
|
||
|
||
@staticmethod
|
||
def format_item_with_image(
    item_type: str,
    item: Dict[str, Any],
) -> tuple[str, Optional[str]]:
    """
    Format item with image information

    Args:
        item_type: Type of item; "vn", "character" and "release" are
            formatted, any other value yields empty text
        item: Item data

    Returns:
        Tuple of (text, image_url); image_url is None when the item has no
        "image" entry
    """

    def field(key: str, default: Any = "Unknown") -> Any:
        # Treat an explicit None like a missing key so it neither prints
        # as "None" nor reaches a numeric comparison.
        value = item.get(key)
        return default if value is None else value

    def header(name_key: str) -> str:
        # Title line shared by all item types, plus the optional original name.
        head = f"**{field(name_key)}** (`{field('id')}`)\n"
        original = field("original", "")
        if original:
            head += f"Оригинал: {original}\n"
        return head

    text = ""

    if item_type == "vn":
        text = header("title")
        text += f"Дата релиза: {field('released')}\n"
        rating = field("rating", 0)
        if rating > 0:
            votecount = field("votecount", 0)
            text += f"Рейтинг: {rating / 10:.1f}/10 ({votecount} голосов)\n"

    elif item_type == "character":
        text = header("name")
        text += f"Пол: {field('gender')}\n"

    elif item_type == "release":
        text = header("title")
        text += f"Дата: {field('released')}\n"
        text += f"Платформа: {field('platform')}\n"

    # Try to get image
    image_data = item.get("image")
    image_url = ImageHandler.get_image_url(image_data) if image_data else None

    return text, image_url
|
||
|
||
@staticmethod
|
||
async def send_item_with_photo(
|
||
update: Update,
|
||
item_type: str,
|
||
item: Dict[str, Any],
|
||
emoji: str = "📦",
|
||
) -> None:
|
||
"""
|
||
Send item with photo if available
|
||
|
||
Args:
|
||
update: Telegram update
|
||
item_type: Type of item
|
||
item: Item data
|
||
emoji: Emoji to use in caption
|
||
"""
|
||
text, image_url = ImageHandler.format_item_with_image(item_type, item)
|
||
|
||
if image_url:
|
||
try:
|
||
title = item.get("title") or item.get("name", "Item")
|
||
caption = f"{emoji} {title}"
|
||
|
||
await update.message.reply_photo(
|
||
photo=image_url,
|
||
caption=caption,
|
||
parse_mode="Markdown"
|
||
)
|
||
except Exception as e:
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
logger.warning(f"Could not send photo: {e}")
|
||
await update.message.reply_text(text, parse_mode="Markdown")
|