166 lines
7.0 KiB
Python
166 lines
7.0 KiB
Python
import html
import logging
import os
import re

import httpx
from aiogram import Bot, Dispatcher, types
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import Command, CommandObject
|
||
|
||
# Runtime configuration is read from the environment; the VNDB endpoint falls
# back to the public Kana API when VNDB_API_URL is not set.
TOKEN = os.environ.get("TELEGRAM_TOKEN")
API_URL = os.environ.get("VNDB_API_URL", "https://api.vndb.org/kana")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Messages sent through this bot are parsed as HTML by default.
bot = Bot(
    token=TOKEN,
    default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
dp = Dispatcher()
|
||
|
||
# --- UTILS ---
|
||
|
||
def clean_text(text: str) -> str:
    """Turn a VNDB description into a short string safe for an HTML message.

    ``[url=...]label[/url]`` is reduced to ``label``, ``[b]``/``[i]`` markers
    are removed, the result is cut to 600 characters (with ``...`` appended
    when it was longer) and finally ``<``, ``>`` and ``&`` are HTML-escaped.

    Args:
        text: Raw description; ``None`` or an empty string is accepted.

    Returns:
        The cleaned, escaped description, or ``"No description available."``
        when *text* is empty.
    """
    if not text:
        return "No description available."

    # Keep only the visible label of BBCode links.
    text = re.sub(r'\[url=?.*?\](.*?)\[/url\]', r'\1', text)
    # Drop bold/italic markers (opening and closing).
    text = re.sub(r'\[/?[bi]\]', '', text)

    if len(text) > 600:
        text = text[:600] + "..."

    # Escape last so that truncation can never split an entity such as "&amp;".
    # Callers embed the result in HTML markup, where a raw "<" or "&" would
    # otherwise be interpreted as markup.
    return html.escape(text, quote=False)
|
||
|
||
def safe_list_to_str(data) -> str:
    """Render an API value as an upper-cased, comma-separated string.

    Falsy input yields ``"N/A"``. A non-list value is stringified and
    upper-cased. For a list, string items are upper-cased as they are and
    dict items contribute their ``'lang'`` value (or ``'name'`` when ``'lang'``
    is missing or falsy); any other item is ignored. A list that contributes
    nothing also yields ``"N/A"``.
    """
    if not data:
        return "N/A"
    if not isinstance(data, list):
        return str(data).upper()

    parts = []
    for item in data:
        if isinstance(item, str):
            parts.append(item.upper())
            continue
        if not isinstance(item, dict):
            continue
        label = item.get('lang') or item.get('name')
        if label:
            parts.append(str(label).upper())

    if not parts:
        return "N/A"
    return ", ".join(parts)
|
||
|
||
async def send_result(message: types.Message, text: str, image_url: str = None):
    """Reply with a photo plus description, or with plain text if there is no photo.

    When *image_url* is given the photo is sent first. *text* is used as the
    photo caption only if it fits into 1024 characters; otherwise the photo is
    sent without a caption and *text* follows as a separate message. If sending
    the photo fails, a warning is logged and *text* is sent on its own.

    Args:
        message: Incoming message to reply to.
        text: Message body (HTML markup, as produced by the handlers).
        image_url: Optional URL of the image to attach.

    Raises:
        Whatever ``message.answer`` raises; a failed text send is not retried.
    """
    if image_url:
        # Slicing the HTML text to the caption limit could cut a tag or entity
        # in half (making the markup unparsable) and would silently drop the
        # end of the message, so the caption is only used when nothing has to
        # be cut off.
        caption = text if len(text) <= 1024 else None
        try:
            await message.answer_photo(photo=image_url, caption=caption)
        except Exception as e:
            # Best effort: a broken image must not prevent the text reply.
            logger.warning(f"Failed to send photo: {e}. Falling back to text.")
        else:
            if caption is not None:
                return

    await message.answer(text)
|
||
|
||
async def fetch_vndb(endpoint: str, filters: list, fields: str):
    """POST a query to ``{API_URL}/{endpoint}`` and return its ``results``.

    Args:
        endpoint: Path segment appended to ``API_URL``.
        filters: Filter expression sent as the ``filters`` key of the JSON body.
        fields: Field list sent as the ``fields`` key of the JSON body.

    Returns:
        The ``results`` value of the JSON response (an empty list when the key
        is absent) for an HTTP 200 reply; ``None`` when the status is anything
        else or when any exception is raised while requesting or decoding.
        Both failure cases are logged at error level.
    """
    body = {"filters": filters, "fields": fields}
    url = f"{API_URL}/{endpoint}"

    async with httpx.AsyncClient(timeout=15.0) as client:
        try:
            response = await client.post(url, json=body)
            if response.status_code != 200:
                logger.error(f"VNDB Error {response.status_code}: {response.text}")
                return None
            return response.json().get("results", [])
        except Exception as e:
            logger.error(f"Connection error: {e}")
            return None
|
||
|
||
# --- HANDLERS ---
|
||
|
||
@dp.message(Command("vn"))
async def handle_vn(message: types.Message, command: CommandObject):
    """Handle ``/vn <name/id>``: look up a visual novel and reply with its card.

    An argument of the form ``v<digits>`` is queried by id, anything else is
    used as a search term. A search with several hits replies with a list of
    up to ten titles and their ids; otherwise the first result is rendered and
    passed to ``send_result`` together with its image URL, if any.
    """
    if not command.args:
        # The angle brackets must be escaped: the bot sends HTML, where a
        # literal "<name/id>" would be read as an (unsupported) tag.
        return await message.answer("Usage: /vn &lt;name/id&gt;")

    query = command.args
    is_id = query.startswith('v') and query[1:].isdigit()
    filt = ["id", "=", query] if is_id else ["search", "=", query]

    # image{url} is the correct sub-field syntax for the Kana API.
    fields = "id, title, alttitle, released, rating, votecount, description, languages, platforms, developers{name}, image{url}"
    res = await fetch_vndb("vn", filt, fields)

    if not res:
        return await message.answer("❌ Not found.")
    if len(res) > 1 and not is_id:
        out = [
            f"• {html.escape(str(i['title']), quote=False)} — <code>{i['id']}</code>"
            for i in res[:10]
        ]
        return await message.answer("🔍 <b>Select VN by ID:</b>\n" + "\n".join(out))

    v = res[0]
    img = (v.get('image') or {}).get('url')

    # Round so that float division artefacts never reach the user.
    rating = round(v['rating'] / 10, 2) if v.get('rating') else 'N/A'

    # Values coming from the API are escaped before being placed in the markup.
    title = html.escape(str(v['title']), quote=False)
    original = html.escape(str(v.get('alttitle') or 'N/A'), quote=False)
    developers = html.escape(safe_list_to_str(v.get('developers')), quote=False)
    languages = html.escape(safe_list_to_str(v.get('languages')), quote=False)
    platforms = html.escape(safe_list_to_str(v.get('platforms')), quote=False)

    text = (
        f"<b>TITLE:</b> {title}\n"
        f"<b>ORIGINAL:</b> {original}\n"
        f"<b>DEVELOPER:</b> {developers}\n"
        f"<b>RATING:</b> {rating} ({v.get('votecount', 0)} votes)\n"
        f"<b>LANGUAGES:</b> {languages}\n"
        f"<b>PLATFORMS:</b> {platforms}\n\n"
        f"<b>DESCRIPTION:</b>\n<i>{clean_text(v.get('description'))}</i>\n\n"
        f"<b>VNDB:</b> https://vndb.org/{v['id']}"
    )
    await send_result(message, text, img)
|
||
|
||
@dp.message(Command("char"))
async def handle_char(message: types.Message, command: CommandObject):
    """Handle ``/char <name/id>``: look up a character and reply with its card.

    An argument of the form ``c<digits>`` is queried by id, anything else is
    used as a search term. A search with several hits replies with a list of
    up to ten names and their ids; otherwise the first result is rendered and
    passed to ``send_result`` together with its image URL, if any.
    """
    if not command.args:
        # The angle brackets must be escaped: the bot sends HTML, where a
        # literal "<name/id>" would be read as an (unsupported) tag.
        return await message.answer("Usage: /char &lt;name/id&gt;")

    query = command.args
    is_id = query.startswith('c') and query[1:].isdigit()
    filt = ["id", "=", query] if is_id else ["search", "=", query]

    fields = "id, name, original, description, gender, age, blood_type, image{url}"
    res = await fetch_vndb("character", filt, fields)

    if not res:
        return await message.answer("❌ Not found.")
    if len(res) > 1 and not is_id:
        out = [
            f"• {html.escape(str(i['name']), quote=False)} — <code>{i['id']}</code>"
            for i in res[:10]
        ]
        return await message.answer("👤 <b>Select Character by ID:</b>\n" + "\n".join(out))

    c = res[0]
    img = (c.get('image') or {}).get('url')

    # The gender may arrive as a list; only its first entry is shown.
    raw_gender = c.get('gender')
    if isinstance(raw_gender, list):
        raw_gender = raw_gender[0] if raw_gender else None
    # A missing gender is shown as "N/A" like every other missing field,
    # rather than as the stringified None.
    gender = html.escape(str(raw_gender).upper(), quote=False) if raw_gender else 'N/A'

    # Values coming from the API are escaped before being placed in the markup.
    name = html.escape(str(c['name']), quote=False)
    original = html.escape(str(c.get('original') or 'N/A'), quote=False)
    age = html.escape(str(c.get('age') or 'N/A'), quote=False)

    text = (
        f"<b>NAME:</b> {name}\n"
        f"<b>ORIGINAL:</b> {original}\n"
        f"<b>GENDER:</b> {gender}\n"
        f"<b>AGE:</b> {age}\n\n"
        f"<b>DESCRIPTION:</b>\n<i>{clean_text(c.get('description'))}</i>\n\n"
        f"<b>VNDB:</b> https://vndb.org/{c['id']}"
    )
    await send_result(message, text, img)
|
||
|
||
@dp.message(Command("release"))
async def handle_rel(message: types.Message, command: CommandObject):
    """Handle ``/release <name/id>``: look up a release and reply with its card.

    An argument of the form ``r<digits>`` is queried by id, anything else is
    used as a search term. A search with several hits replies with a list of
    up to ten titles and their ids; otherwise the first result is rendered
    (including up to five external links) and passed to ``send_result``
    together with the URL of its first image, if any.
    """
    if not command.args:
        # The angle brackets must be escaped: the bot sends HTML, where a
        # literal "<name/id>" would be read as an (unsupported) tag.
        return await message.answer("Usage: /release &lt;name/id&gt;")

    query = command.args
    is_id = query.startswith('r') and query[1:].isdigit()
    filt = ["id", "=", query] if is_id else ["search", "=", query]

    # images{url} for releases (this is a list).
    fields = "id, title, alttitle, released, languages{lang}, platforms, extlinks{url, label}, images{url}"
    res = await fetch_vndb("release", filt, fields)

    if not res:
        return await message.answer("❌ Not found.")
    if len(res) > 1 and not is_id:
        out = [
            f"• {html.escape(str(i['title']), quote=False)} — <code>{i['id']}</code>"
            for i in res[:10]
        ]
        return await message.answer("💿 <b>Select Release by ID:</b>\n" + "\n".join(out))

    r = res[0]

    # Take the first image from the images list; "or []" also covers a key
    # that is present but null.
    img_list = r.get('images') or []
    img = img_list[0].get('url') if img_list else None

    # The URL sits inside a single-quoted attribute, so quotes are escaped too.
    links = "\n".join(
        f"• <a href='{html.escape(str(link['url']))}'>"
        f"{html.escape(str(link['label']), quote=False)}</a>"
        for link in (r.get('extlinks') or [])[:5]
    )

    # Values coming from the API are escaped before being placed in the markup.
    title = html.escape(str(r['title']), quote=False)
    original = html.escape(str(r.get('alttitle') or 'N/A'), quote=False)
    released = html.escape(str(r.get('released') or 'N/A'), quote=False)
    languages = html.escape(safe_list_to_str(r.get('languages')), quote=False)
    platforms = html.escape(safe_list_to_str(r.get('platforms')), quote=False)

    text = (
        f"<b>RELEASE:</b> {title}\n"
        f"<b>ORIGINAL:</b> {original}\n"
        f"<b>DATE:</b> {released}\n"
        f"<b>LANGUAGES:</b> {languages}\n"
        f"<b>PLATFORMS:</b> {platforms}\n\n"
        f"<b>STORES:</b>\n{links or 'N/A'}\n\n"
        f"<b>VNDB:</b> https://vndb.org/{r['id']}"
    )
    await send_result(message, text, img)
|
||
|
||
@dp.message(Command("start", "help"))
async def cmd_start(message: types.Message):
    """Reply to ``/start`` and ``/help`` with the list of available commands."""
    help_text = (
        "🤖 <b>VNDB Bot with Posters</b>\n\n"
        "/vn [name/id]\n"
        "/char [name/id]\n"
        "/release [name/id]"
    )
    await message.answer(help_text)
|
||
|
||
# Start long polling only when the file is executed as a script.
if __name__ == "__main__":
    dp.run_polling(bot)