This commit is contained in:
2026-06-24 16:32:30 +03:00
commit 0c49cb785b
9 changed files with 1299 additions and 0 deletions

17
.env.example Normal file
View File

@@ -0,0 +1,17 @@
# MikoPBX configuration
MIKOPBX_HOST=http://your-mikopbx-ip-or-host:8081
MIKOPBX_ADMIN_LOGIN=admin
MIKOPBX_ADMIN_PASSWORD=your-admin-password
# Telegram Bot
TELEGRAM_BOT_TOKEN=your-telegram-bot-token-here
ADMIN_TELEGRAM_IDS=123456789,987654321
# Bot settings
DB_PATH=/app/data/bot.db
DEFAULT_SIP_SERVER=your-mikopbx-host
DEFAULT_SIP_PORT=5060
DEFAULT_SIP_DOMAIN=your-mikopbx-host
DEFAULT_SIP_TRANSPORT=UDP
DEFAULT_SIP_OUTBOUND_PROXY=
DEFAULT_SIP_STUN=

152
README.md Normal file
View File

@@ -0,0 +1,152 @@
# MikoPBX Public SIP Telephony Bot
Telegram-бот для публичной SIP-телефонии на базе MikoPBX.
## Основные возможности
### Для обычных пользователей
- Регистрация **строго 7-значного** номера
- **Один номер на одного пользователя** (ограничение)
- Выбор пароля: сгенерировать или ввести свой
- Получение **полных параметров подключения** (домен, сервер, порт, транспорт, outbound proxy, STUN и т.д.)
- Просмотр своих аккаунтов
### Для администраторов
- Полный список всех аккаунтов
- Смена номера / пароля
- Удаление аккаунтов
- **Привязка уже существующих номеров из MikoPBX** к Telegram ID пользователя
### Синхронизация с MikoPBX
- Полная двусторонняя синхронизация (создание, изменение номера, изменение пароля, удаление)
- Возможность привязывать уже существующие в MikoPBX аккаунты
---
## Настройка (.env)
```env
# MikoPBX
MIKOPBX_HOST=http://192.168.1.50:8081
MIKOPBX_ADMIN_LOGIN=admin
MIKOPBX_ADMIN_PASSWORD=твой_пароль
# Telegram
TELEGRAM_BOT_TOKEN=твой_токен
ADMIN_TELEGRAM_IDS=123456789,987654321
# SIP параметры (показываются пользователям)
DEFAULT_SIP_SERVER=sip.tvoyserver.ru
DEFAULT_SIP_PORT=5060
DEFAULT_SIP_DOMAIN=sip.tvoyserver.ru
DEFAULT_SIP_TRANSPORT=UDP
DEFAULT_SIP_OUTBOUND_PROXY=
DEFAULT_SIP_STUN=stun.l.google.com:19302
```
**Все параметры подключения настраиваются в `.env`** и автоматически подставляются в сообщение пользователю.
---
## Как работает регистрация номера
1. Пользователь нажимает **"Register new number"**
2. Вводит **ровно 7 цифр**
3. Вводит имя (опционально)
4. Выбирает способ получения пароля:
- Сгенерировать случайный
- Ввести свой пароль
5. Бот создаёт аккаунт в **MikoPBX** с указанным паролем
6. Выдаёт **полные параметры подключения**
**Ограничения:**
- Номер строго 7 цифр
- У одного Telegram-пользователя может быть только **один** номер
---
## Привязка существующих номеров MikoPBX (Админ)
Это одна из ключевых фич:
1. Админ → **Admin panel****Link existing MikoPBX number**
2. Вводит существующий 7-значный номер из MikoPBX
3. Вводит Telegram ID пользователя
4. Номер привязывается к пользователю
После этого пользователь может:
- Видеть свой номер в разделе "My accounts"
- Получать параметры подключения (если пароль был заранее известен)
**Полезно**, когда номера уже созданы в MikoPBX вручную.
---
## Полный список кнопок и команд
### Главное меню (для всех)
- `Register new number` — регистрация нового номера
- `My accounts` — мои аккаунты
- `Admin panel` — только для администраторов
### Админ-панель
- `All registered accounts` — список всех аккаунтов
- `Link existing MikoPBX number` — привязка существующего номера
- При просмотре аккаунта:
- `Change number`
- `Change password`
- `Delete account`
### Команды
- `/start` — главное меню
- `/cancel` — отмена текущего действия
---
## Пример сообщения с параметрами подключения
```
Your SIP account has been created!
Number: 1001001
Secret (password): MySuperPass2025
--- Full connection parameters ---
Domain: sip.tvoyserver.ru
Server / Outbound Proxy: sip.tvoyserver.ru
Port: 5060
Transport: UDP
Username / Auth ID: 1001001
Password: MySuperPass2025
Outbound Proxy:
STUN Server: stun.l.google.com:19302
Recommended clients: Zoiper, MicroSIP, Linphone, Bria
Use these credentials in your SIP client.
```
---
## Запуск и обновление
```bash
# Первый запуск
docker compose up -d --build
# Просмотр логов
docker compose logs -f mikopbx-bot
# Пересборка после изменений
docker compose up -d --build
```
---
## Важные замечания
- Номера строго 7 цифр (валидация на стороне бота)
- Один Telegram ID = один номер
- Все изменения (номер/пароль/удаление) выполняются и в MikoPBX, и в локальной БД
- Привязка существующих номеров позволяет работать с уже созданными в MikoPBX аккаунтами без пересоздания
Бот полностью готов к эксплуатации.

