200 lines
8.7 KiB
Python
200 lines
8.7 KiB
Python
import html
import logging
import os
import random
import re

import httpx
from aiogram import Bot, Dispatcher, types
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import Command, CommandObject
|
|
|
|
# Runtime configuration is read from the environment; only the API URL has a default.
TOKEN = os.getenv("TELEGRAM_TOKEN")
API_URL = os.getenv("VNDB_API_URL", "https://api.vndb.org/kana")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Every outgoing message is parsed as HTML unless a call overrides parse_mode.
bot = Bot(token=TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher()
|
|
|
|
# --- UTILS ---
|
|
|
|
def clean_text(text: str, limit: int = 900) -> str:
    """Convert a raw description into text that is safe inside an HTML message.

    Bracketed markup tags are removed, double quotes become single quotes,
    the text is shortened to roughly ``limit`` characters on a word boundary,
    and finally ``&``, ``<`` and ``>`` are HTML-escaped.

    Args:
        text: Raw description; may be ``None`` or empty.
        limit: Maximum length (before escaping) kept ahead of the ``...`` suffix.

    Returns:
        The cleaned, escaped text, or ``"No description available."`` when
        there is nothing left to show.
    """
    placeholder = "No description available."
    if not text:
        return placeholder

    # Remove every bracketed tag: [b], [i], [url=...] and so on.
    text = re.sub(r'\[.*?\]', '', text).strip()
    if not text:
        # The description consisted of markup only.
        return placeholder

    text = text.replace('"', "'")
    if len(text) > limit:
        # Cut at the last space inside the limit so a word is not split.
        text = text[:limit].rsplit(' ', 1)[0] + "..."

    # Escape last: truncating escaped text could cut an entity such as
    # "&amp;" in half and produce invalid markup.
    return html.escape(text, quote=False)
|
|
|
|
def safe_list_to_str(data) -> str:
    """Render an API value as an upper-cased, comma-separated string.

    Falsy input yields ``"N/A"``. A list is reduced to the labels of its
    string and dict items; anything that is not a list is stringified and
    upper-cased as a whole.
    """
    if not data:
        return "N/A"
    if not isinstance(data, list):
        return str(data).upper()

    def _label(item):
        # Strings are used directly; dicts contribute their first truthy
        # human-readable field; every other item type is skipped.
        if isinstance(item, str):
            return item.upper()
        if isinstance(item, dict):
            for key in ('lang', 'name', 'title', 'label'):
                value = item.get(key)
                if value:
                    return str(value).upper()
        return None

    labels = [label for label in map(_label, data) if label is not None]
    if not labels:
        return "N/A"
    return ", ".join(labels)
|
|
|
|
async def fetch_vndb(endpoint: str, filters: list, fields: str, sort: str = "id", results: int = 10, reverse: bool = False):
    """POST a query to ``{API_URL}/{endpoint}`` and return its ``results`` list.

    Args:
        endpoint: API path segment appended to ``API_URL`` (e.g. ``"vn"``).
        filters: Filter expression sent as the ``filters`` field.
        fields: Comma-separated field selection sent as ``fields``.
        sort: Field name sent as ``sort``.
        results: Value sent as ``results`` (number of entries requested).
        reverse: When true, ``"reverse": true`` is added to the request so the
            API returns the sort in descending order. Omitted otherwise, which
            keeps the payload identical for existing callers.

    Returns:
        The ``results`` list from the JSON response (``[]`` if the key is
        absent), or ``None`` when the request fails, the status is not 200,
        or the body is not valid JSON.
    """
    payload = {"filters": filters, "fields": fields, "sort": sort, "results": results}
    if reverse:
        payload["reverse"] = True

    async with httpx.AsyncClient(timeout=15.0) as client:
        try:
            response = await client.post(f"{API_URL}/{endpoint}", json=payload)
            if response.status_code == 200:
                return response.json().get("results", [])
            logger.error("VNDB Error %s: %s", response.status_code, response.text)
            return None
        except (httpx.HTTPError, httpx.InvalidURL, ValueError):
            # Transport failures, a malformed API_URL, or a non-JSON body:
            # keep the best-effort contract (return None) but log the traceback.
            logger.exception("VNDB request to %r failed", endpoint)
            return None
|
|
|
|
async def send_result(message: types.Message, text: str, image_url: str = None):
    """Reply with ``text``, attaching ``image_url`` as a photo when given.

    If the text fits into a photo caption it is sent as the caption.
    Otherwise the photo is sent without a caption and the full text follows
    as a separate message, because slicing HTML text at the caption limit can
    cut through a tag or entity. If sending the photo fails, the error is
    logged and the text is sent on its own.

    Args:
        message: Incoming message to answer.
        text: Message body (HTML).
        image_url: Optional photo URL.
    """
    caption_limit = 1024  # Telegram's maximum caption length

    if image_url:
        caption = text if len(text) <= caption_limit else None
        try:
            await message.answer_photo(photo=image_url, caption=caption)
        except Exception:
            # Best effort: a bad image must not prevent the text reply.
            logger.exception("Sending photo %r failed; falling back to text", image_url)
        else:
            if caption is not None:
                return

    await message.answer(text)
|
|
|
|
def find_exact_match(results, query, attr):
    """Return the first item whose ``attr`` equals ``query``, ignoring case.

    Comparison is done on stripped, case-folded strings. Items whose ``attr``
    is missing, ``None`` or not a string are skipped instead of raising.

    Args:
        results: Iterable of dicts; ``None`` is treated as empty.
        query: Text to look for.
        attr: Key whose value is compared against ``query``.

    Returns:
        The matching item, or ``None`` when nothing matches.
    """
    wanted = (query or "").strip().casefold()
    for item in results or []:
        value = item.get(attr)
        if isinstance(value, str) and value.strip().casefold() == wanted:
            return item
    return None
|
|
|
|
# --- HANDLERS ---
|
|
|
|
@dp.message(Command("vn"))
async def handle_vn(message: types.Message, command: CommandObject):
    """Handle ``/vn <name/id>``: look up a visual novel and reply with a card.

    An argument of the form ``v<digits>`` is queried by id; anything else is
    sent as a search. When a search returns several results and
    ``find_exact_match`` finds none matching the title, a selection list is
    sent instead of a card.

    API strings are HTML-escaped before interpolation because messages are
    sent in HTML parse mode. The description is inserted exactly as
    ``clean_text`` returns it.
    """
    def esc(value) -> str:
        return html.escape(str(value), quote=False)

    query = command.args
    if not query:
        # Angle brackets must be entities: "<name/id>" is not a valid tag in HTML mode.
        return await message.answer("Usage: /vn &lt;name/id&gt;")

    is_id = query.startswith('v') and query[1:].isdigit()
    filt = ["id", "=", query] if is_id else ["search", "=", query]
    fields = (
        "id, title, alttitle, released, rating, votecount, description, "
        "languages, platforms, developers{name}, image{url}, "
        "relations{title, id, relation}"
    )

    res = await fetch_vndb("vn", filt, fields)
    if not res:
        return await message.answer("❌ Not found.")

    # Prefer an exact title match; otherwise let the user pick.
    target = res[0]
    if not is_id and len(res) > 1:
        exact = find_exact_match(res, query, 'title')
        if exact:
            target = exact
        else:
            out = [f"• {esc(i['title'])} — <code>{esc(i['id'])}</code>" for i in res[:10]]
            return await message.answer("🔍 <b>Select VN:</b>\n" + "\n".join(out))

    # "image" and "relations" may be present but null.
    img = (target.get('image') or {}).get('url')
    relations = (target.get('relations') or [])[:3]
    rel_str = "\n".join(
        f"• {esc(r['title'])} (<code>{esc(r['id'])}</code>) - {esc(r['relation'])}"
        for r in relations
    )
    rating = target['rating'] / 10 if target.get('rating') else 'N/A'

    text = (
        f"<b>TITLE:</b> {esc(target['title'])}\n"
        f"<b>ORIGINAL:</b> {esc(target.get('alttitle') or 'N/A')}\n"
        f"<b>DEVELOPER:</b> {esc(safe_list_to_str(target.get('developers')))}\n"
        f"<b>RATING:</b> {rating} ({target.get('votecount', 0)} votes)\n"
        f"<b>LANGS:</b> {esc(safe_list_to_str(target.get('languages')))}\n"
        f"<b>PLATFORMS:</b> {esc(safe_list_to_str(target.get('platforms')))}\n\n"
        f"<b>RELATIONS:</b>\n{rel_str or 'None'}\n\n"
        f"<b>DESC:</b>\n<i>{clean_text(target.get('description'), 450)}</i>\n\n"
        f"<b>VNDB:</b> https://vndb.org/{esc(target['id'])}"
    )
    await send_result(message, text, img)
|
|
|
|
@dp.message(Command("char"))
async def handle_char(message: types.Message, command: CommandObject):
    """Handle ``/char <name/id>``: look up a character and reply with a card.

    An argument of the form ``c<digits>`` is queried by id; anything else is
    sent as a search. When a search returns several results and
    ``find_exact_match`` finds none matching the name, a selection list is
    sent instead of a card.

    API strings are HTML-escaped before interpolation because messages are
    sent in HTML parse mode. The description is inserted exactly as
    ``clean_text`` returns it.
    """
    def esc(value) -> str:
        return html.escape(str(value), quote=False)

    query = command.args
    if not query:
        # Angle brackets must be entities: "<name/id>" is not a valid tag in HTML mode.
        return await message.answer("Usage: /char &lt;name/id&gt;")

    is_id = query.startswith('c') and query[1:].isdigit()
    filt = ["id", "=", query] if is_id else ["search", "=", query]
    fields = "id, name, original, description, gender, age, blood_type, image{url}"

    res = await fetch_vndb("character", filt, fields)
    if not res:
        return await message.answer("❌ Not found.")

    target = res[0]
    if not is_id and len(res) > 1:
        exact = find_exact_match(res, query, 'name')
        if exact:
            target = exact
        else:
            out = [f"• {esc(i['name'])} — <code>{esc(i['id'])}</code>" for i in res[:10]]
            return await message.answer("👤 <b>Select Character:</b>\n" + "\n".join(out))

    # "image" may be present but null.
    img = (target.get('image') or {}).get('url')

    # Gender may arrive as a list; only its first element is shown.
    raw_gender = target.get('gender')
    if isinstance(raw_gender, list):
        raw_gender = raw_gender[0] if raw_gender else None
    # Show "N/A" like the other fields instead of the literal "NONE".
    gender = str(raw_gender).upper() if raw_gender else 'N/A'

    text = (
        f"<b>NAME:</b> {esc(target['name'])}\n"
        f"<b>ORIGINAL:</b> {esc(target.get('original') or 'N/A')}\n"
        f"<b>GENDER:</b> {esc(gender)}\n"
        f"<b>AGE:</b> {esc(target.get('age') or 'N/A')} | <b>BLOOD:</b> {esc(target.get('blood_type') or 'N/A')}\n\n"
        f"<b>DESCRIPTION:</b>\n<i>{clean_text(target.get('description'), 800)}</i>\n\n"
        f"<b>VNDB:</b> https://vndb.org/{esc(target['id'])}"
    )
    await send_result(message, text, img)
|
|
|
|
@dp.message(Command("release"))
async def handle_release(message: types.Message, command: CommandObject):
    """Handle ``/release <name/id>``: look up a release and reply with a card.

    An argument of the form ``r<digits>`` is queried by id; anything else is
    sent as a search. When a search returns several results and
    ``find_exact_match`` finds none matching the title, a selection list is
    sent instead of a card.

    API strings are HTML-escaped before interpolation because messages are
    sent in HTML parse mode; link URLs are additionally quote-escaped since
    they are placed inside an ``href`` attribute.
    """
    def esc(value) -> str:
        return html.escape(str(value), quote=False)

    query = command.args
    if not query:
        # Angle brackets must be entities: "<name/id>" is not a valid tag in HTML mode.
        return await message.answer("Usage: /release &lt;name/id&gt;")

    is_id = query.startswith('r') and query[1:].isdigit()
    filt = ["id", "=", query] if is_id else ["search", "=", query]
    fields = "id, title, alttitle, released, languages{lang}, platforms, extlinks{url, label}, images{url}"

    res = await fetch_vndb("release", filt, fields)
    if not res:
        return await message.answer("❌ Not found.")

    target = res[0]
    if not is_id and len(res) > 1:
        exact = find_exact_match(res, query, 'title')
        if exact:
            target = exact
        else:
            out = [f"• {esc(i['title'])} — <code>{esc(i['id'])}</code>" for i in res[:10]]
            return await message.answer("💿 <b>Select Release:</b>\n" + "\n".join(out))

    # "images" and "extlinks" may be present but null.
    img_list = target.get('images') or []
    img = img_list[0].get('url') if img_list else None
    links = "\n".join(
        f"• <a href='{html.escape(str(link['url']), quote=True)}'>{esc(link['label'])}</a>"
        for link in (target.get('extlinks') or [])[:8]
    )

    text = (
        f"<b>RELEASE:</b> {esc(target['title'])}\n"
        f"<b>ORIGINAL:</b> {esc(target.get('alttitle') or 'N/A')}\n\n"
        f"<b>DATE:</b> {esc(target.get('released') or 'N/A')}\n"
        f"<b>LANGS:</b> {esc(safe_list_to_str(target.get('languages')))}\n"
        f"<b>PLATFORMS:</b> {esc(safe_list_to_str(target.get('platforms')))}\n\n"
        f"<b>LINKS / STORES:</b>\n{links or 'N/A'}\n\n"
        f"<b>VNDB:</b> https://vndb.org/{esc(target['id'])}"
    )
    await send_result(message, text, img)
|
|
|
|
@dp.message(Command("random"))
async def handle_random(message: types.Message):
    """Handle ``/random``: reply with one randomly chosen visual novel.

    Requests up to 100 entries with ``rating >= 80`` sorted by ``votecount``
    and picks one of them with ``random.choice``. Replies with an error
    message when the request fails or returns nothing.
    """
    # NOTE(review): the VNDB API sorts ascending unless the request sets
    # "reverse": true, so this pool is presumably the least-voted entries;
    # confirm that is the intended selection.
    res = await fetch_vndb(
        "vn",
        ["rating", ">=", 80],
        "id, title, image{url}, rating, description",
        sort="votecount",
        results=100,
    )
    if not res:
        return await message.answer("❌ API error")

    v = random.choice(res)
    title = html.escape(str(v['title']), quote=False)
    text = (
        f"🎲 <b>RANDOM VN</b>\n\n<b>{title}</b>\n"
        f"Rating: {v['rating']/10} ⭐\n\n"
        f"<i>{clean_text(v.get('description'), 400)}</i>\n\n"
        f"https://vndb.org/{v['id']}"
    )
    # "image" may be present but null, in which case .get() returns None
    # rather than the default.
    image = v.get('image') or {}
    await send_result(message, text, image.get('url'))
|
|
|
|
@dp.message(Command("top"))
async def handle_top(message: types.Message):
    """Handle ``/top``: reply with a numbered list of ten visual novels.

    Requests ten entries with ``votecount > 2000`` sorted by ``rating`` and
    lists each title with its rating divided by ten. Replies with an error
    message when the request fails or returns nothing.
    """
    # NOTE(review): the VNDB API sorts ascending unless the request sets
    # "reverse": true; confirm this query really returns the highest-rated
    # entries rather than the lowest-rated ones.
    res = await fetch_vndb("vn", ["votecount", ">", 2000], "id, title, rating", sort="rating", results=10)
    if not res:
        # fetch_vndb returns None on failure; iterating it would raise TypeError.
        return await message.answer("❌ API error")

    out = ["🏆 <b>TOP 10 VISUAL NOVELS</b>"]
    for i, v in enumerate(res, 1):
        title = html.escape(str(v['title']), quote=False)
        out.append(f"{i}. {title} — <b>{v['rating']/10}</b>")
    await message.answer("\n".join(out))
|
|
|
|
@dp.message(Command("start", "help", "search"))
async def cmd_start(message: types.Message):
    """Answer /start, /help and /search with the list of available commands."""
    banner = "🤖 <b>VNDB Professional</b>\n/vn, /char, /release, /random, /top"
    await message.answer(banner)
|
|
|
|
# Start polling for updates only when the file is executed as a script.
if __name__ == "__main__":
    dp.run_polling(bot)