# Files
# mikopbxbot/bot/main.py
# 2026-06-26 23:02:51 +03:00
#
# 758 lines
# 27 KiB
# Python

import asyncio
import logging
import secrets
import string
from aiogram import Bot, Dispatcher, F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.exceptions import TelegramBadRequest
from config import (
TELEGRAM_BOT_TOKEN, ADMIN_TELEGRAM_IDS,
DEFAULT_SIP_SERVER, DEFAULT_SIP_PORT,
DEFAULT_SIP_DOMAIN, DEFAULT_SIP_TRANSPORT,
DEFAULT_SIP_OUTBOUND_PROXY, DEFAULT_SIP_STUN
)
from services.database import Database
from services.mikopbx_service import MikoPBXService
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Module-level singletons shared by every handler below.
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# FSM state/data go through MemoryStorage (in-process; presumably lost on
# restart — confirm this is acceptable for production).
dp = Dispatcher(storage=MemoryStorage())
db = Database()
mikopbx = MikoPBXService()
class RegistrationStates(StatesGroup):
    """FSM states of the self-service registration dialog."""

    # Set by start_registration; the next message is read as the extension number.
    waiting_for_number = State()
    # Set by process_number; the next message is read as the account name.
    waiting_for_username = State()
    # Set by process_username; covers both the password-choice buttons and a
    # typed custom password.
    waiting_for_custom_secret = State()
class AdminEditStates(StatesGroup):
    """FSM states of the administrator edit/link dialogs."""

    # Set by admin_start_change_number; next message is the new extension number.
    waiting_for_new_number = State()
    # Set by admin_start_change_secret; next message is the new password.
    waiting_for_new_secret = State()
    # Set by admin_start_link_existing; next message is the number to link.
    waiting_for_existing_number = State()
    # Set by admin_process_existing_number; next message is the target Telegram ID.
    waiting_for_telegram_id = State()
def is_admin(telegram_id: int) -> bool:
    """Tell whether *telegram_id* is contained in ``ADMIN_TELEGRAM_IDS``."""
    admin_ids = ADMIN_TELEGRAM_IDS
    return telegram_id in admin_ids
def generate_secret(length: int = 16) -> str:
    """Build a random secret made of *length* ASCII letters and digits.

    Each character is drawn independently with ``secrets.choice``.
    """
    pool = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(secrets.choice(pool))
    return ''.join(picked)
async def safe_edit_text(callback: CallbackQuery, text: str, reply_markup=None, parse_mode=None):
    """Edit the callback's message, then acknowledge the callback query.

    A ``TelegramBadRequest`` whose text contains "message is not modified"
    is swallowed. Any other ``TelegramBadRequest`` propagates, in which case
    the callback is not acknowledged by this function.
    """
    try:
        await callback.message.edit_text(text, reply_markup=reply_markup, parse_mode=parse_mode)
    except TelegramBadRequest as exc:
        unchanged = "message is not modified" in str(exc).lower()
        if not unchanged:
            raise
    await callback.answer()
# ============== KEYBOARDS ==============
def main_menu_keyboard(is_admin_user: bool = False):
    """Return the main-menu inline keyboard.

    The "Admin panel" row is appended only when *is_admin_user* is true.
    """
    register_row = [InlineKeyboardButton(text="Register new number", callback_data="register")]
    accounts_row = [InlineKeyboardButton(text="My accounts", callback_data="my_accounts")]
    layout = [register_row, accounts_row]
    if is_admin_user:
        layout += [[InlineKeyboardButton(text="Admin panel", callback_data="admin_menu")]]
    return InlineKeyboardMarkup(inline_keyboard=layout)
def admin_menu_keyboard():
    """Return the administrator-panel inline keyboard (one button per row)."""
    entries = (
        ("All registered accounts", "admin_list"),
        ("Link existing MikoPBX number", "admin_link_existing"),
        ("Back to main menu", "back_main"),
    )
    layout = [
        [InlineKeyboardButton(text=label, callback_data=action)]
        for label, action in entries
    ]
    return InlineKeyboardMarkup(inline_keyboard=layout)