10
bot/Dockerfile Normal file
View File

@@ -0,0 +1,10 @@
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]

20
bot/config.py Normal file
View File

@@ -0,0 +1,20 @@
import os
from dotenv import load_dotenv
load_dotenv()
MIKOPBX_HOST = os.getenv("MIKOPBX_HOST", "http://localhost:8081")
MIKOPBX_ADMIN_LOGIN = os.getenv("MIKOPBX_ADMIN_LOGIN", "admin")
MIKOPBX_ADMIN_PASSWORD = os.getenv("MIKOPBX_ADMIN_PASSWORD", "admin")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
ADMIN_TELEGRAM_IDS = [int(x.strip()) for x in os.getenv("ADMIN_TELEGRAM_IDS", "").split(",") if x.strip()]
DB_PATH = os.getenv("DB_PATH", "/app/data/bot.db")
DEFAULT_SIP_SERVER = os.getenv("DEFAULT_SIP_SERVER", "your-mikopbx.example.com")
DEFAULT_SIP_PORT = int(os.getenv("DEFAULT_SIP_PORT", "5060"))
DEFAULT_SIP_DOMAIN = os.getenv("DEFAULT_SIP_DOMAIN", DEFAULT_SIP_SERVER)
DEFAULT_SIP_TRANSPORT = os.getenv("DEFAULT_SIP_TRANSPORT", "UDP")
DEFAULT_SIP_OUTBOUND_PROXY = os.getenv("DEFAULT_SIP_OUTBOUND_PROXY", "")
DEFAULT_SIP_STUN = os.getenv("DEFAULT_SIP_STUN", "")

626
bot/main.py Normal file
View File

