Files
ayako/bot.py

697 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from typing import Optional
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
ConversationHandler,
ContextTypes,
filters,
)
from vndb_client import VndbClient
from config import Config
from utils import Formatter, ErrorHandler, QueryBuilder
from detailed_handlers import get_detail_handlers
# Setup logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=Config.LOG_LEVEL
)
logger = logging.getLogger(__name__)
# States for conversation handlers
SEARCH_VN, SELECT_VN, VN_DETAILS = range(3)
SEARCH_CHARACTER, SELECT_CHARACTER = range(2)
SEARCH_RELEASE, SELECT_RELEASE = range(2)
SEARCH_STAFF, SELECT_STAFF = range(2)
# Global VNDB client
vndb_client = VndbClient(use_sandbox=Config.USE_SANDBOX)
class BotHandlers:
"""Telegram bot command and message handlers"""
@staticmethod
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Start command handler"""
welcome_text = """
**Добро пожаловать в VNDB Telegram Бот!**
Этот бот позволяет искать информацию о визуальных новеллах, персонажах, релизах и многом другом из базы данных VNDB.
**Доступные команды:**
/search - Поиск визуальных новелл
/char - Поиск персонажей
/release - Поиск релизов
/staff - Поиск сотрудников
/producer - Поиск продюсеров
/tag - Поиск тегов
/trait - Поиск черт характера
/quote - Поиск цитат
/stats - Статистика базы данных
/schema - Информация о схеме API
/help - Справка по командам
__Используйте /help для получения подробной информации__
__Также можете ознакомится с примерами команд по ссылке: https://git.kotac.ru/King-of-the-all-Cookies/ayako/src/branch/main/EXAMPLES.md__
"""
await update.message.reply_text(welcome_text, parse_mode="Markdown")
@staticmethod
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Help command handler"""
help_text = """
**Справка по командам VNDB Бота**
**Поиск информации:**
/search <название> - Поиск визуальных новелл по названию
/char <название> - Поиск персонажей по имени
/release <название> - Поиск релизов
/staff <название> - Поиск сотрудников (сценаристы, художники и т.д.)
/producer <название> - Поиск продюсеров
/tag - Список популярных тегов
/trait - Список черт характера
/quote <количество> - Получить случайные цитаты
**Подробный просмотр (с картинками):**
/vn_detail <ID> - Просмотр полной информации о ВН с обложкой
__Пример: /vn_detail v17__
/char_detail <ID> - Просмотр информации о персонаже с аватаром
__Пример: /char_detail c1__
/release_detail <ID> - Просмотр информации о релизе с картинкой
__Пример: /release_detail r1__
**Информация:**
/stats - Показать статистику базы данных VNDB
/schema - Получить информацию о доступных полях API
/authinfo - Информация об авторизации (если настроена)
**Функции пользователя (требуют токена):**
Чтобы использовать функции списка, установите токен в переменной окружения VNDB_TOKEN
**Примеры использования:**
/search Steins Gate
/char Okabe
/release Windows
/vn_detail v17
/char_detail c25
/stats
**Важно:**
- Бот работает в асинхронном режиме
- Результаты ограничены 10 элементами по умолчанию
- При поиске автоматически отправляются картинки (первые 3 результата)
- Для просмотра полной информации с картинкой используйте /vn_detail, /char_detail и т.д.
"""
await update.message.reply_text(help_text, parse_mode="Markdown")
@staticmethod
async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Get database statistics"""
try:
stats = await vndb_client.get_stats()
stats_text = f"""
**Статистика базы данных VNDB:**
Визуальные новеллы: {stats.get('vn', 0):,}
Персонажи: {stats.get('chars', 0):,}
Релизы: {stats.get('releases', 0):,}
Продюсеры: {stats.get('producers', 0):,}
Сотрудники: {stats.get('staff', 0):,}
Теги: {stats.get('tags', 0):,}
Черты характера: {stats.get('traits', 0):,}
"""
await update.message.reply_text(stats_text, parse_mode="Markdown")
except Exception as e:
logger.error(f"Error getting stats: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
@staticmethod
async def schema(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Get API schema information"""
try:
await update.message.reply_text(
"⏳ Загружаю информацию о схеме... (это может занять некоторое время)",
parse_mode="Markdown"
)
schema = await vndb_client.get_schema()
# Build schema info
schema_text = "**Информация о схеме VNDB API:**\n\n"
# Database types
if "db_types" in schema:
schema_text += "**Типы данных:**\n"
for db_type, info in list(schema["db_types"].items())[:5]:
schema_text += f"{db_type}\n"
schema_text += "\n"
# Search fields
if "fields" in schema:
schema_text += "**Доступные поля для запросов:**\n"
for field_type, fields in list(schema["fields"].items())[:3]:
schema_text += f"{field_type}\n"
schema_text += "\n"
schema_text += "**Для полного списка полей и типов посетите: https://api.vndb.org/kana**"
await update.message.reply_text(schema_text, parse_mode="Markdown")
except Exception as e:
logger.error(f"Error getting schema: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
@staticmethod
async def search_vn(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Search for visual novels"""
try:
args = " ".join(context.args) if context.args else ""
if not args:
await update.message.reply_text(
"Пожалуйста, укажите название для поиска\n"
"Пример: /search Steins Gate"
)
return ConversationHandler.END
await update.message.reply_text(f"Поиск визуальных новелл: **{args}**\nЗагрузка...", parse_mode="Markdown")
# Search for VN
filters = ["search", "=", args]
results = await vndb_client.query_vn(
filters=[filters],
fields=["title", "original", "released", "rating", "votecount", "image{url}"],
results=10
)
if not results.get("results"):
await update.message.reply_text("Ничего не найдено")
return ConversationHandler.END
# Format results
response_text = f"**Результаты поиска: {args}**\n\n"
for i, vn in enumerate(results["results"], 1):
vn_id = vn.get("id", "Unknown")
title = vn.get("title", "Unknown")
original = vn.get("original", "")
released = vn.get("released", "Unknown")
rating = vn.get("rating", 0)
votecount = vn.get("votecount", 0)
response_text += (
f"{i}. **{title}**\n"
f" ID: {vn_id}\n"
)
if original:
response_text += f" Оригинал: {original}\n"
response_text += (
f" Релиз: {released}\n"
f" Рейтинг: {rating/10:.1f}/10 ({votecount} голосов)\n\n"
)
response_text += f"\nВсего найдено: {len(results['results'])} результатов"
if results.get("more"):
response_text += " (есть еще результаты)"
await update.message.reply_text(response_text, parse_mode="Markdown")
# Send images if available
for vn in results["results"][:3]: # Send images for first 3 results
image = vn.get("image")
if image and isinstance(image, dict):
image_url = image.get("url")
if image_url:
try:
title = vn.get("title", "VN")
await update.message.reply_photo(
photo=f"https://t.vndb.org{image_url}",
caption=f"{title}",
parse_mode="Markdown"
)
except Exception as e:
logger.warning(f"Could not send image: {e}")
# Store results for detail view
context.user_data["vn_results"] = results["results"]
except Exception as e:
logger.error(f"Error searching VN: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
return ConversationHandler.END
@staticmethod
async def search_character(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Search for characters"""
try:
args = " ".join(context.args) if context.args else ""
if not args:
await update.message.reply_text(
"Пожалуйста, укажите имя персонажа\n"
"Пример: /char Okabe"
)
return ConversationHandler.END
await update.message.reply_text(f"Поиск персонажей: **{args}**\n⏳ Загрузка...", parse_mode="Markdown")
filters = ["search", "=", args]
results = await vndb_client.query_character(
filters=[filters],
fields=["name", "original", "gender", "vn", "image{url}"],
results=10
)
if not results.get("results"):
await update.message.reply_text("Ничего не найдено")
return ConversationHandler.END
response_text = f"**Результаты поиска персонажей: {args}**\n\n"
for i, char in enumerate(results["results"], 1):
char_id = char.get("id", "Unknown")
name = char.get("name", "Unknown")
original = char.get("original", "")
gender = char.get("gender", "Unknown")
vns = char.get("vn", [])
response_text += (
f"{i}. **{name}**\n"
f" ID: {char_id}\n"
)
if original:
response_text += f" Оригинал: {original}\n"
response_text += f" Пол: {gender}\n"
if vns:
response_text += f" Появляется в {len(vns)} VN\n\n"
else:
response_text += "\n"
await update.message.reply_text(response_text, parse_mode="Markdown")
# Send character images if available
for char in results["results"][:3]: # Send images for first 3 results
image = char.get("image")
if image and isinstance(image, dict):
image_url = image.get("url")
if image_url:
try:
name = char.get("name", "Character")
await update.message.reply_photo(
photo=f"https://t.vndb.org{image_url}",
caption=f"{name}",
parse_mode="Markdown"
)
except Exception as e:
logger.warning(f"Could not send character image: {e}")
context.user_data["char_results"] = results["results"]
except Exception as e:
logger.error(f"Error searching characters: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
return ConversationHandler.END
@staticmethod
async def search_release(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Search for releases"""
try:
args = " ".join(context.args) if context.args else ""
if not args:
await update.message.reply_text(
"Пожалуйста, укажите название для поиска\n"
"Пример: /release Windows"
)
return ConversationHandler.END
await update.message.reply_text(f"Поиск релизов: **{args}**\nЗагрузка...", parse_mode="Markdown")
filters = ["search", "=", args]
results = await vndb_client.query_release(
filters=[filters],
fields=["title", "original", "released", "platform", "type", "image{url}"],
results=10
)
if not results.get("results"):
await update.message.reply_text("Ничего не найдено")
return ConversationHandler.END
response_text = f"**Результаты поиска релизов: {args}**\n\n"
for i, release in enumerate(results["results"], 1):
release_id = release.get("id", "Unknown")
title = release.get("title", "Unknown")
original = release.get("original", "")
released = release.get("released", "Unknown")
platform = release.get("platform", "Unknown")
release_type = release.get("type", "Unknown")
response_text += (
f"{i}. **{title}**\n"
f" ID: {release_id}\n"
)
if original:
response_text += f" Оригинал: {original}\n"
response_text += (
f" Дата: {released}\n"
f" Платформа: {platform}\n"
f" Тип: {release_type}\n\n"
)
await update.message.reply_text(response_text, parse_mode="Markdown")
# Send release images if available
for release in results["results"][:3]: # Send images for first 3 results
image = release.get("image")
if image and isinstance(image, dict):
image_url = image.get("url")
if image_url:
try:
title = release.get("title", "Release")
await update.message.reply_photo(
photo=f"https://t.vndb.org{image_url}",
caption=f"{title}",
parse_mode="Markdown"
)
except Exception as e:
logger.warning(f"Could not send release image: {e}")
context.user_data["release_results"] = results["results"]
except Exception as e:
logger.error(f"Error searching releases: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
return ConversationHandler.END
@staticmethod
async def search_staff(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Search for staff members"""
try:
args = " ".join(context.args) if context.args else ""
if not args:
await update.message.reply_text(
"Пожалуйста, укажите имя\n"
"Пример: /staff Yoko"
)
return ConversationHandler.END
await update.message.reply_text(f"Поиск сотрудников: **{args}**\nЗагрузка...", parse_mode="Markdown")
filters = ["search", "=", args]
results = await vndb_client.query_staff(
filters=[filters],
fields=["name", "original", "gender", "role"],
results=10
)
if not results.get("results"):
await update.message.reply_text("Ничего не найдено")
return ConversationHandler.END
response_text = f"**Результаты поиска сотрудников: {args}**\n\n"
for i, staff in enumerate(results["results"], 1):
staff_id = staff.get("id", "Unknown")
name = staff.get("name", "Unknown")
original = staff.get("original", "")
gender = staff.get("gender", "Unknown")
response_text += (
f"{i}. **{name}**\n"
f" ID: {staff_id}\n"
)
if original:
response_text += f" Оригинал: {original}\n"
response_text += f" Пол: {gender}\n\n"
await update.message.reply_text(response_text, parse_mode="Markdown")
context.user_data["staff_results"] = results["results"]
except Exception as e:
logger.error(f"Error searching staff: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
return ConversationHandler.END
@staticmethod
async def search_producer(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Search for producers"""
try:
args = " ".join(context.args) if context.args else ""
if not args:
await update.message.reply_text(
"Пожалуйста, укажите название\n"
"Пример: /producer Key"
)
return ConversationHandler.END
await update.message.reply_text(f"Поиск продюсеров: **{args}**\n⏳ Загрузка...", parse_mode="Markdown")
filters = ["search", "=", args]
results = await vndb_client.query_producer(
filters=[filters],
fields=["name", "original", "type"],
results=10
)
if not results.get("results"):
await update.message.reply_text("Ничего не найдено")
return ConversationHandler.END
response_text = f"**Результаты поиска продюсеров: {args}**\n\n"
for i, producer in enumerate(results["results"], 1):
producer_id = producer.get("id", "Unknown")
name = producer.get("name", "Unknown")
original = producer.get("original", "")
producer_type = producer.get("type", "Unknown")
response_text += (
f"{i}. **{name}**\n"
f" ID: {producer_id}\n"
)
if original:
response_text += f" Оригинал: {original}\n"
response_text += f" Тип: {producer_type}\n\n"
await update.message.reply_text(response_text, parse_mode="Markdown")
except Exception as e:
logger.error(f"Error searching producers: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
return ConversationHandler.END
@staticmethod
async def list_tags(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""List popular tags"""
try:
await update.message.reply_text("Загружаю теги...", parse_mode="Markdown")
results = await vndb_client.query_tag(
fields=["name", "description"],
sort="vns",
reverse=True,
results=15
)
if not results.get("results"):
await update.message.reply_text("Ничего не найдено")
return
response_text = "**Популярные теги VNDB:**\n\n"
for i, tag in enumerate(results["results"], 1):
tag_id = tag.get("id", "Unknown")
name = tag.get("name", "Unknown")
description = tag.get("description", "")
response_text += f"{i}. **{name}** (`{tag_id}`)\n"
if description and len(description) < 50:
response_text += f" {description}\n"
response_text += "\n"
await update.message.reply_text(response_text, parse_mode="Markdown")
except Exception as e:
logger.error(f"Error listing tags: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
@staticmethod
async def list_traits(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""List character traits"""
try:
await update.message.reply_text("Загружаю черты характера...", parse_mode="Markdown")
results = await vndb_client.query_trait(
fields=["name", "description"],
sort="chars",
reverse=True,
results=15
)
if not results.get("results"):
await update.message.reply_text("Ничего не найдено")
return
response_text = "**Популярные черты характера:**\n\n"
for i, trait in enumerate(results["results"], 1):
trait_id = trait.get("id", "Unknown")
name = trait.get("name", "Unknown")
description = trait.get("description", "")
response_text += f"{i}. **{name}** (`{trait_id}`)\n"
if description and len(description) < 50:
response_text += f" {description}\n"
response_text += "\n"
await update.message.reply_text(response_text, parse_mode="Markdown")
except Exception as e:
logger.error(f"Error listing traits: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
@staticmethod
async def get_quote(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Get random quotes"""
try:
count = 1
if context.args and context.args[0].isdigit():
count = min(int(context.args[0]), 5) # Max 5 quotes
await update.message.reply_text(f"Загружаю {count} цитат...", parse_mode="Markdown")
results = await vndb_client.query_quote(
fields=["character", "quote"],
sort="id",
results=count
)
if not results.get("results"):
await update.message.reply_text("Ничего не найдено")
return
response_text = "**Случайные цитаты:**\n\n"
for quote in results["results"]:
quote_text = quote.get("quote", "")
character = quote.get("character", "Unknown")
if quote_text:
response_text += f"_{quote_text}_\n"
response_text += f"— **{character}**\n\n"
await update.message.reply_text(response_text, parse_mode="Markdown")
except Exception as e:
logger.error(f"Error getting quotes: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(f"{error_msg}")
@staticmethod
async def authinfo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Get authentication info"""
try:
token = Config.VNDB_TOKEN
if not token:
await update.message.reply_text(
"Токен VNDB не установлен\n\n"
"Чтобы использовать функции авторизации:\n"
"1. Посетите https://vndb.org/u/tokens\n"
"2. Создайте новый токен\n"
"3. Установите переменную окружения VNDB_TOKEN"
)
return
client_with_token = VndbClient(token=token, use_sandbox=Config.USE_SANDBOX)
auth_info = await client_with_token.get_authinfo()
response_text = f"""
**Информация об авторизации:**
ID: {auth_info.get('id', 'Unknown')}
Пользователь: {auth_info.get('username', 'Unknown')}
**Разрешения:**
"""
permissions = auth_info.get("permissions", [])
if "listread" in permissions:
response_text += "Чтение списка (listread)\n"
if "listwrite" in permissions:
response_text += "Запись в список (listwrite)\n"
if not permissions:
response_text += "Нет разрешений"
await update.message.reply_text(response_text, parse_mode="Markdown")
except Exception as e:
logger.error(f"Error getting authinfo: {e}")
error_msg = ErrorHandler.format_error(e)
await update.message.reply_text(error_msg)
def main() -> None:
"""Start the bot"""
# Validate configuration
try:
Config.validate()
except ValueError as e:
logger.error(f"Configuration error: {e}")
raise
logger.info(f"Starting bot with config: {Config.to_dict()}")
# Create application
application = Application.builder().token(Config.TELEGRAM_BOT_TOKEN).build()
# Add handlers
application.add_handler(CommandHandler("start", BotHandlers.start))
application.add_handler(CommandHandler("help", BotHandlers.help_command))
application.add_handler(CommandHandler("stats", BotHandlers.stats))
application.add_handler(CommandHandler("schema", BotHandlers.schema))
application.add_handler(CommandHandler("search", BotHandlers.search_vn))
application.add_handler(CommandHandler("char", BotHandlers.search_character))
application.add_handler(CommandHandler("release", BotHandlers.search_release))
application.add_handler(CommandHandler("staff", BotHandlers.search_staff))
application.add_handler(CommandHandler("producer", BotHandlers.search_producer))
application.add_handler(CommandHandler("tag", BotHandlers.list_tags))
application.add_handler(CommandHandler("trait", BotHandlers.list_traits))
application.add_handler(CommandHandler("quote", BotHandlers.get_quote))
application.add_handler(CommandHandler("authinfo", BotHandlers.authinfo))
# Add detailed handlers for viewing with images
for handler in get_detail_handlers():
application.add_handler(handler)
# Start the bot
logger.info("Starting bot...")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()