def account_list_keyboard(accounts: list, prefix: str = "account"):
    """Return a keyboard with one button per account plus a "Back" button.

    Only the first 10 entries of *accounts* are shown. Each button's callback
    data is ``"{prefix}_{extension_number}"``; the label is the extension
    number, followed by ``(@username)`` when ``telegram_username`` is truthy.
    """
    def _label(acc):
        caption = f"{acc['extension_number']}"
        if acc.get('telegram_username'):
            caption += f" (@{acc['telegram_username']})"
        return caption

    layout = [
        [InlineKeyboardButton(
            text=_label(acc),
            callback_data=f"{prefix}_{acc['extension_number']}"
        )]
        for acc in accounts[:10]  # Limit to 10
    ]
    layout.append([InlineKeyboardButton(text="Back", callback_data="back_main")])
    return InlineKeyboardMarkup(inline_keyboard=layout)
# ============== HANDLERS ==============
@dp.message(Command("start"))
async def cmd_start(message: Message):
    """Handle /start: call ``db.init_db`` and ``db.add_user``, then show the main menu."""
    await db.init_db()
    sender = message.from_user
    await db.add_user(sender.id, sender.username, sender.full_name)
    menu = main_menu_keyboard(is_admin(sender.id))
    greeting = (
        "Welcome to public SIP telephony service.\n\n"
        "Here you can register your own SIP number and get connection parameters.\n\n"
        "Please select an option:"
    )
    await message.answer(greeting, reply_markup=menu)
@dp.callback_query(F.data == "register")
async def start_registration(callback: CallbackQuery, state: FSMContext):
    """Begin the registration dialog by asking for the desired extension number.

    The prompt states the rule that ``process_number`` enforces (exactly
    7 digits); the previous example "1001" could never pass that validation.
    """
    await safe_edit_text(
        callback,
        "Enter the desired extension number (exactly 7 digits, e.g. 1001001):\n"
        "Or send /cancel to abort."
    )
    await state.set_state(RegistrationStates.waiting_for_number)
@dp.message(RegistrationStates.waiting_for_number)
async def process_number(message: Message, state: FSMContext):
    """Validate the requested extension number and advance to the name step.

    The number must be exactly 7 digits, must not exist in the local database
    or in MikoPBX, and the sender must not already own an account. On success
    the number is stored in FSM data under ``desired_number``.
    """
    # message.text is None for non-text messages (photo, sticker, ...);
    # treat those as empty input instead of raising AttributeError.
    number = (message.text or "").strip()
    # This handler is registered before the generic /cancel handler, so while
    # the state is active it receives "/cancel" itself; honour it here.
    if number.startswith("/cancel"):
        await state.clear()
        await message.answer("Operation cancelled.")
        return
    # Strict validation: exactly 7 digits
    if not number.isdigit() or len(number) != 7:
        await message.answer("The number must be exactly 7 digits (e.g. 1001001).")
        return
    # Check if number already exists in our DB
    existing = await db.get_sip_account_by_number(number)
    if existing:
        await message.answer("This number is already taken. Please choose another one.")
        return
    # Check if number already exists in MikoPBX
    miko_existing = await mikopbx.get_employee_by_number(number)
    if miko_existing:
        await message.answer("This number is already occupied in MikoPBX. Please choose another one.")
        return
    # Check if this user already has an account (one user = one number)
    user_accounts = await db.get_user_sip_accounts(message.from_user.id)
    if user_accounts:
        await message.answer(
            "You already have a registered number.\n"
            "Each user can have only one number."
        )
        await state.clear()
        is_admin_user = is_admin(message.from_user.id)
        await message.answer("Main menu:", reply_markup=main_menu_keyboard(is_admin_user))
        return
    await state.update_data(desired_number=number)
    await message.answer("Enter your name (optional, for the account):")
    await state.set_state(RegistrationStates.waiting_for_username)
@dp.message(RegistrationStates.waiting_for_username)
async def process_username(message: Message, state: FSMContext):
    """Store the account name and ask how the password should be chosen.

    An empty or non-text message falls back to ``"User <number>"``. The state
    then moves to ``waiting_for_custom_secret``, which serves both the two
    inline buttons and a typed password.
    """
    # message.text is None for non-text messages; avoid AttributeError.
    typed = (message.text or "").strip()
    # Registered before the generic /cancel handler, so handle it here.
    if typed.startswith("/cancel"):
        await state.clear()
        await message.answer("Operation cancelled.")
        return
    data = await state.get_data()
    number = data["desired_number"]
    username = typed or f"User {number}"
    await state.update_data(username=username)
    keyboard = InlineKeyboardMarkup(inline_keyboard=[
        [InlineKeyboardButton(text="Generate random password", callback_data="gen_secret")],
        [InlineKeyboardButton(text="Enter my own password", callback_data="custom_secret")]
    ])
    await message.answer(
        "Would you like to use a generated password or enter your own?",
        reply_markup=keyboard
    )
    await state.set_state(RegistrationStates.waiting_for_custom_secret)