@@ -0,0 +1,626 @@
import asyncio
import logging
import secrets
import string
from aiogram import Bot, Dispatcher, F
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from config import (
TELEGRAM_BOT_TOKEN, ADMIN_TELEGRAM_IDS,
DEFAULT_SIP_SERVER, DEFAULT_SIP_PORT,
DEFAULT_SIP_DOMAIN, DEFAULT_SIP_TRANSPORT,
DEFAULT_SIP_OUTBOUND_PROXY, DEFAULT_SIP_STUN
)
from services.database import Database
from services.mikopbx_service import MikoPBXService
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
bot = Bot(token=TELEGRAM_BOT_TOKEN)
dp = Dispatcher(storage=MemoryStorage())
db = Database()
mikopbx = MikoPBXService()
class RegistrationStates(StatesGroup):
waiting_for_number = State()
waiting_for_username = State()
waiting_for_custom_secret = State()
class AdminEditStates(StatesGroup):
waiting_for_new_number = State()
waiting_for_new_secret = State()
waiting_for_existing_number = State()
waiting_for_telegram_id = State()
def is_admin(telegram_id: int) -> bool:
return telegram_id in ADMIN_TELEGRAM_IDS
def generate_secret(length: int = 16) -> str:
alphabet = string.ascii_letters + string.digits
return ''.join(secrets.choice(alphabet) for _ in range(length))
# ============== KEYBOARDS ==============
def main_menu_keyboard(is_admin_user: bool = False):
buttons = [
[InlineKeyboardButton(text="Register new number", callback_data="register")],
[InlineKeyboardButton(text="My accounts", callback_data="my_accounts")],
]
if is_admin_user:
buttons.append([InlineKeyboardButton(text="Admin panel", callback_data="admin_menu")])
return InlineKeyboardMarkup(inline_keyboard=buttons)
def admin_menu_keyboard():
return InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="All registered accounts", callback_data="admin_list")],
[InlineKeyboardButton(text="Link existing MikoPBX number", callback_data="admin_link_existing")],
[InlineKeyboardButton(text="Back to main menu", callback_data="back_main")]
])
def account_list_keyboard(accounts: list, prefix: str = "account"):
buttons = []
for acc in accounts[:10]: # Limit to 10
text = f"{acc['extension_number']}"
if acc.get('telegram_username'):
text += f" (@{acc['telegram_username']})"
buttons.append([InlineKeyboardButton(
text=text,
callback_data=f"{prefix}_{acc['extension_number']}"
)])
buttons.append([InlineKeyboardButton(text="Back", callback_data="back_main")])
return InlineKeyboardMarkup(inline_keyboard=buttons)
# ============== HANDLERS ==============
@dp.message(Command("start"))
async def cmd_start(message: Message):
await db.init_db()
user = await db.add_user(
message.from_user.id,
message.from_user.username,
message.from_user.full_name
)
is_admin_user = is_admin(message.from_user.id)
keyboard = main_menu_keyboard(is_admin_user)
await message.answer(
"Welcome to public SIP telephony service.\n\n"
"Here you can register your own SIP number and get connection parameters.\n\n"
"Please select an option:",
reply_markup=keyboard
)
@dp.callback_query(F.data == "register")
async def start_registration(callback: CallbackQuery, state: FSMContext):
await callback.message.edit_text(
"Enter the desired extension number (digits only, e.g. 1001):\n"
"Or send /cancel to abort."
)
await state.set_state(RegistrationStates.waiting_for_number)
await callback.answer()
@dp.message(RegistrationStates.waiting_for_number)
async def process_number(message: Message, state: FSMContext):
number = message.text.strip()
# Strict validation: exactly 7 digits
if not number.isdigit() or len(number) != 7:
await message.answer("The number must be exactly 7 digits (e.g. 1001001).")
return
# Check if number already exists in our DB
existing = await db.get_sip_account_by_number(number)
if existing:
await message.answer("This number is already taken. Please choose another one.")
return
# Check if this user already has an account (one user = one number)
user_accounts = await db.get_user_sip_accounts(message.from_user.id)
if user_accounts:
await message.answer(
"You already have a registered number.\n"
"Each user can have only one number."
)
await state.clear()
is_admin_user = is_admin(message.from_user.id)
await message.answer("Main menu:", reply_markup=main_menu_keyboard(is_admin_user))
return
await state.update_data(desired_number=number)
await message.answer("Enter your name (optional, for the account):")
await state.set_state(RegistrationStates.waiting_for_username)
@dp.message(RegistrationStates.waiting_for_username)
async def process_username(message: Message, state: FSMContext):
data = await state.get_data()
number = data["desired_number"]
username = message.text.strip() if message.text.strip() else f"User {number}"
await state.update_data(username=username)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Generate random password", callback_data="gen_secret")],
[InlineKeyboardButton(text="Enter my own password", callback_data="custom_secret")]
])
await message.answer(
"Would you like to use a generated password or enter your own?",
reply_markup=keyboard
)
await state.set_state(RegistrationStates.waiting_for_custom_secret)
@dp.callback_query(F.data == "gen_secret", StateFilter(RegistrationStates.waiting_for_custom_secret))
async def use_generated_secret(callback: CallbackQuery, state: FSMContext):
data = await state.get_data()
number = data["desired_number"]
username = data.get("username", f"User {number}")
secret = generate_secret(12)
await create_account_and_reply(callback.message, state, number, username, secret, callback.from_user.id)
@dp.callback_query(F.data == "custom_secret", StateFilter(RegistrationStates.waiting_for_custom_secret))
async def ask_for_custom_secret(callback: CallbackQuery, state: FSMContext):
await callback.message.edit_text(
"Please enter the password you want to use for this SIP account (min 6 characters):"
)
await callback.answer()
@dp.message(RegistrationStates.waiting_for_custom_secret)
async def process_custom_secret(message: Message, state: FSMContext):
secret = message.text.strip()
if len(secret) < 6:
await message.answer("Password must be at least 6 characters long. Please try again.")
return
data = await state.get_data()
number = data["desired_number"]
username = data.get("username", f"User {number}")
await create_account_and_reply(message, state, number, username, secret, message.from_user.id)
async def create_account_and_reply(message_obj, state, number: str, username: str, secret: str, telegram_id: int):
await message_obj.answer("Creating your SIP account with the chosen password... Please wait.")
# Create via MikoPBX with custom secret
result = await mikopbx.create_extension(
number=number,
secret=secret,
username=username
)
if result.get("success"):
# Save to our database
user = await db.get_user(telegram_id)
if user:
await db.add_sip_account(
user_id=user["id"],
extension_number=number,
sip_secret=secret,
username=username
)
connection_info = (
f"Your SIP account has been created!\n\n"
f"Number: {number}\n"
f"Secret (password): {secret}\n\n"
f"--- Full connection parameters ---\n"
f"Domain: {DEFAULT_SIP_DOMAIN}\n"
f"Server / Outbound Proxy: {DEFAULT_SIP_SERVER}\n"
f"Port: {DEFAULT_SIP_PORT}\n"
f"Transport: {DEFAULT_SIP_TRANSPORT}\n"
f"Username / Auth ID: {number}\n"
f"Password: {secret}\n"
)
if DEFAULT_SIP_OUTBOUND_PROXY:
connection_info += f"Outbound Proxy: {DEFAULT_SIP_OUTBOUND_PROXY}\n"
if DEFAULT_SIP_STUN:
connection_info += f"STUN Server: {DEFAULT_SIP_STUN}\n"
connection_info += (
f"\nRecommended clients: Zoiper, MicroSIP, Linphone, Bria\n"
f"Use these credentials in your SIP client."
)
await message_obj.answer(connection_info)
else:
error_msg = result.get("error", "Unknown error")
await message_obj.answer(f"Failed to create account: {error_msg}\n\n"
f"Please try again later or contact administrator.")
await state.clear()
# Show main menu again
is_admin_user = is_admin(telegram_id)
keyboard = main_menu_keyboard(is_admin_user)
await message_obj.answer("What would you like to do next?", reply_markup=keyboard)
@dp.callback_query(F.data == "my_accounts")
async def show_my_accounts(callback: CallbackQuery):
accounts = await db.get_user_sip_accounts(callback.from_user.id)
if not accounts:
await callback.message.edit_text(
"You don't have any registered accounts yet.",
reply_markup=main_menu_keyboard(is_admin(callback.from_user.id))
)
else:
text = "Your SIP accounts:\n\n"
for acc in accounts:
text += f"Number: {acc['extension_number']}\n"
if acc['sip_secret']:
text += f"Password: {acc['sip_secret']}\n"
text += f"Created: {acc['created_at']}\n\n"
await callback.message.edit_text(
text,
reply_markup=main_menu_keyboard(is_admin(callback.from_user.id))
)
await callback.answer()
# ============== ADMIN HANDLERS ==============
@dp.callback_query(F.data == "admin_menu")
async def show_admin_menu(callback: CallbackQuery):
if not is_admin(callback.from_user.id):
await callback.answer("Access denied.")
return
await callback.message.edit_text(
"Administrator panel\n\n"
"Select an action:",
reply_markup=admin_menu_keyboard()
)
await callback.answer()
# ============== LINK EXISTING MIKOPBX ACCOUNT ==============
@dp.callback_query(F.data == "admin_link_existing")
async def admin_start_link_existing(callback: CallbackQuery, state: FSMContext):
if not is_admin(callback.from_user.id):
await callback.answer("Access denied.")
return
await callback.message.edit_text(
"Enter the existing MikoPBX extension number (exactly 7 digits) that you want to link to a Telegram user:"
)
await state.set_state(AdminEditStates.waiting_for_existing_number)
await callback.answer()
@dp.message(AdminEditStates.waiting_for_existing_number)
async def admin_process_existing_number(message: Message, state: FSMContext):
if not is_admin(message.from_user.id):
await message.answer("Access denied.")
await state.clear()
return
number = message.text.strip()
if not number.isdigit() or len(number) != 7:
await message.answer("Number must be exactly 7 digits.")
return
# Check if already linked in our DB
existing = await db.get_sip_account_by_number(number)
if existing:
await message.answer(
f"Number {number} is already linked to Telegram ID {existing.get('telegram_id', 'unknown')}."
)
await state.clear()
return
await state.update_data(existing_number=number)
await message.answer(
"Enter the Telegram user ID to link this number to (or forward a message from the user):"
)
await state.set_state(AdminEditStates.waiting_for_telegram_id)
@dp.message(AdminEditStates.waiting_for_telegram_id)
async def admin_process_telegram_id(message: Message, state: FSMContext):
if not is_admin(message.from_user.id):
await message.answer("Access denied.")
await state.clear()
return
data = await state.get_data()
number = data["existing_number"]
try:
telegram_id = int(message.text.strip())
except ValueError:
await message.answer("Please enter a valid numeric Telegram ID.")
return
# Check if this Telegram user already has an account
user_accounts = await db.get_user_sip_accounts(telegram_id)
if user_accounts:
await message.answer(
f"Telegram user {telegram_id} already has a registered number."
)
await state.clear()
return
# Add user to database if not exists
user_id = await db.add_user(telegram_id)
# Link the existing number (secret will be empty - admin can set it later)
await db.add_sip_account(
user_id=user_id,
extension_number=number,
sip_secret=None, # Admin can set password later
username=f"Linked user {telegram_id}"
)
await message.answer(
f"Successfully linked existing MikoPBX number {number} to Telegram user {telegram_id}.\n\n"
f"The user can now view their account via the bot.\n"
f"You can set the password using 'Change password' in admin panel."
)
await state.clear()
# Return to admin menu
await message.answer(
"Administrator panel",
reply_markup=admin_menu_keyboard()
)
@dp.callback_query(F.data == "admin_list")
async def admin_list_accounts(callback: CallbackQuery):
if not is_admin(callback.from_user.id):
await callback.answer("Access denied.")
return
accounts = await db.get_all_sip_accounts()
if not accounts:
await callback.message.edit_text(
"No accounts registered yet.",
reply_markup=admin_menu_keyboard()
)
else:
await callback.message.edit_text(
"Registered accounts (click to view details):",
reply_markup=account_list_keyboard(accounts, prefix="admin_view")
)
await callback.answer()
@dp.callback_query(F.data.startswith("admin_view_"))
async def admin_view_account(callback: CallbackQuery):
if not is_admin(callback.from_user.id):
await callback.answer("Access denied.")
return
number = callback.data.replace("admin_view_", "")
account = await db.get_sip_account_by_number(number)
if not account:
await callback.answer("Account not found.")
return
text = (
f"Account details:\n\n"
f"Number: {account['extension_number']}\n"
f"Password: {account['sip_secret'] or 'Not set'}\n"
f"Username: {account.get('username', 'N/A')}\n"
f"Owner Telegram ID: {account.get('telegram_id', 'N/A')}\n"
f"Created: {account['created_at']}\n\n"
f"Actions:"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Change number", callback_data=f"admin_change_number_{number}")],
[InlineKeyboardButton(text="Change password", callback_data=f"admin_change_secret_{number}")],
[InlineKeyboardButton(text="Delete account", callback_data=f"admin_delete_{number}")],
[InlineKeyboardButton(text="Back to list", callback_data="admin_list")]
])
await callback.message.edit_text(text, reply_markup=keyboard)
await callback.answer()
@dp.callback_query(F.data.startswith("admin_change_number_"))
async def admin_start_change_number(callback: CallbackQuery, state: FSMContext):
if not is_admin(callback.from_user.id):
await callback.answer("Access denied.")
return
number = callback.data.replace("admin_change_number_", "")
await state.update_data(old_number=number)
await callback.message.edit_text(
f"Enter new number for extension {number}:"
)
await state.set_state(AdminEditStates.waiting_for_new_number)
await callback.answer()
@dp.message(AdminEditStates.waiting_for_new_number)
async def admin_process_new_number(message: Message, state: FSMContext):
if not is_admin(message.from_user.id):
await message.answer("Access denied.")
await state.clear()
return
new_number = message.text.strip()
data = await state.get_data()
old_number = data["old_number"]
if not new_number.isdigit() or len(new_number) < 3:
await message.answer("Please enter a valid number (at least 3 digits).")
return
# Check if new number already exists
existing = await db.get_sip_account_by_number(new_number)
if existing:
await message.answer("This number is already taken.")
return
account = await db.get_sip_account_by_number(old_number)
if not account:
await message.answer("Account not found in local database.")
await state.clear()
return
await message.answer("Changing number in MikoPBX and local database...")
# Try to change number in MikoPBX
mikopbx_result = await mikopbx.change_extension_number(old_number, new_number)
if mikopbx_result.get("success"):
# Update local database
await db.update_extension_number(old_number, new_number)
await message.answer(
f"Number successfully changed from {old_number} to {new_number} in both MikoPBX and database."
)
else:
# Fallback: update only local DB + inform admin
await db.update_extension_number(old_number, new_number)
error = mikopbx_result.get("error", "Unknown error")
await message.answer(
f"Number changed only in local database.\n"
f"MikoPBX sync failed: {error}\n\n"
f"You may need to manually adjust the number in the MikoPBX web interface."
)
await state.clear()
# Return to admin menu
await message.answer(
"Administrator panel",
reply_markup=admin_menu_keyboard()
)
@dp.callback_query(F.data.startswith("admin_delete_"))
async def admin_delete_account(callback: CallbackQuery):
if not is_admin(callback.from_user.id):
await callback.answer("Access denied.")
return
number = callback.data.replace("admin_delete_", "")
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Yes, delete", callback_data=f"confirm_delete_{number}")],
[InlineKeyboardButton(text="Cancel", callback_data="admin_list")]
])
await callback.message.edit_text(
f"Are you sure you want to delete extension {number}?\n"
f"This will remove it from both the database and MikoPBX.",
reply_markup=keyboard
)
await callback.answer()
@dp.callback_query(F.data.startswith("confirm_delete_"))
async def admin_confirm_delete(callback: CallbackQuery):
if not is_admin(callback.from_user.id):
await callback.answer("Access denied.")
return
number = callback.data.replace("confirm_delete_", "")
await callback.message.edit_text("Deleting account...")
# Delete from MikoPBX
mikopbx_result = await mikopbx.delete_extension(number)
# Delete from local DB
db_success = await db.delete_sip_account(number)
if mikopbx_result.get("success") or db_success:
await callback.message.edit_text(
f"Extension {number} has been deleted.",
reply_markup=admin_menu_keyboard()
)
else:
await callback.message.edit_text(
f"Failed to delete {number}.",
reply_markup=admin_menu_keyboard()
)
await callback.answer()
@dp.callback_query(F.data.startswith("admin_change_secret_"))
async def admin_start_change_secret(callback: CallbackQuery, state: FSMContext):
if not is_admin(callback.from_user.id):
await callback.answer("Access denied.")
return
number = callback.data.replace("admin_change_secret_", "")
await state.update_data(edit_number=number)
await callback.message.edit_text(
f"Enter new password (secret) for extension {number}:"
)
await state.set_state(AdminEditStates.waiting_for_new_secret)
await callback.answer()
@dp.message(AdminEditStates.waiting_for_new_secret)
async def admin_process_new_secret(message: Message, state: FSMContext):
if not is_admin(message.from_user.id):
await message.answer("Access denied.")
await state.clear()
return
new_secret = message.text.strip()
if len(new_secret) < 6:
await message.answer("Password must be at least 6 characters.")
return
data = await state.get_data()
number = data["edit_number"]
await message.answer("Updating password in MikoPBX and database...")
# Update in MikoPBX
mikopbx_result = await mikopbx.set_extension_secret(number, new_secret)
# Update local database regardless
db_success = await db.update_sip_secret(number, new_secret)
if mikopbx_result.get("success") and db_success:
await message.answer(f"Password for {number} successfully updated in both MikoPBX and database.")
elif db_success:
await message.answer(
f"Password updated in local database.\n"
f"MikoPBX sync failed: {mikopbx_result.get('error', 'Unknown error')}"
)
else:
await message.answer("Failed to update password.")
await state.clear()
await message.answer(
"Administrator panel",
reply_markup=admin_menu_keyboard()
)
@dp.callback_query(F.data == "back_main")
async def back_to_main(callback: CallbackQuery):
is_admin_user = is_admin(callback.from_user.id)
keyboard = main_menu_keyboard(is_admin_user)
await callback.message.edit_text(
"Main menu:",
reply_markup=keyboard
)
await callback.answer()
@dp.message(Command("cancel"))
async def cancel_handler(message: Message, state: FSMContext):
current_state = await state.get_state()
if current_state is not None:
await state.clear()
await message.answer("Operation cancelled.")
async def main():
await db.init_db()
logger.info("Starting MikoPBX Telegram Bot...")
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main())

