"""Utility helpers for the VNDB Telegram bot.

The module groups small, stateless helpers into namespace classes:

* ``Formatter`` - turn API result dicts into Markdown snippets.
* ``ErrorHandler`` - map exceptions to user-facing messages.
* ``PaginationHelper`` - build the pagination footer.
* ``FieldValidator`` - parse and whitelist field lists.
* ``QueryBuilder`` - build filter structures for API queries.
* ``ImageHandler`` - resolve image URLs and send items with a photo.
"""

import json
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

if TYPE_CHECKING:
    # ``Update`` is only used as an annotation below, so the import is kept
    # out of the runtime path; the pure helpers stay importable on their own.
    from telegram import Update

_logger = logging.getLogger(__name__)


def _header_lines(label: Any, item_id: Any, original: Any) -> List[str]:
    """Return the bold title line plus the optional original-name line."""
    lines = [f"**{label}** (`{item_id}`)\n"]
    if original:
        lines.append(f"Оригинал: {original}\n")
    return lines


def _rating_line(item: Dict[str, Any]) -> str:
    """Return the rating line for *item*, or ``""`` when it has no rating.

    ``rating`` and ``votecount`` may be missing or explicitly ``None``
    (JSON null); both cases are treated as zero instead of raising on the
    ``> 0`` comparison.
    """
    rating = item.get("rating") or 0
    if rating <= 0:
        return ""
    votecount = item.get("votecount") or 0
    return f"Рейтинг: {rating/10:.1f}/10 ({votecount} голосов)\n"


class Formatter:
    """Format API response items as Markdown text."""

    @staticmethod
    def truncate_text(text: str, max_length: int = 200) -> str:
        """Shorten *text* to at most *max_length* characters.

        Text longer than the limit is cut and suffixed with ``"..."``.
        When the limit is too small to hold the ellipsis (3 or fewer
        characters) the text is hard-cut instead, so the result never
        exceeds the limit; a non-positive limit yields ``""``.
        """
        if len(text) <= max_length:
            return text
        if max_length <= 3:
            # A negative slice end would count from the right and could
            # produce a result longer than the input.
            return text[:max(max_length, 0)]
        return text[:max_length - 3] + "..."

    @staticmethod
    def format_vn_item(vn: Dict[str, Any]) -> str:
        """Format a visual novel: title, id, original title, date, rating."""
        lines = _header_lines(
            vn.get("title", "Unknown"),
            vn.get("id", "Unknown"),
            vn.get("original", ""),
        )
        lines.append(f"Дата: {vn.get('released', 'Unknown')}\n")
        lines.append(_rating_line(vn))
        return "".join(lines)

    @staticmethod
    def format_character_item(char: Dict[str, Any]) -> str:
        """Format a character: name, id, original name, gender, VN count."""
        lines = _header_lines(
            char.get("name", "Unknown"),
            char.get("id", "Unknown"),
            char.get("original", ""),
        )
        lines.append(f"Пол: {char.get('gender', 'Unknown')}\n")
        vns = char.get("vn", [])
        if vns:
            lines.append(f"Появляется в: {len(vns)} VN\n")
        return "".join(lines)

    @staticmethod
    def format_release_item(release: Dict[str, Any]) -> str:
        """Format a release: title, id, original title, date, platform, type."""
        lines = _header_lines(
            release.get("title", "Unknown"),
            release.get("id", "Unknown"),
            release.get("original", ""),
        )
        released = release.get("released", "Unknown")
        platform = release.get("platform", "Unknown")
        release_type = release.get("type", "Unknown")
        lines.append(f"Дата: {released}\n")
        lines.append(f"Платформа: {platform} | Тип: {release_type}\n")
        return "".join(lines)

    @staticmethod
    def format_staff_item(staff: Dict[str, Any]) -> str:
        """Format a staff member: name, id, original name, gender, role."""
        lines = _header_lines(
            staff.get("name", "Unknown"),
            staff.get("id", "Unknown"),
            staff.get("original", ""),
        )
        lines.append(f"Пол: {staff.get('gender', 'Unknown')}\n")
        role = staff.get("role", "")
        if role:
            lines.append(f"Роль: {role}\n")
        return "".join(lines)

    @staticmethod
    def format_producer_item(producer: Dict[str, Any]) -> str:
        """Format a producer: name, id, original name, producer type."""
        lines = _header_lines(
            producer.get("name", "Unknown"),
            producer.get("id", "Unknown"),
            producer.get("original", ""),
        )
        producer_type = producer.get("type", "Unknown")
        if producer_type:
            lines.append(f"Тип: {producer_type}\n")
        return "".join(lines)

    @staticmethod
    def format_tag_item(tag: Dict[str, Any]) -> str:
        """Format a tag: name, id, description cut to 100 chars, VN count."""
        lines = [f"**{tag.get('name', 'Unknown')}** (`{tag.get('id', 'Unknown')}`)\n"]
        description = tag.get("description", "")
        if description:
            truncated = Formatter.truncate_text(description, 100)
            lines.append(f"_{truncated}_\n")
        vns = tag.get("vns", 0)
        if vns:
            lines.append(f"ВН: {vns}\n")
        return "".join(lines)

    @staticmethod
    def format_trait_item(trait: Dict[str, Any]) -> str:
        """Format a trait: name, id, description cut to 100 chars, char count."""
        lines = [f"**{trait.get('name', 'Unknown')}** (`{trait.get('id', 'Unknown')}`)\n"]
        description = trait.get("description", "")
        if description:
            truncated = Formatter.truncate_text(description, 100)
            lines.append(f"_{truncated}_\n")
        chars = trait.get("chars", 0)
        if chars:
            lines.append(f"Персонажей: {chars}\n")
        return "".join(lines)