@dp.callback_query(F.data == "gen_secret", StateFilter(RegistrationStates.waiting_for_custom_secret))
async def use_generated_secret(callback: CallbackQuery, state: FSMContext):
    """Create the account with a freshly generated 12-character password."""
    stored = await state.get_data()
    number = stored["desired_number"]
    display_name = stored.get("username", f"User {number}")
    password = generate_secret(12)
    # Acknowledge the button press before the account creation call.
    await callback.answer()
    await create_account_and_reply(
        callback.message, state, number, display_name, password, callback.from_user.id
    )
@dp.callback_query(F.data == "custom_secret", StateFilter(RegistrationStates.waiting_for_custom_secret))
async def ask_for_custom_secret(callback: CallbackQuery, state: FSMContext):
    """Prompt for a user-chosen password; the FSM state is left unchanged."""
    prompt = "Please enter the password you want to use for this SIP account (min 6 characters):"
    await safe_edit_text(callback, prompt)
@dp.message(RegistrationStates.waiting_for_custom_secret)
async def process_custom_secret(message: Message, state: FSMContext):
    """Accept a typed password (at least 6 characters) and create the account."""
    # message.text is None for non-text messages; avoid AttributeError and let
    # the length check below reject the empty string.
    secret = (message.text or "").strip()
    # Registered before the generic /cancel handler, so handle it here.
    if secret.startswith("/cancel"):
        await state.clear()
        await message.answer("Operation cancelled.")
        return
    if len(secret) < 6:
        await message.answer("Password must be at least 6 characters long. Please try again.")
        return
    data = await state.get_data()
    number = data["desired_number"]
    username = data.get("username", f"User {number}")
    await create_account_and_reply(message, state, number, username, secret, message.from_user.id)
async def create_account_and_reply(message_obj, state, number: str, username: str, secret: str, telegram_id: int):
    """Create the extension in MikoPBX, persist it locally and report the result.

    Args:
        message_obj: Message used to send replies (``.answer`` is called on it).
        state: FSM context; cleared at the end regardless of the outcome.
        number: Extension number to create.
        username: Name stored with the account.
        secret: SIP password.
        telegram_id: Telegram ID of the account owner.

    If the owner has no local user row yet, one is created so the new
    extension is not left unrecorded; if that still fails the user is warned
    and the problem is logged.
    """
    await message_obj.answer("Creating your SIP account with the chosen password... Please wait.")
    # Create via MikoPBX with custom secret (API v3)
    result = await mikopbx.create_extension(
        number=number,
        secret=secret,
        username=username
    )
    if result.get("success"):
        # Save to our database
        user = await db.get_user(telegram_id)
        if not user:
            # The extension already exists in MikoPBX at this point; make sure
            # there is a user row to attach it to, then re-read it.
            await db.add_user(telegram_id)
            user = await db.get_user(telegram_id)
        if user:
            await db.add_sip_account(
                user_id=user["id"],
                extension_number=number,
                sip_secret=secret,
                username=username
            )
        else:
            logger.error(
                "Extension %s was created in MikoPBX but could not be saved "
                "locally for Telegram ID %s",
                number, telegram_id,
            )
        connection_info = (
            f"Your SIP account has been created!\n\n"
            f"Number: {number}\n"
            f"Secret (password): {secret}\n\n"
            f"--- Full connection parameters ---\n"
            f"Domain: {DEFAULT_SIP_DOMAIN}\n"
            f"Server / Outbound Proxy: {DEFAULT_SIP_SERVER}\n"
            f"Port: {DEFAULT_SIP_PORT}\n"
            f"Transport: {DEFAULT_SIP_TRANSPORT}\n"
            f"Username / Auth ID: {number}\n"
            f"Password: {secret}\n"
        )
        if DEFAULT_SIP_OUTBOUND_PROXY:
            connection_info += f"Outbound Proxy: {DEFAULT_SIP_OUTBOUND_PROXY}\n"
        if DEFAULT_SIP_STUN:
            connection_info += f"STUN Server: {DEFAULT_SIP_STUN}\n"
        connection_info += (
            f"\nRecommended clients: Zoiper, MicroSIP, Linphone, Bria\n"
            f"Use these credentials in your SIP client."
        )
        await message_obj.answer(connection_info)
        if not user:
            await message_obj.answer(
                "Warning: the account could not be saved in the bot database, "
                "so it will not appear under 'My accounts'.\n"
                "Please keep the parameters above and contact administrator."
            )
    else:
        error_msg = result.get("error", "Unknown error")
        await message_obj.answer(f"Failed to create account: {error_msg}\n\n"
                                 f"Please try again later or contact administrator.")
    await state.clear()
    # Show main menu again
    is_admin_user = is_admin(telegram_id)
    keyboard = main_menu_keyboard(is_admin_user)
    await message_obj.answer("What would you like to do next?", reply_markup=keyboard)
