# Listing metadata: 226 lines, 9.9 KiB, Python.
import html
import logging
import os
import random
import re

import httpx
from aiogram import Bot, Dispatcher, types
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import Command, CommandObject
|
||
|
||
# Configuration: the bot token and the VNDB API base URL come from the environment.
TOKEN = os.environ.get("TELEGRAM_TOKEN")
API_URL = os.environ.get("VNDB_API_URL", "https://api.vndb.org/kana")

# Root logging at INFO level, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# HTML is configured as the bot's default parse mode.
bot = Bot(
    token=TOKEN,
    default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
dp = Dispatcher()
|
||
|
||
# --- HELPER FUNCTIONS ---
|
||
|
||
def clean_text(text: str, limit: int = 800) -> str:
    """Turn a VNDB description into a short, HTML-safe snippet.

    Args:
        text: Raw description; ``None`` and the empty string are accepted.
        limit: Maximum number of characters kept before an ellipsis is added.

    Returns:
        The description with bracketed markup removed, double quotes replaced
        by single quotes, cut at the last space within ``limit`` and with
        ``&``, ``<`` and ``>`` escaped. A placeholder sentence is returned
        when nothing readable remains.
    """
    placeholder = "No description available."
    if not text:
        return placeholder

    # Drop bracketed markup such as [b] or [url=...]; text between tags stays.
    text = re.sub(r'\[.*?\]', '', text).replace('"', "'").strip()
    if not text:
        # The description consisted of markup only.
        return placeholder

    if len(text) > limit:
        text = text[:limit].rsplit(' ', 1)[0] + "..."

    # Escape after truncating so a cut can never land inside an entity.
    return html.escape(text, quote=False)
|
||
|
||
def safe_list_to_str(data) -> str:
    """Render a VNDB list field as an upper-cased, comma-separated string.

    Strings are used as they are. For dicts the first non-empty value among
    the keys ``lang``, ``name``, ``title`` and ``label`` is used. Items of any
    other type are skipped. A falsy input, or a list that yields no labels,
    gives ``"N/A"``; a non-list input is stringified and upper-cased.
    """
    if not data:
        return "N/A"
    if not isinstance(data, list):
        return str(data).upper()

    labels = []
    for entry in data:
        if isinstance(entry, str):
            labels.append(entry.upper())
            continue
        if not isinstance(entry, dict):
            continue
        # "lang" covers release languages, "name" covers developers.
        for key in ("lang", "name", "title", "label"):
            value = entry.get(key)
            if value:
                labels.append(str(value).upper())
                break

    if not labels:
        return "N/A"
    return ", ".join(labels)
|
||
|
||
async def fetch_vndb(
    endpoint: str,
    filters: list,
    fields: str,
    sort: str = "id",
    results: int = 10,
    reverse: bool = None,
):
    """Run one POST query against a VNDB API endpoint.

    Args:
        endpoint: Path segment appended to ``API_URL`` (for example ``"vn"``).
        filters: Filter expression sent as the ``filters`` key.
        fields: Comma-separated field selection sent as the ``fields`` key.
        sort: Field name sent as the ``sort`` key.
        results: Page size sent as the ``results`` key.
        reverse: ``True`` requests descending order, ``False`` ascending.
            ``None`` (the default) picks descending for the ranking keys
            ``"rating"`` and ``"votecount"`` and ascending for any other key.

    Returns:
        The ``results`` list from the JSON response (an empty list when the
        key is absent), or ``None`` when the request raises or the status
        code is not 200.
    """
    if reverse is None:
        # VNDB orders ascending unless "reverse" is true. Sorting by a ranking
        # key is only useful highest-first, so that is the default for them.
        reverse = sort in ("rating", "votecount")

    payload = {
        "filters": filters,
        "fields": fields,
        "sort": sort,
        "reverse": reverse,
        "results": results,
    }
    async with httpx.AsyncClient(timeout=20.0) as client:
        try:
            response = await client.post(f"{API_URL}/{endpoint}", json=payload)
            if response.status_code == 200:
                return response.json().get("results", [])
            logger.error("VNDB Error %s: %s", response.status_code, response.text)
            return None
        except Exception as exc:
            # Deliberate best-effort boundary: any transport or decoding
            # problem is logged with its traceback and reported as None.
            logger.exception("Request failed: %s", exc)
            return None
|
||
|
||
async def send_result(message: types.Message, text: str, image_url: str = None):
    """Reply with ``text``, attaching ``image_url`` as a photo when one is given.

    With an image and a text of at most 1024 characters, the text is sent as
    the photo caption. A longer text is sent as its own message after a
    caption-less photo. If sending the photo raises, a warning is logged and
    the text is still sent as a plain message.

    Args:
        message: Incoming message to answer.
        text: Message body.
        image_url: Optional photo URL.
    """
    caption_limit = 1024  # Telegram's maximum caption length.

    if image_url:
        try:
            if len(text) <= caption_limit:
                await message.answer_photo(photo=image_url, caption=text)
                return
            # Slicing markup at a fixed offset can cut a tag or entity in half
            # and drops the tail of the card, so the full text goes separately.
            await message.answer_photo(photo=image_url)
        except Exception as exc:
            logger.warning("Photo send failed: %s", exc)

    await message.answer(text)
|
||
|
||
def find_exact_match(results, query, attr):
    """Return the first result whose ``attr`` value equals ``query``.

    The comparison ignores case and surrounding whitespace.

    Args:
        results: List of result dicts; may be empty or ``None``.
        query: Text to look for; may be empty or ``None``.
        attr: Key of the field to compare.

    Returns:
        The matching dict, or ``None`` when there is no match or either
        ``results`` or ``query`` is empty.
    """
    if not results or not query:
        return None

    wanted = query.strip().casefold()
    for item in results:
        value = item.get(attr)
        # A key that is present with a null value must not break the scan.
        if isinstance(value, str) and value.strip().casefold() == wanted:
            return item
    return None
|
||
|
||
# --- COMMAND HANDLERS ---
|
||
|
||
@dp.message(Command("vn"))
async def handle_vn(message: types.Message, command: CommandObject):
    """Handle ``/vn <title or id>`` by replying with a visual novel card.

    An argument of the form ``v<digits>`` is looked up by id; anything else is
    sent as a search. When a search returns several entries and
    ``find_exact_match`` finds none for the query, the reply is a list of
    candidate ids instead of a card.
    """
    query = (command.args or "").strip()
    if not query:
        return await message.answer("Example: /vn Steins Gate")

    is_id = query.startswith('v') and query[1:].isdigit()
    filt = ["id", "=", query] if is_id else ["search", "=", query]
    fields = "id, title, alttitle, released, rating, votecount, description, languages, platforms, developers{name}, image{url}, relations{title, id, relation}"

    res = await fetch_vndb("vn", filt, fields)
    if res is None:
        # fetch_vndb returns None on failure; that is not the same as no match.
        return await message.answer("❌ API error")
    if not res:
        return await message.answer("❌ VN not found.")

    target = res[0]
    if not is_id and len(res) > 1:
        exact = find_exact_match(res, query, 'title')
        if exact:
            target = exact
        else:
            out = [
                f"• {html.escape(str(i['title']), quote=False)} — <code>{i['id']}</code>"
                for i in res[:10]
            ]
            return await message.answer("🔍 <b>Select VN by ID:</b>\n" + "\n".join(out))

    # API-provided strings are escaped because the message is sent as HTML.
    title = html.escape(str(target['title']), quote=False)
    original = html.escape(str(target.get('alttitle') or 'N/A'), quote=False)
    released = html.escape(str(target.get('released') or 'N/A'), quote=False)
    developers = html.escape(safe_list_to_str(target.get('developers')), quote=False)
    languages = html.escape(safe_list_to_str(target.get('languages')), quote=False)
    platforms = html.escape(safe_list_to_str(target.get('platforms')), quote=False)

    # Fixed precision avoids float artifacts such as 9.009999999999998.
    rating = target.get('rating')
    rating_str = f"{rating / 10:.2f}" if rating else 'N/A'

    img = (target.get('image') or {}).get('url')
    rel_str = "\n".join(
        f"• {html.escape(str(r['title']), quote=False)} (<code>{r['id']}</code>)"
        f" - {html.escape(str(r['relation']), quote=False)}"
        for r in (target.get('relations') or [])[:3]
    )

    text = (
        f"<b>TITLE:</b> {title}\n"
        f"<b>ORIGINAL:</b> {original}\n"
        f"<b>RELEASED:</b> {released}\n"
        f"<b>DEVELOPER:</b> {developers}\n"
        f"<b>RATING:</b> {rating_str} ({target.get('votecount', 0)} votes)\n"
        f"<b>LANGS:</b> {languages}\n"
        f"<b>PLATFORMS:</b> {platforms}\n\n"
        f"<b>RELATIONS:</b>\n{rel_str or 'None'}\n\n"
        f"<b>DESC:</b>\n<i>{clean_text(target.get('description'), 400)}</i>\n\n"
        f"<b>VNDB:</b> https://vndb.org/{target['id']}"
    )
    await send_result(message, text, img)
|
||
|
||
@dp.message(Command("char"))
async def handle_char(message: types.Message, command: CommandObject):
    """Handle ``/char <name or id>`` by replying with a character card.

    An argument of the form ``c<digits>`` is looked up by id; anything else is
    sent as a search. When a search returns several entries and
    ``find_exact_match`` finds none for the query, the reply is a list of
    candidate ids instead of a card.
    """
    query = (command.args or "").strip()
    if not query:
        return await message.answer("Example: /char Kurisu")

    is_id = query.startswith('c') and query[1:].isdigit()
    filt = ["id", "=", query] if is_id else ["search", "=", query]
    fields = "id, name, original, description, gender, age, blood_type, image{url}"

    res = await fetch_vndb("character", filt, fields)
    if res is None:
        # fetch_vndb returns None on failure; that is not the same as no match.
        return await message.answer("❌ API error")
    if not res:
        return await message.answer("❌ Character not found.")

    target = res[0]
    if not is_id and len(res) > 1:
        exact = find_exact_match(res, query, 'name')
        if exact:
            target = exact
        else:
            out = [
                f"• {html.escape(str(i['name']), quote=False)} — <code>{i['id']}</code>"
                for i in res[:10]
            ]
            return await message.answer("👤 <b>Select Character by ID:</b>\n" + "\n".join(out))

    img = (target.get('image') or {}).get('url')

    # The gender field may arrive as a list; only its first element is shown.
    raw_gender = target.get('gender')
    if isinstance(raw_gender, list):
        raw_gender = raw_gender[0] if raw_gender else None
    # A missing value is shown as N/A rather than the literal "NONE".
    gender = html.escape(str(raw_gender).upper(), quote=False) if raw_gender else 'N/A'

    # API-provided strings are escaped because the message is sent as HTML.
    name = html.escape(str(target['name']), quote=False)
    original = html.escape(str(target.get('original') or 'N/A'), quote=False)
    age = html.escape(str(target.get('age') or 'N/A'), quote=False)
    blood = html.escape(str(target.get('blood_type') or 'N/A'), quote=False)

    text = (
        f"<b>NAME:</b> {name}\n"
        f"<b>ORIGINAL:</b> {original}\n"
        f"<b>GENDER:</b> {gender}\n"
        f"<b>AGE:</b> {age} | <b>BLOOD:</b> {blood}\n\n"
        f"<b>DESCRIPTION:</b>\n<i>{clean_text(target.get('description'), 800)}</i>\n\n"
        f"<b>VNDB:</b> https://vndb.org/{target['id']}"
    )
    await send_result(message, text, img)
|
||
|
||
@dp.message(Command("release"))
async def handle_release(message: types.Message, command: CommandObject):
    """Handle ``/release <title or id>`` by replying with a release card.

    An argument of the form ``r<digits>`` is looked up by id; anything else is
    sent as a search. When a search returns several entries and
    ``find_exact_match`` finds none for the query, the reply is a list of
    candidate ids instead of a card. Up to eight external links are listed.
    """
    query = (command.args or "").strip()
    if not query:
        return await message.answer("Example: /release Steins Gate")

    is_id = query.startswith('r') and query[1:].isdigit()
    filt = ["id", "=", query] if is_id else ["search", "=", query]
    fields = "id, title, alttitle, released, languages{lang}, platforms, extlinks{url, label}, images{url}"

    res = await fetch_vndb("release", filt, fields)
    if res is None:
        # fetch_vndb returns None on failure; that is not the same as no match.
        return await message.answer("❌ API error")
    if not res:
        return await message.answer("❌ Release not found.")

    target = res[0]
    if not is_id and len(res) > 1:
        exact = find_exact_match(res, query, 'title')
        if exact:
            target = exact
        else:
            out = [
                f"• {html.escape(str(i['title']), quote=False)} — <code>{i['id']}</code>"
                for i in res[:10]
            ]
            return await message.answer("💿 <b>Select Release by ID:</b>\n" + "\n".join(out))

    img_list = target.get('images') or []
    img = img_list[0].get('url') if img_list else None

    link_lines = []
    for link in (target.get('extlinks') or [])[:8]:
        # The URL sits inside an attribute, so quotes are escaped there too.
        url = html.escape(str(link['url']), quote=True)
        label = html.escape(str(link['label']), quote=False)
        link_lines.append(f'• <a href="{url}">{label}</a>')
    links = "\n".join(link_lines)

    # API-provided strings are escaped because the message is sent as HTML.
    title = html.escape(str(target['title']), quote=False)
    original = html.escape(str(target.get('alttitle') or 'N/A'), quote=False)
    released = html.escape(str(target.get('released') or 'N/A'), quote=False)
    languages = html.escape(safe_list_to_str(target.get('languages')), quote=False)
    platforms = html.escape(safe_list_to_str(target.get('platforms')), quote=False)

    text = (
        f"<b>RELEASE:</b> {title}\n"
        f"<b>ORIGINAL:</b> {original}\n\n"
        f"<b>DATE:</b> {released}\n"
        f"<b>LANGS:</b> {languages}\n"
        f"<b>PLATFORMS:</b> {platforms}\n\n"
        f"<b>STORES:</b>\n{links or 'N/A'}\n\n"
        f"<b>VNDB:</b> https://vndb.org/{target['id']}"
    )
    await send_result(message, text, img)
|
||
|
||
@dp.message(Command("random"))
async def handle_random(message: types.Message):
    """Handle ``/random`` by replying with one randomly chosen, highly rated VN.

    The pick is drawn from up to 100 entries whose rating is at least 80
    (shown to the user divided by ten), queried with sort key ``votecount``.

    NOTE(review): which 100 entries come back depends on the sort direction
    fetch_vndb requests for "votecount" - confirm it is descending if the pick
    is meant to come from the most-voted titles.
    """
    res = await fetch_vndb(
        "vn",
        ["rating", ">=", 80],
        "id, title, image{url}, rating, description",
        sort="votecount",
        results=100,
    )
    if not res:
        return await message.answer("❌ API error")

    v = random.choice(res)

    # The title is escaped because the message is sent as HTML.
    title = html.escape(str(v['title']), quote=False)
    # Fixed precision avoids float artifacts such as 9.009999999999998.
    rating = v.get('rating')
    rating_str = f"{rating / 10:.2f}" if rating else "N/A"

    text = (
        f"🎲 <b>RANDOM VN PICK</b>\n\n"
        f"<b>{title}</b>\n"
        f"Rating: {rating_str} ⭐\n\n"
        f"<i>{clean_text(v.get('description'), 400)}</i>\n\n"
        f"https://vndb.org/{v['id']}"
    )
    # "image" can be present with a null value, so .get() with a default is not enough.
    await send_result(message, text, (v.get('image') or {}).get('url'))
|
||
|
||
@dp.message(Command("top"))
async def handle_top(message: types.Message):
    """Handle ``/top`` by listing ten VNs with more than 2500 votes.

    Entries are queried with sort key ``rating`` and numbered in the order
    they are returned; ratings are shown divided by ten.

    NOTE(review): the list only reads as a "top 10" if fetch_vndb requests
    descending order for "rating" - confirm.
    """
    res = await fetch_vndb(
        "vn",
        ["votecount", ">", 2500],
        "id, title, rating",
        sort="rating",
        results=10,
    )
    if not res:
        return await message.answer("❌ API error")

    out = ["🏆 <b>TOP 10 VISUAL NOVELS</b>\n"]
    for position, v in enumerate(res, 1):
        # The title is escaped because the message is sent as HTML.
        title = html.escape(str(v['title']), quote=False)
        # Fixed precision avoids float artifacts such as 9.009999999999998.
        rating = v.get('rating')
        rating_str = f"{rating / 10:.2f}" if rating else "N/A"
        out.append(f"{position}. {title} — <b>{rating_str}</b>")

    await message.answer("\n".join(out))
|
||
|
||
@dp.message(Command("start", "help", "search"))
async def cmd_start(message: types.Message):
    """Reply to /start, /help and /search with the list of supported commands."""
    help_lines = [
        "🤖 <b>VNDB Professional Bot</b>",
        "",
        "<b>Commands:</b>",
        "• /vn [name/id] - VN Details",
        "• /char [name/id] - Characters",
        "• /release [name/id] - Release Info",
        "• /random - Random high-rated VN",
        "• /top - Top 10 by Rating",
    ]
    await message.answer("\n".join(help_lines))
|
||
|
||
# Script entry point: start polling with the module-level dispatcher and bot.
if __name__ == "__main__":
    dp.run_polling(bot)