class ErrorHandler:
    """Error handling utilities."""

    @staticmethod
    def format_error(error: Exception) -> str:
        """Return a user-facing message for *error*.

        The exception's class hierarchy is walked from most to least
        specific and the first class whose name is in the table wins, so
        subclasses of a listed error get the same friendly text (and
        ``JSONDecodeError`` still takes precedence over its base
        ``ValueError``). Unlisted errors fall back to ``"❌ <message>"``.
        """
        error_msg = str(error)
        timeout_msg = "Запрос истек. Сервер слишком долго отвечает."
        error_messages = {
            "HTTPStatusError": "Ошибка сервера API. Попробуйте позже.",
            "ConnectError": "Ошибка подключения. Проверьте интернет соединение.",
            "Timeout": timeout_msg,
            # Other common names for timeout errors share the same text.
            "TimeoutException": timeout_msg,
            "TimeoutError": timeout_msg,
            "ValueError": f"Ошибка данных: {error_msg}",
            "JSONDecodeError": "Ошибка парсинга ответа сервера.",
        }
        for cls in type(error).__mro__:
            message = error_messages.get(cls.__name__)
            if message is not None:
                return message
        return f"❌ {error_msg}"


class PaginationHelper:
    """Helper for pagination."""

    @staticmethod
    def get_page_info(page: int, results_count: int, total: Optional[int] = None) -> str:
        """Build the pagination footer.

        The page number is always shown; the total is added when truthy and
        the result count when it is positive.
        """
        parts = [f"📄 Страница {page}"]
        if total:
            parts.append(f"Всего: {total}")
        if results_count > 0:
            parts.append(f"Результатов: {results_count}")
        return " | ".join(parts)


class FieldValidator:
    """Validate and clean field lists."""

    @staticmethod
    def validate_fields(fields: List[str], allowed_fields: List[str]) -> List[str]:
        """Return the entries of *fields* that occur in *allowed_fields*, in order."""
        return [field for field in fields if field in allowed_fields]

    @staticmethod
    def clean_field_list(fields_str: str) -> List[str]:
        """Split a comma-separated string into stripped, non-empty field names."""
        stripped = (part.strip() for part in fields_str.split(","))
        return [part for part in stripped if part]