@dp.callback_query(F.data == "my_accounts")
async def show_my_accounts(callback: CallbackQuery):
    """List the caller's SIP accounts as buttons, or say that there are none."""
    owned = await db.get_user_sip_accounts(callback.from_user.id)
    if not owned:
        await safe_edit_text(
            callback,
            "You don't have any registered accounts yet.",
            reply_markup=main_menu_keyboard(is_admin(callback.from_user.id))
        )
        return
    layout = [
        [InlineKeyboardButton(
            text=f"📞 {acc['extension_number']}",
            callback_data=f"my_account_{acc['extension_number']}"
        )]
        for acc in owned
    ]
    layout.append([InlineKeyboardButton(text="Back", callback_data="back_main")])
    await safe_edit_text(
        callback,
        "Your SIP accounts. Select one to view details:",
        reply_markup=InlineKeyboardMarkup(inline_keyboard=layout)
    )
@dp.callback_query(F.data.startswith("my_account_"))
async def show_my_account_details(callback: CallbackQuery):
    """Show one of the caller's own accounts, including its password.

    The extension number comes from callback data, which must not be trusted:
    buttons can be stale and custom clients can send arbitrary data. The
    number is therefore checked against the caller's own accounts before any
    secret is displayed.
    """
    number = callback.data[len("my_account_"):]
    owned = await db.get_user_sip_accounts(callback.from_user.id) or []
    if not any(str(acc['extension_number']) == number for acc in owned):
        await callback.answer("Account not found.")
        return
    account = await db.get_sip_account_by_number(number)
    if not account:
        await callback.answer("Account not found.")
        return
    text = (
        f"Account: {account['extension_number']}\n"
        f"Password: {account['sip_secret'] or 'Not set'}\n"
        f"Created: {account['created_at']}\n\n"
        "What would you like to do?"
    )
    keyboard = InlineKeyboardMarkup(inline_keyboard=[
        [InlineKeyboardButton(text="Show connection parameters", callback_data=f"show_params_{number}")],
        [InlineKeyboardButton(text="Delete this account", callback_data=f"delete_my_account_{number}")],
        [InlineKeyboardButton(text="Back to my accounts", callback_data="my_accounts")]
    ])
    await safe_edit_text(callback, text, reply_markup=keyboard)
@dp.callback_query(F.data.startswith("show_params_"))
async def show_connection_parameters(callback: CallbackQuery):
    """Show full SIP connection parameters for one of the caller's own accounts.

    The number in callback data is verified against the caller's accounts
    before the password is revealed. An unset password is shown as
    "Not set" instead of the literal "None".
    """
    number = callback.data[len("show_params_"):]
    owned = await db.get_user_sip_accounts(callback.from_user.id) or []
    if not any(str(acc['extension_number']) == number for acc in owned):
        await callback.answer("Account not found.")
        return
    account = await db.get_sip_account_by_number(number)
    if not account:
        await callback.answer("Account not found.")
        return
    connection_info = (
        f"Connection parameters for {number}:\n\n"
        f"Domain: {DEFAULT_SIP_DOMAIN}\n"
        f"Server: {DEFAULT_SIP_SERVER}\n"
        f"Port: {DEFAULT_SIP_PORT}\n"
        f"Transport: {DEFAULT_SIP_TRANSPORT}\n"
        f"Username: {number}\n"
        f"Password: {account['sip_secret'] or 'Not set'}\n"
    )
    if DEFAULT_SIP_OUTBOUND_PROXY:
        connection_info += f"Outbound Proxy: {DEFAULT_SIP_OUTBOUND_PROXY}\n"
    if DEFAULT_SIP_STUN:
        connection_info += f"STUN: {DEFAULT_SIP_STUN}\n"
    connection_info += "\nRecommended clients: Zoiper, MicroSIP, Linphone"
    keyboard = InlineKeyboardMarkup(inline_keyboard=[
        [InlineKeyboardButton(text="Back", callback_data=f"my_account_{number}")]
    ])
    await safe_edit_text(callback, connection_info, reply_markup=keyboard)
