import asyncio import logging import secrets import string from aiogram import Bot, Dispatcher, F from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton from aiogram.filters import Command, StateFilter from aiogram.fsm.context import FSMContext from aiogram.fsm.state import State, StatesGroup from aiogram.fsm.storage.memory import MemoryStorage from config import ( TELEGRAM_BOT_TOKEN, ADMIN_TELEGRAM_IDS, DEFAULT_SIP_SERVER, DEFAULT_SIP_PORT, DEFAULT_SIP_DOMAIN, DEFAULT_SIP_TRANSPORT, DEFAULT_SIP_OUTBOUND_PROXY, DEFAULT_SIP_STUN ) from services.database import Database from services.mikopbx_service import MikoPBXService logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) bot = Bot(token=TELEGRAM_BOT_TOKEN) dp = Dispatcher(storage=MemoryStorage()) db = Database() mikopbx = MikoPBXService() class RegistrationStates(StatesGroup): waiting_for_number = State() waiting_for_username = State() waiting_for_custom_secret = State() class AdminEditStates(StatesGroup): waiting_for_new_number = State() waiting_for_new_secret = State() waiting_for_existing_number = State() waiting_for_telegram_id = State() def is_admin(telegram_id: int) -> bool: return telegram_id in ADMIN_TELEGRAM_IDS def generate_secret(length: int = 16) -> str: alphabet = string.ascii_letters + string.digits return ''.join(secrets.choice(alphabet) for _ in range(length)) # ============== KEYBOARDS ============== def main_menu_keyboard(is_admin_user: bool = False): buttons = [ [InlineKeyboardButton(text="Register new number", callback_data="register")], [InlineKeyboardButton(text="My accounts", callback_data="my_accounts")], ] if is_admin_user: buttons.append([InlineKeyboardButton(text="Admin panel", callback_data="admin_menu")]) return InlineKeyboardMarkup(inline_keyboard=buttons) def admin_menu_keyboard(): return InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text="All registered accounts", callback_data="admin_list")], [InlineKeyboardButton(text="Link existing MikoPBX number", callback_data="admin_link_existing")], [InlineKeyboardButton(text="Back to main menu", callback_data="back_main")] ]) def account_list_keyboard(accounts: list, prefix: str = "account"): buttons = [] for acc in accounts[:10]: # Limit to 10 text = f"{acc['extension_number']}" if acc.get('telegram_username'): text += f" (@{acc['telegram_username']})" buttons.append([InlineKeyboardButton( text=text, callback_data=f"{prefix}_{acc['extension_number']}" )]) buttons.append([InlineKeyboardButton(text="Back", callback_data="back_main")]) return InlineKeyboardMarkup(inline_keyboard=buttons) # ============== HANDLERS ============== @dp.message(Command("start")) async def cmd_start(message: Message): await db.init_db() user = await db.add_user( message.from_user.id, message.from_user.username, message.from_user.full_name ) is_admin_user = is_admin(message.from_user.id) keyboard = main_menu_keyboard(is_admin_user) await message.answer( "Welcome to public SIP telephony service.\n\n" "Here you can register your own SIP number and get connection parameters.\n\n" "Please select an option:", reply_markup=keyboard ) @dp.callback_query(F.data == "register") async def start_registration(callback: CallbackQuery, state: FSMContext): await callback.message.edit_text( "Enter the desired extension number (digits only, e.g. 1001):\n" "Or send /cancel to abort." ) await state.set_state(RegistrationStates.waiting_for_number) await callback.answer() @dp.message(RegistrationStates.waiting_for_number) async def process_number(message: Message, state: FSMContext): number = message.text.strip() # Strict validation: exactly 7 digits if not number.isdigit() or len(number) != 7: await message.answer("The number must be exactly 7 digits (e.g. 1001001).") return # Check if number already exists in our DB existing = await db.get_sip_account_by_number(number) if existing: await message.answer("This number is already taken. Please choose another one.") return # Check if this user already has an account (one user = one number) user_accounts = await db.get_user_sip_accounts(message.from_user.id) if user_accounts: await message.answer( "You already have a registered number.\n" "Each user can have only one number." ) await state.clear() is_admin_user = is_admin(message.from_user.id) await message.answer("Main menu:", reply_markup=main_menu_keyboard(is_admin_user)) return await state.update_data(desired_number=number) await message.answer("Enter your name (optional, for the account):") await state.set_state(RegistrationStates.waiting_for_username) @dp.message(RegistrationStates.waiting_for_username) async def process_username(message: Message, state: FSMContext): data = await state.get_data() number = data["desired_number"] username = message.text.strip() if message.text.strip() else f"User {number}" await state.update_data(username=username) keyboard = InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text="Generate random password", callback_data="gen_secret")], [InlineKeyboardButton(text="Enter my own password", callback_data="custom_secret")] ]) await message.answer( "Would you like to use a generated password or enter your own?", reply_markup=keyboard ) await state.set_state(RegistrationStates.waiting_for_custom_secret) @dp.callback_query(F.data == "gen_secret", StateFilter(RegistrationStates.waiting_for_custom_secret)) async def use_generated_secret(callback: CallbackQuery, state: FSMContext): data = await state.get_data() number = data["desired_number"] username = data.get("username", f"User {number}") secret = generate_secret(12) await create_account_and_reply(callback.message, state, number, username, secret, callback.from_user.id) @dp.callback_query(F.data == "custom_secret", StateFilter(RegistrationStates.waiting_for_custom_secret)) async def ask_for_custom_secret(callback: CallbackQuery, state: FSMContext): await callback.message.edit_text( "Please enter the password you want to use for this SIP account (min 6 characters):" ) await callback.answer() @dp.message(RegistrationStates.waiting_for_custom_secret) async def process_custom_secret(message: Message, state: FSMContext): secret = message.text.strip() if len(secret) < 6: await message.answer("Password must be at least 6 characters long. Please try again.") return data = await state.get_data() number = data["desired_number"] username = data.get("username", f"User {number}") await create_account_and_reply(message, state, number, username, secret, message.from_user.id) async def create_account_and_reply(message_obj, state, number: str, username: str, secret: str, telegram_id: int): await message_obj.answer("Creating your SIP account with the chosen password... Please wait.") # Create via MikoPBX with custom secret (API v3) result = await mikopbx.create_extension( number=number, secret=secret, username=username ) if result.get("success"): # Save to our database user = await db.get_user(telegram_id) if user: await db.add_sip_account( user_id=user["id"], extension_number=number, sip_secret=secret, username=username ) connection_info = ( f"Your SIP account has been created!\n\n" f"Number: {number}\n" f"Secret (password): {secret}\n\n" f"--- Full connection parameters ---\n" f"Domain: {DEFAULT_SIP_DOMAIN}\n" f"Server / Outbound Proxy: {DEFAULT_SIP_SERVER}\n" f"Port: {DEFAULT_SIP_PORT}\n" f"Transport: {DEFAULT_SIP_TRANSPORT}\n" f"Username / Auth ID: {number}\n" f"Password: {secret}\n" ) if DEFAULT_SIP_OUTBOUND_PROXY: connection_info += f"Outbound Proxy: {DEFAULT_SIP_OUTBOUND_PROXY}\n" if DEFAULT_SIP_STUN: connection_info += f"STUN Server: {DEFAULT_SIP_STUN}\n" connection_info += ( f"\nRecommended clients: Zoiper, MicroSIP, Linphone, Bria\n" f"Use these credentials in your SIP client." ) await message_obj.answer(connection_info) else: error_msg = result.get("error", "Unknown error") await message_obj.answer(f"Failed to create account: {error_msg}\n\n" f"Please try again later or contact administrator.") await state.clear() # Show main menu again is_admin_user = is_admin(telegram_id) keyboard = main_menu_keyboard(is_admin_user) await message_obj.answer("What would you like to do next?", reply_markup=keyboard) @dp.callback_query(F.data == "my_accounts") async def show_my_accounts(callback: CallbackQuery): accounts = await db.get_user_sip_accounts(callback.from_user.id) if not accounts: await callback.message.edit_text( "You don't have any registered accounts yet.", reply_markup=main_menu_keyboard(is_admin(callback.from_user.id)) ) else: text = "Your SIP accounts:\n\n" for acc in accounts: text += f"Number: {acc['extension_number']}\n" if acc['sip_secret']: text += f"Password: {acc['sip_secret']}\n" text += f"Created: {acc['created_at']}\n\n" await callback.message.edit_text( text, reply_markup=main_menu_keyboard(is_admin(callback.from_user.id)) ) await callback.answer() # ============== ADMIN HANDLERS ============== @dp.callback_query(F.data == "admin_menu") async def show_admin_menu(callback: CallbackQuery): if not is_admin(callback.from_user.id): await callback.answer("Access denied.") return await callback.message.edit_text( "Administrator panel\n\n" "Select an action:", reply_markup=admin_menu_keyboard() ) await callback.answer() # ============== LINK EXISTING MIKOPBX ACCOUNT ============== @dp.callback_query(F.data == "admin_link_existing") async def admin_start_link_existing(callback: CallbackQuery, state: FSMContext): if not is_admin(callback.from_user.id): await callback.answer("Access denied.") return await callback.message.edit_text( "Enter the existing MikoPBX extension number (exactly 7 digits) that you want to link to a Telegram user:" ) await state.set_state(AdminEditStates.waiting_for_existing_number) await callback.answer() @dp.message(AdminEditStates.waiting_for_existing_number) async def admin_process_existing_number(message: Message, state: FSMContext): if not is_admin(message.from_user.id): await message.answer("Access denied.") await state.clear() return number = message.text.strip() if not number.isdigit() or len(number) != 7: await message.answer("Number must be exactly 7 digits.") return # Check if already linked in our DB existing = await db.get_sip_account_by_number(number) if existing: await message.answer( f"Number {number} is already linked to Telegram ID {existing.get('telegram_id', 'unknown')}." ) await state.clear() return await state.update_data(existing_number=number) await message.answer( "Enter the Telegram user ID to link this number to (or forward a message from the user):" ) await state.set_state(AdminEditStates.waiting_for_telegram_id) @dp.message(AdminEditStates.waiting_for_telegram_id) async def admin_process_telegram_id(message: Message, state: FSMContext): if not is_admin(message.from_user.id): await message.answer("Access denied.") await state.clear() return data = await state.get_data() number = data["existing_number"] try: telegram_id = int(message.text.strip()) except ValueError: await message.answer("Please enter a valid numeric Telegram ID.") return # Check if this Telegram user already has an account user_accounts = await db.get_user_sip_accounts(telegram_id) if user_accounts: await message.answer( f"Telegram user {telegram_id} already has a registered number." ) await state.clear() return # Add user to database if not exists user_id = await db.add_user(telegram_id) # Link the existing number (secret will be empty - admin can set it later) await db.add_sip_account( user_id=user_id, extension_number=number, sip_secret=None, # Admin can set password later username=f"Linked user {telegram_id}" ) await message.answer( f"Successfully linked existing MikoPBX number {number} to Telegram user {telegram_id}.\n\n" f"The user can now view their account via the bot.\n" f"You can set the password using 'Change password' in admin panel." ) await state.clear() # Return to admin menu await message.answer( "Administrator panel", reply_markup=admin_menu_keyboard() ) @dp.callback_query(F.data == "admin_list") async def admin_list_accounts(callback: CallbackQuery): if not is_admin(callback.from_user.id): await callback.answer("Access denied.") return accounts = await db.get_all_sip_accounts() if not accounts: await callback.message.edit_text( "No accounts registered yet.", reply_markup=admin_menu_keyboard() ) else: await callback.message.edit_text( "Registered accounts (click to view details):", reply_markup=account_list_keyboard(accounts, prefix="admin_view") ) await callback.answer() @dp.callback_query(F.data.startswith("admin_view_")) async def admin_view_account(callback: CallbackQuery): if not is_admin(callback.from_user.id): await callback.answer("Access denied.") return number = callback.data.replace("admin_view_", "") account = await db.get_sip_account_by_number(number) if not account: await callback.answer("Account not found.") return text = ( f"Account details:\n\n" f"Number: {account['extension_number']}\n" f"Password: {account['sip_secret'] or 'Not set'}\n" f"Username: {account.get('username', 'N/A')}\n" f"Owner Telegram ID: {account.get('telegram_id', 'N/A')}\n" f"Created: {account['created_at']}\n\n" f"Actions:" ) keyboard = InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text="Change number", callback_data=f"admin_change_number_{number}")], [InlineKeyboardButton(text="Change password", callback_data=f"admin_change_secret_{number}")], [InlineKeyboardButton(text="Delete account", callback_data=f"admin_delete_{number}")], [InlineKeyboardButton(text="Back to list", callback_data="admin_list")] ]) await callback.message.edit_text(text, reply_markup=keyboard) await callback.answer() @dp.callback_query(F.data.startswith("admin_change_number_")) async def admin_start_change_number(callback: CallbackQuery, state: FSMContext): if not is_admin(callback.from_user.id): await callback.answer("Access denied.") return number = callback.data.replace("admin_change_number_", "") await state.update_data(old_number=number) await callback.message.edit_text( f"Enter new number for extension {number}:" ) await state.set_state(AdminEditStates.waiting_for_new_number) await callback.answer() @dp.message(AdminEditStates.waiting_for_new_number) async def admin_process_new_number(message: Message, state: FSMContext): if not is_admin(message.from_user.id): await message.answer("Access denied.") await state.clear() return new_number = message.text.strip() data = await state.get_data() old_number = data["old_number"] if not new_number.isdigit() or len(new_number) < 3: await message.answer("Please enter a valid number (at least 3 digits).") return # Check if new number already exists existing = await db.get_sip_account_by_number(new_number) if existing: await message.answer("This number is already taken.") return account = await db.get_sip_account_by_number(old_number) if not account: await message.answer("Account not found in local database.") await state.clear() return await message.answer("Changing number in MikoPBX and local database...") # Try to change number in MikoPBX mikopbx_result = await mikopbx.change_extension_number(old_number, new_number) if mikopbx_result.get("success"): # Update local database await db.update_extension_number(old_number, new_number) await message.answer( f"Number successfully changed from {old_number} to {new_number} in both MikoPBX and database." ) else: # Fallback: update only local DB + inform admin await db.update_extension_number(old_number, new_number) error = mikopbx_result.get("error", "Unknown error") await message.answer( f"Number changed only in local database.\n" f"MikoPBX sync failed: {error}\n\n" f"You may need to manually adjust the number in the MikoPBX web interface." ) await state.clear() # Return to admin menu await message.answer( "Administrator panel", reply_markup=admin_menu_keyboard() ) @dp.callback_query(F.data.startswith("admin_delete_")) async def admin_delete_account(callback: CallbackQuery): if not is_admin(callback.from_user.id): await callback.answer("Access denied.") return number = callback.data.replace("admin_delete_", "") keyboard = InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text="Yes, delete", callback_data=f"confirm_delete_{number}")], [InlineKeyboardButton(text="Cancel", callback_data="admin_list")] ]) await callback.message.edit_text( f"Are you sure you want to delete extension {number}?\n" f"This will remove it from both the database and MikoPBX.", reply_markup=keyboard ) await callback.answer() @dp.callback_query(F.data.startswith("confirm_delete_")) async def admin_confirm_delete(callback: CallbackQuery): if not is_admin(callback.from_user.id): await callback.answer("Access denied.") return number = callback.data.replace("confirm_delete_", "") await callback.message.edit_text("Deleting account...") # Delete from MikoPBX mikopbx_result = await mikopbx.delete_extension(number) # Delete from local DB db_success = await db.delete_sip_account(number) if mikopbx_result.get("success") or db_success: await callback.message.edit_text( f"Extension {number} has been deleted.", reply_markup=admin_menu_keyboard() ) else: await callback.message.edit_text( f"Failed to delete {number}.", reply_markup=admin_menu_keyboard() ) await callback.answer() @dp.callback_query(F.data.startswith("admin_change_secret_")) async def admin_start_change_secret(callback: CallbackQuery, state: FSMContext): if not is_admin(callback.from_user.id): await callback.answer("Access denied.") return number = callback.data.replace("admin_change_secret_", "") await state.update_data(edit_number=number) await callback.message.edit_text( f"Enter new password (secret) for extension {number}:" ) await state.set_state(AdminEditStates.waiting_for_new_secret) await callback.answer() @dp.message(AdminEditStates.waiting_for_new_secret) async def admin_process_new_secret(message: Message, state: FSMContext): if not is_admin(message.from_user.id): await message.answer("Access denied.") await state.clear() return new_secret = message.text.strip() if len(new_secret) < 6: await message.answer("Password must be at least 6 characters.") return data = await state.get_data() number = data["edit_number"] await message.answer("Updating password in MikoPBX and database...") # Update in MikoPBX mikopbx_result = await mikopbx.set_extension_secret(number, new_secret) # Update local database regardless db_success = await db.update_sip_secret(number, new_secret) if mikopbx_result.get("success") and db_success: await message.answer(f"Password for {number} successfully updated in both MikoPBX and database.") elif db_success: await message.answer( f"Password updated in local database.\n" f"MikoPBX sync failed: {mikopbx_result.get('error', 'Unknown error')}" ) else: await message.answer("Failed to update password.") await state.clear() await message.answer( "Administrator panel", reply_markup=admin_menu_keyboard() ) @dp.callback_query(F.data == "back_main") async def back_to_main(callback: CallbackQuery): is_admin_user = is_admin(callback.from_user.id) keyboard = main_menu_keyboard(is_admin_user) await callback.message.edit_text( "Main menu:", reply_markup=keyboard ) await callback.answer() @dp.message(Command("cancel")) async def cancel_handler(message: Message, state: FSMContext): current_state = await state.get_state() if current_state is not None: await state.clear() await message.answer("Operation cancelled.") async def main(): await db.init_db() logger.info("Starting MikoPBX Telegram Bot...") await dp.start_polling(bot) if __name__ == "__main__": asyncio.run(main())