class QueryBuilder:
    """Helper for building API query filters."""

    @staticmethod
    def build_search_filter(search_term: str) -> List[str]:
        """Return the filter ``["search", "=", search_term]``."""
        return ["search", "=", search_term]

    @staticmethod
    def build_id_filter(vn_id: str) -> List[str]:
        """Return the filter ``["id", "=", vn_id]``."""
        return ["id", "=", vn_id]

    @staticmethod
    def build_tag_filter(tag_id: str, depth: int = 0) -> Dict[str, Any]:
        """Return ``{"tag": tag_id}``, plus ``"depth"`` when *depth* is positive."""
        filter_dict: Dict[str, Any] = {"tag": tag_id}
        if depth > 0:
            filter_dict["depth"] = depth
        return filter_dict

    @staticmethod
    def build_complex_filter(*filters: List[str]) -> List[Any]:
        """Combine simple filters into one ``["and", ...]`` filter.

        A single filter is returned unchanged (the same list object).
        Calling with no filters yields the bare ``["and"]``.
        """
        if len(filters) == 1:
            return filters[0]
        return ["and", *filters]


class ImageHandler:
    """Handle image URL resolution and sending items with photos."""

    # VNDB image CDN; relative image paths are resolved against it.
    VNDB_CDN = "https://t.vndb.org"

    @staticmethod
    def get_image_url(image_data: Dict[str, Any]) -> Optional[str]:
        """Extract the image URL from *image_data*.

        Args:
            image_data: Image dict from the API; its ``"url"`` key is read.

        Returns:
            The URL unchanged when it already starts with ``"http"``, the
            path joined onto ``VNDB_CDN`` otherwise, or ``None`` when
            *image_data* is empty, not a dict, or has no URL.
        """
        if not image_data or not isinstance(image_data, dict):
            return None

        image_path = image_data.get("url")
        if not image_path:
            return None

        if image_path.startswith("http"):
            return image_path
        # Join with exactly one slash so paths given without a leading "/"
        # are not glued onto the host name.
        return f"{ImageHandler.VNDB_CDN}/{image_path.lstrip('/')}"

    @staticmethod
    def format_item_with_image(
        item_type: str,
        item: Dict[str, Any],
    ) -> Tuple[str, Optional[str]]:
        """Format *item* and resolve its image URL.

        Args:
            item_type: ``"vn"``, ``"character"`` or ``"release"``; any other
                value produces empty text.
            item: Item data; its ``"image"`` entry, if any, supplies the URL.

        Returns:
            Tuple of ``(text, image_url)``; ``image_url`` is ``None`` when
            the item has no usable image.
        """
        lines: List[str] = []

        if item_type == "vn":
            lines = _header_lines(
                item.get("title", "Unknown"),
                item.get("id", "Unknown"),
                item.get("original", ""),
            )
            lines.append(f"Дата релиза: {item.get('released', 'Unknown')}\n")
            lines.append(_rating_line(item))
        elif item_type == "character":
            lines = _header_lines(
                item.get("name", "Unknown"),
                item.get("id", "Unknown"),
                item.get("original", ""),
            )
            lines.append(f"Пол: {item.get('gender', 'Unknown')}\n")
        elif item_type == "release":
            lines = _header_lines(
                item.get("title", "Unknown"),
                item.get("id", "Unknown"),
                item.get("original", ""),
            )
            lines.append(f"Дата: {item.get('released', 'Unknown')}\n")
            lines.append(f"Платформа: {item.get('platform', 'Unknown')}\n")

        image_url = None
        image_data = item.get("image")
        if image_data:
            image_url = ImageHandler.get_image_url(image_data)

        return "".join(lines), image_url

    @staticmethod
    async def send_item_with_photo(
        update: "Update",
        item_type: str,
        item: Dict[str, Any],
        emoji: str = "📦",
    ) -> None:
        """Send *item* as a photo (when it has one) followed by its text.

        Sending the photo is best-effort: any failure is logged as a warning
        and the text message is still sent.

        Args:
            update: Telegram update whose ``message`` is replied to.
            item_type: Type of item, passed to ``format_item_with_image``.
            item: Item data.
            emoji: Emoji placed before the title in the photo caption.
        """
        text, image_url = ImageHandler.format_item_with_image(item_type, item)

        if image_url:
            try:
                title = item.get("title") or item.get("name", "Item")
                caption = f"{emoji} {title}"
                await update.message.reply_photo(
                    photo=image_url,
                    caption=caption,
                    parse_mode="Markdown",
                )
            except Exception as exc:
                # Deliberate catch-all: a failed photo must not block the text.
                _logger.warning("Could not send photo: %s", exc)

        await update.message.reply_text(text, parse_mode="Markdown")