@dp.callback_query(F.data.startswith("delete_my_account_"))
async def confirm_delete_my_account(callback: CallbackQuery):
    """Ask the caller to confirm deletion of one of their own extensions.

    The confirmation is offered only when the number from callback data is
    among the caller's accounts.
    """
    number = callback.data[len("delete_my_account_"):]
    owned = await db.get_user_sip_accounts(callback.from_user.id) or []
    if not any(str(acc['extension_number']) == number for acc in owned):
        await callback.answer("Account not found.")
        return
    keyboard = InlineKeyboardMarkup(inline_keyboard=[
        [InlineKeyboardButton(text="Yes, delete", callback_data=f"confirm_delete_my_{number}")],
        [InlineKeyboardButton(text="Cancel", callback_data=f"my_account_{number}")]
    ])
    await safe_edit_text(
        callback,
        f"Are you sure you want to delete extension {number}?\n"
        "This action cannot be undone.",
        reply_markup=keyboard
    )
@dp.callback_query(F.data.startswith("confirm_delete_my_"))
async def delete_my_account(callback: CallbackQuery):
    """Delete one of the caller's own extensions from MikoPBX and the database.

    Callback data is not trusted: the number must belong to the caller,
    otherwise nothing is deleted. The local row is removed only after
    MikoPBX reports success.
    """
    number = callback.data[len("confirm_delete_my_"):]
    # Ownership check: without it any user able to send this callback data
    # could delete somebody else's extension.
    owned = await db.get_user_sip_accounts(callback.from_user.id) or []
    if not any(str(acc['extension_number']) == number for acc in owned):
        await callback.answer("Account not found.")
        return
    # Delete from MikoPBX
    mikopbx_result = await mikopbx.delete_extension(number)
    if mikopbx_result.get("success"):
        # Delete from local DB only if MikoPBX deletion succeeded
        await db.delete_sip_account(number)
        await safe_edit_text(
            callback,
            f"Extension {number} has been deleted.",
            reply_markup=main_menu_keyboard(is_admin(callback.from_user.id))
        )
    else:
        error = mikopbx_result.get("error", "Unknown error")
        await safe_edit_text(
            callback,
            f"Failed to delete {number} in MikoPBX:\n{error}",
            reply_markup=main_menu_keyboard(is_admin(callback.from_user.id))
        )
# ============== ADMIN HANDLERS ==============
@dp.callback_query(F.data == "admin_menu")
async def show_admin_menu(callback: CallbackQuery):
    """Show the administrator panel; non-admins get "Access denied."."""
    if is_admin(callback.from_user.id):
        panel_text = (
            "Administrator panel\n\n"
            "Select an action:"
        )
        await safe_edit_text(callback, panel_text, reply_markup=admin_menu_keyboard())
    else:
        await callback.answer("Access denied.")
# ============== LINK EXISTING MIKOPBX ACCOUNT ==============
@dp.callback_query(F.data == "admin_link_existing")
async def admin_start_link_existing(callback: CallbackQuery, state: FSMContext):
    """Admin only: start the "link existing number" dialog by asking for the number."""
    if is_admin(callback.from_user.id):
        prompt = (
            "Enter the existing MikoPBX extension number (exactly 7 digits) "
            "that you want to link to a Telegram user:"
        )
        await safe_edit_text(callback, prompt)
        await state.set_state(AdminEditStates.waiting_for_existing_number)
    else:
        await callback.answer("Access denied.")
@dp.message(AdminEditStates.waiting_for_existing_number)
async def admin_process_existing_number(message: Message, state: FSMContext):
    """Admin only: validate the extension to link and ask for the Telegram ID.

    The number must be exactly 7 digits, must not already be linked locally,
    and must exist in MikoPBX (checked with ``get_employee_by_number``).
    Linking a number MikoPBX does not know would create a local account that
    cannot be used and cannot be deleted through the bot.
    """
    if not is_admin(message.from_user.id):
        await message.answer("Access denied.")
        await state.clear()
        return
    # message.text is None for non-text messages; avoid AttributeError.
    number = (message.text or "").strip()
    # Registered before the generic /cancel handler, so handle it here.
    if number.startswith("/cancel"):
        await state.clear()
        await message.answer("Operation cancelled.")
        return
    if not number.isdigit() or len(number) != 7:
        await message.answer("Number must be exactly 7 digits.")
        return
    # Check if already linked in our DB
    existing = await db.get_sip_account_by_number(number)
    if existing:
        await message.answer(
            f"Number {number} is already linked to Telegram ID {existing.get('telegram_id', 'unknown')}."
        )
        await state.clear()
        return
    # The dialog is about *existing* extensions: make sure MikoPBX has it.
    miko_existing = await mikopbx.get_employee_by_number(number)
    if not miko_existing:
        await message.answer(
            f"Number {number} was not found in MikoPBX. Only existing extensions can be linked."
        )
        return
    await state.update_data(existing_number=number)
    await message.answer(
        "Enter the Telegram user ID to link this number to (or forward a message from the user):"
    )
    await state.set_state(AdminEditStates.waiting_for_telegram_id)