4
bot/requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
aiogram==3.7.0
aiohttp==3.9.5
aiosqlite==0.19.0
python-dotenv==1.0.1

176
bot/services/database.py Normal file
View File

@@ -0,0 +1,176 @@
import aiosqlite
import logging
from typing import List, Dict, Optional
from config import DB_PATH
logger = logging.getLogger(__name__)
class Database:
def __init__(self):
self.db_path = DB_PATH
async def init_db(self):
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
telegram_id INTEGER UNIQUE NOT NULL,
telegram_username TEXT,
full_name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS sip_accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
extension_number TEXT UNIQUE NOT NULL,
sip_secret TEXT,
username TEXT,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
await db.commit()
logger.info("Database initialized")
async def add_user(self, telegram_id: int, username: str = None, full_name: str = None) -> int:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"INSERT OR IGNORE INTO users (telegram_id, telegram_username, full_name) VALUES (?, ?, ?)",
(telegram_id, username, full_name)
)
await db.commit()
if cursor.lastrowid == 0:
# User already exists, get their id
cursor = await db.execute(
"SELECT id FROM users WHERE telegram_id = ?",
(telegram_id,)
)
row = await cursor.fetchone()
return row[0] if row else None
return cursor.lastrowid
async def get_user(self, telegram_id: int) -> Optional[Dict]:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT * FROM users WHERE telegram_id = ?",
(telegram_id,)
)
row = await cursor.fetchone()
if row:
return {
"id": row[0],
"telegram_id": row[1],
"telegram_username": row[2],
"full_name": row[3],
"created_at": row[4]
}
return None
async def add_sip_account(self, user_id: int, extension_number: str, sip_secret: str = None,
username: str = None, email: str = None) -> int:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"""INSERT INTO sip_accounts
(user_id, extension_number, sip_secret, username, email)
VALUES (?, ?, ?, ?, ?)""",
(user_id, extension_number, sip_secret, username, email)
)
await db.commit()
return cursor.lastrowid
async def get_user_sip_accounts(self, telegram_id: int) -> List[Dict]:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT sa.* FROM sip_accounts sa
JOIN users u ON sa.user_id = u.id
WHERE u.telegram_id = ?
ORDER BY sa.created_at DESC
""", (telegram_id,))
rows = await cursor.fetchall()
return [
{
"id": row[0],
"user_id": row[1],
"extension_number": row[2],
"sip_secret": row[3],
"username": row[4],
"email": row[5],
"created_at": row[6]
}
for row in rows
]
async def get_all_sip_accounts(self) -> List[Dict]:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute("""
SELECT sa.*, u.telegram_id, u.telegram_username, u.full_name
FROM sip_accounts sa
JOIN users u ON sa.user_id = u.id
ORDER BY sa.created_at DESC
""")
rows = await cursor.fetchall()
return [
{
"id": row[0],
"user_id": row[1],
"extension_number": row[2],
"sip_secret": row[3],
"username": row[4],
"email": row[5],
"created_at": row[6],
"telegram_id": row[7],
"telegram_username": row[8],
"full_name": row[9]
}
for row in rows
]
async def update_sip_secret(self, extension_number: str, new_secret: str) -> bool:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"UPDATE sip_accounts SET sip_secret = ? WHERE extension_number = ?",
(new_secret, extension_number)
)
await db.commit()
return cursor.rowcount > 0
async def update_extension_number(self, old_number: str, new_number: str) -> bool:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"UPDATE sip_accounts SET extension_number = ? WHERE extension_number = ?",
(new_number, old_number)
)
await db.commit()
return cursor.rowcount > 0
async def delete_sip_account(self, extension_number: str) -> bool:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"DELETE FROM sip_accounts WHERE extension_number = ?",
(extension_number,)
)
await db.commit()
return cursor.rowcount > 0
async def get_sip_account_by_number(self, number: str) -> Optional[Dict]:
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT * FROM sip_accounts WHERE extension_number = ?",
(number,)
)
row = await cursor.fetchone()
if row:
return {
"id": row[0],
"user_id": row[1],
"extension_number": row[2],
"sip_secret": row[3],
"username": row[4],
"email": row[5],
"created_at": row[6]
}
return None

View File

@@ -0,0 +1,268 @@
import aiohttp
import logging
from typing import Optional, Dict, Any
from config import MIKOPBX_HOST, MIKOPBX_ADMIN_LOGIN, MIKOPBX_ADMIN_PASSWORD
logger = logging.getLogger(__name__)
class MikoPBXService:
def __init__(self):
self.host = MIKOPBX_HOST.rstrip('/')
self.login = MIKOPBX_ADMIN_LOGIN
self.password = MIKOPBX_ADMIN_PASSWORD
self.session = None
self.auth_cookies = None
async def _get_session(self):
if self.session is None:
self.session = aiohttp.ClientSession()
return self.session
async def login(self) -> bool:
"""Login to MikoPBX admin panel to get session cookie"""
session = await self._get_session()
try:
url = f"{self.host}/admin-cabinet/session/start"
data = {
"login": self.login,
"password": self.password
}
async with session.post(url, data=data) as resp:
if resp.status == 200:
self.auth_cookies = resp.cookies
logger.info("Successfully logged into MikoPBX")
return True
else:
logger.error(f"Login failed: {resp.status}")
return False
except Exception as e:
logger.error(f"Login error: {e}")
return False
async def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None):
"""Make authenticated request to MikoPBX API"""
session = await self._get_session()
if not self.auth_cookies:
await self.login()
url = f"{self.host}{endpoint}"
headers = {
"X-Requested-With": "XMLHttpRequest",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
}
cookies = {}
if self.auth_cookies:
for key, cookie in self.auth_cookies.items():
cookies[key] = cookie.value
try:
if method.upper() == "GET":
async with session.get(url, params=params, cookies=cookies, headers=headers) as resp:
return await resp.json()
else:
async with session.post(url, data=data, cookies=cookies, headers=headers) as resp:
return await resp.json()
except Exception as e:
logger.error(f"API request error: {e}")
return {"result": False, "messages": [str(e)]}
async def get_extension_template(self) -> Optional[Dict]:
"""Get empty template for new extension"""
response = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": ""}
)
if response.get("result"):
return response.get("data")
return None
async def create_extension(self, number: str, secret: str = None, username: str = "", email: str = "") -> Dict[str, Any]:
"""Create new SIP extension with optional custom secret"""
template = await self.get_extension_template()
if not template:
return {"success": False, "error": "Failed to get template"}
# Fill template with our data
template["number"] = number
if username:
template["user_username"] = username
if email:
template["user_email"] = email
# Ensure SIP type and basic settings
template["type"] = "SIP"
template["show_in_phonebook"] = "1"
template["is_general_user_number"] = "1"
# Set custom secret if provided
if secret:
template["sip_secret"] = secret
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=template
)
if response.get("result"):
created_data = response.get("data", {})
return {
"success": True,
"number": number,
"mikopbx_id": created_data.get("id") or created_data.get("sip_uniqid"),
"data": created_data
}
else:
return {
"success": False,
"error": response.get("messages", ["Unknown error"])
}
async def update_extension(self, extension_id: str, data: Dict) -> Dict[str, Any]:
"""Update existing extension"""
# First get current record
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": extension_id}
)
if not current.get("result"):
return {"success": False, "error": "Extension not found"}
record = current.get("data", {})
record.update(data)
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=record
)
return {
"success": response.get("result", False),
"messages": response.get("messages", [])
}
async def change_extension_number(self, old_number: str, new_number: str) -> Dict[str, Any]:
"""Change extension number (delete old + create new with same secret)"""
# Get current record by number (we'll search via getList if available, else use template approach)
# For simplicity we get template and update the number
template = await self.get_extension_template()
if not template:
return {"success": False, "error": "Failed to get template"}
# We need the existing record. Since there's no direct "get by number" we use a workaround:
# Try to find by attempting getRecord with number as ID (MikoPBX sometimes accepts it)
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": old_number}
)
if current.get("result"):
record = current.get("data", {})
old_secret = record.get("sip_secret", "")
# Delete old extension first
delete_response = await self._make_request(
"POST",
"/pbxcore/api/extensions/deleteRecord",
data={"id": record.get("id", old_number)}
)
# Create new one with same settings but new number
record["number"] = new_number
record["id"] = "" # new record
record.pop("sip_uniqid", None)
create_response = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=record
)
if create_response.get("result"):
return {
"success": True,
"old_number": old_number,
"new_number": new_number,
"secret": old_secret
}
return {"success": False, "error": "Could not change number"}
async def set_extension_secret(self, number: str, new_secret: str) -> Dict[str, Any]:
"""Update secret for existing extension"""
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": number}
)
if not current.get("result"):
return {"success": False, "error": "Extension not found"}
record = current.get("data", {})
record["sip_secret"] = new_secret
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=record
)
return {
"success": response.get("result", False),
"messages": response.get("messages", [])
}
async def get_all_extensions(self) -> list:
"""Get list of all extensions (if supported by MikoPBX)"""
# Note: MikoPBX may have /pbxcore/api/extensions/getList endpoint
response = await self._make_request(
"GET",
"/pbxcore/api/extensions/getList"
)
if response.get("result"):
return response.get("data", [])
return []
async def delete_extension(self, number: str) -> Dict[str, Any]:
"""Delete extension"""
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": number}
)
if not current.get("result"):
return {"success": False, "error": "Extension not found"}
record = current.get("data", {})
record_id = record.get("id") or number
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/deleteRecord",
data={"id": record_id}
)
return {
"success": response.get("result", False),
"messages": response.get("messages", [])
}
async def get_extension_by_number(self, number: str) -> Optional[Dict]:
"""Try to find extension by number"""
# This is a simplified approach - in real scenario we would use getList
# For now return None, or implement getList
return None
async def close(self):
if self.session:
await self.session.close()

26
docker-compose.yml Normal file
View File

@@ -0,0 +1,26 @@
version: '3.8'
services:
mikopbx-bot:
build: ./bot
container_name: mikopbx-bot
restart: unless-stopped
env_file:
- .env
volumes:
- ./bot/data:/app/data
depends_on:
- redis
networks:
- bot-net
redis:
image: redis:7-alpine
container_name: mikopbx-redis
restart: unless-stopped
networks:
- bot-net
networks:
bot-net:
driver: bridge