@dp.message(AdminEditStates.waiting_for_telegram_id)
async def admin_process_telegram_id(message: Message, state: FSMContext):
    """Admin only: link the previously entered extension to a Telegram user.

    The target ID is taken from a forwarded message's original sender when
    available, otherwise parsed from the message text. The link is refused if
    that user already has a number. The account is stored without a password.
    """
    if not is_admin(message.from_user.id):
        await message.answer("Access denied.")
        await state.clear()
        return
    # The prompt offers forwarding a message from the user. ``forward_from``
    # is absent/None when the message is not a forward (NOTE(review): it is
    # presumably also None when the sender hides their account in forwards).
    forwarded_from = getattr(message, "forward_from", None)
    if forwarded_from is not None:
        telegram_id = forwarded_from.id
    else:
        # message.text is None for non-text messages; avoid AttributeError.
        raw_id = (message.text or "").strip()
        # Registered before the generic /cancel handler, so handle it here.
        if raw_id.startswith("/cancel"):
            await state.clear()
            await message.answer("Operation cancelled.")
            return
        try:
            telegram_id = int(raw_id)
        except ValueError:
            await message.answer("Please enter a valid numeric Telegram ID.")
            return
        # Zero and negative values cannot be user IDs.
        if telegram_id <= 0:
            await message.answer("Please enter a valid numeric Telegram ID.")
            return
    data = await state.get_data()
    number = data["existing_number"]
    # Check if this Telegram user already has an account
    user_accounts = await db.get_user_sip_accounts(telegram_id)
    if user_accounts:
        await message.answer(
            f"Telegram user {telegram_id} already has a registered number."
        )
        await state.clear()
        return
    # Add user to database if not exists
    user_id = await db.add_user(telegram_id)
    # Link the existing number (secret will be empty - admin can set it later)
    await db.add_sip_account(
        user_id=user_id,
        extension_number=number,
        sip_secret=None,  # Admin can set password later
        username=f"Linked user {telegram_id}"
    )
    await message.answer(
        f"Successfully linked existing MikoPBX number {number} to Telegram user {telegram_id}.\n\n"
        f"The user can now view their account via the bot.\n"
        f"You can set the password using 'Change password' in admin panel."
    )
    await state.clear()
    # Return to admin menu
    await message.answer(
        "Administrator panel",
        reply_markup=admin_menu_keyboard()
    )
@dp.callback_query(F.data == "admin_list")
async def admin_list_accounts(callback: CallbackQuery):
    """Admin only: show all registered accounts as buttons, or a "none yet" note."""
    if not is_admin(callback.from_user.id):
        await callback.answer("Access denied.")
        return
    everyone = await db.get_all_sip_accounts()
    if everyone:
        caption = "Registered accounts (click to view details):"
        markup = account_list_keyboard(everyone, prefix="admin_view")
    else:
        caption = "No accounts registered yet."
        markup = admin_menu_keyboard()
    await safe_edit_text(callback, caption, reply_markup=markup)
@dp.callback_query(F.data.startswith("admin_view_"))
async def admin_view_account(callback: CallbackQuery):
    """Admin only: show one account's details with edit/delete actions.

    Missing or None values (Telegram username, owner ID, account username)
    are rendered as "N/A" rather than the literal "None".
    """
    if not is_admin(callback.from_user.id):
        await callback.answer("Access denied.")
        return
    number = callback.data.replace("admin_view_", "")
    account = await db.get_sip_account_by_number(number)
    if not account:
        await callback.answer("Account not found.")
        return
    # Get owner info
    owner_telegram_id = account.get('telegram_id')
    owner = await db.get_user(owner_telegram_id) if owner_telegram_id else None
    if owner:
        # ``or`` (not a .get default) so a stored None also becomes "N/A".
        owner_info = f"Owner: @{owner.get('telegram_username') or 'N/A'} (ID: {owner['telegram_id']})\n"
    else:
        owner_info = f"Owner Telegram ID: {owner_telegram_id or 'N/A'}\n"
    text = (
        f"Account details:\n\n"
        f"Number: {account['extension_number']}\n"
        f"Password: {account['sip_secret'] or 'Not set'}\n"
        f"Username: {account.get('username') or 'N/A'}\n"
        f"{owner_info}"
        f"Created: {account['created_at']}\n\n"
        f"Actions:"
    )
    keyboard = InlineKeyboardMarkup(inline_keyboard=[
        [InlineKeyboardButton(text="Change number", callback_data=f"admin_change_number_{number}")],
        [InlineKeyboardButton(text="Change password", callback_data=f"admin_change_secret_{number}")],
        [InlineKeyboardButton(text="Delete account", callback_data=f"admin_delete_{number}")],
        [InlineKeyboardButton(text="Back to list", callback_data="admin_list")]
    ])
    await safe_edit_text(callback, text, reply_markup=keyboard)
@dp.callback_query(F.data.startswith("admin_change_number_"))
async def admin_start_change_number(callback: CallbackQuery, state: FSMContext):
    """Admin only: remember the extension being renumbered and ask for the new number."""
    if is_admin(callback.from_user.id):
        current = callback.data.replace("admin_change_number_", "")
        await state.update_data(old_number=current)
        await safe_edit_text(callback, f"Enter new number for extension {current}:")
        await state.set_state(AdminEditStates.waiting_for_new_number)
    else:
        await callback.answer("Access denied.")
@dp.message(AdminEditStates.waiting_for_new_number)
async def admin_process_new_number(message: Message, state: FSMContext):
    """Admin only: change an extension's number in MikoPBX and the local database.

    The local database is updated whether or not the MikoPBX call succeeds
    (the failure case is a deliberate fallback that tells the admin to fix
    MikoPBX by hand).

    NOTE(review): this dialog accepts any number of 3+ digits, while
    registration and linking require exactly 7 — confirm this is intended.
    """
    if not is_admin(message.from_user.id):
        await message.answer("Access denied.")
        await state.clear()
        return
    # message.text is None for non-text messages; avoid AttributeError.
    new_number = (message.text or "").strip()
    # Registered before the generic /cancel handler, so handle it here.
    if new_number.startswith("/cancel"):
        await state.clear()
        await message.answer("Operation cancelled.")
        return
    data = await state.get_data()
    old_number = data["old_number"]
    if not new_number.isdigit() or len(new_number) < 3:
        await message.answer("Please enter a valid number (at least 3 digits).")
        return
    # Check if new number already exists
    existing = await db.get_sip_account_by_number(new_number)
    if existing:
        await message.answer("This number is already taken.")
        return
    account = await db.get_sip_account_by_number(old_number)
    if not account:
        await message.answer("Account not found in local database.")
        await state.clear()
        return
    await message.answer("Changing number in MikoPBX and local database...")
    # Try to change number in MikoPBX
    mikopbx_result = await mikopbx.change_extension_number(old_number, new_number)
    # The local record is updated in both outcomes, so do it once here.
    await db.update_extension_number(old_number, new_number)
    if mikopbx_result.get("success"):
        await message.answer(
            f"Number successfully changed from {old_number} to {new_number} in both MikoPBX and database."
        )
    else:
        # Fallback: only the local DB changed; inform the admin.
        error = mikopbx_result.get("error", "Unknown error")
        await message.answer(
            f"Number changed only in local database.\n"
            f"MikoPBX sync failed: {error}\n\n"
            f"You may need to manually adjust the number in the MikoPBX web interface."
        )
    await state.clear()
    # Return to admin menu
    await message.answer(
        "Administrator panel",
        reply_markup=admin_menu_keyboard()
    )
@dp.callback_query(F.data.startswith("admin_delete_"))
async def admin_delete_account(callback: CallbackQuery):
    """Admin only: ask for confirmation before deleting an extension."""
    if not is_admin(callback.from_user.id):
        await callback.answer("Access denied.")
        return
    number = callback.data.replace("admin_delete_", "")
    confirm_button = InlineKeyboardButton(text="Yes, delete", callback_data=f"confirm_delete_{number}")
    cancel_button = InlineKeyboardButton(text="Cancel", callback_data="admin_list")
    question = (
        f"Are you sure you want to delete extension {number}?\n"
        f"This will remove it from both the database and MikoPBX."
    )
    await safe_edit_text(
        callback,
        question,
        reply_markup=InlineKeyboardMarkup(inline_keyboard=[[confirm_button], [cancel_button]])
    )
@dp.callback_query(F.data.startswith("confirm_delete_"))
async def admin_confirm_delete(callback: CallbackQuery):
    """Admin only: delete an extension from MikoPBX, then from the local database.

    The local row is removed only when MikoPBX reports success.
    """
    if not is_admin(callback.from_user.id):
        await callback.answer("Access denied.")
        return
    number = callback.data.replace("confirm_delete_", "")
    await safe_edit_text(callback, "Deleting account...")
    outcome = await mikopbx.delete_extension(number)
    if not outcome.get("success"):
        reason = outcome.get("error", "Unknown error")
        await safe_edit_text(
            callback,
            f"Failed to delete {number} in MikoPBX:\n{reason}",
            reply_markup=admin_menu_keyboard()
        )
        return
    # MikoPBX deletion succeeded, so the local record can go as well.
    await db.delete_sip_account(number)
    await safe_edit_text(
        callback,
        f"Extension {number} has been deleted.",
        reply_markup=admin_menu_keyboard()
    )
@dp.callback_query(F.data.startswith("admin_change_secret_"))
async def admin_start_change_secret(callback: CallbackQuery, state: FSMContext):
    """Admin only: remember the extension being edited and ask for its new password."""
    if is_admin(callback.from_user.id):
        target = callback.data.replace("admin_change_secret_", "")
        await state.update_data(edit_number=target)
        await safe_edit_text(callback, f"Enter new password (secret) for extension {target}:")
        await state.set_state(AdminEditStates.waiting_for_new_secret)
    else:
        await callback.answer("Access denied.")
@dp.message(AdminEditStates.waiting_for_new_secret)
async def admin_process_new_secret(message: Message, state: FSMContext):
    """Admin only: set a new password (at least 6 characters) for an extension.

    MikoPBX is updated first, then the local database regardless of the
    MikoPBX result; the reply tells the admin which of the two succeeded.
    """
    if not is_admin(message.from_user.id):
        await message.answer("Access denied.")
        await state.clear()
        return
    # message.text is None for non-text messages; avoid AttributeError and let
    # the length check below reject the empty string.
    new_secret = (message.text or "").strip()
    # Registered before the generic /cancel handler, so handle it here.
    if new_secret.startswith("/cancel"):
        await state.clear()
        await message.answer("Operation cancelled.")
        return
    if len(new_secret) < 6:
        await message.answer("Password must be at least 6 characters.")
        return
    data = await state.get_data()
    number = data["edit_number"]
    await message.answer("Updating password in MikoPBX and database...")
    # Update in MikoPBX
    mikopbx_result = await mikopbx.set_extension_secret(number, new_secret)
    # Update local database regardless
    db_success = await db.update_sip_secret(number, new_secret)
    if mikopbx_result.get("success") and db_success:
        await message.answer(f"Password for {number} successfully updated in both MikoPBX and database.")
    elif db_success:
        await message.answer(
            f"Password updated in local database.\n"
            f"MikoPBX sync failed: {mikopbx_result.get('error', 'Unknown error')}"
        )
    else:
        await message.answer("Failed to update password.")
    await state.clear()
    await message.answer(
        "Administrator panel",
        reply_markup=admin_menu_keyboard()
    )
@dp.callback_query(F.data == "back_main")
async def back_to_main(callback: CallbackQuery):
    """Replace the current message with the main menu."""
    menu = main_menu_keyboard(is_admin(callback.from_user.id))
    await safe_edit_text(callback, "Main menu:", reply_markup=menu)
@dp.message(Command("cancel"))
async def cancel_handler(message: Message, state: FSMContext):
    """Handle /cancel: clear any active FSM state and confirm.

    NOTE(review): this handler is registered after the state-bound message
    handlers in this module; confirm that /cancel actually reaches it while
    a state is active.
    """
    if await state.get_state() is not None:
        await state.clear()
    await message.answer("Operation cancelled.")
async def main():
    """Initialise the database, then run long polling with the module-level bot."""
    await db.init_db()
    logger.info("Starting MikoPBX Telegram Bot...")
    await dp.start_polling(bot)
# Script entry point: run the bot only when executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(main())