Files
mikopbxbot/bot/services/mikopbx_service.py
2026-06-24 17:09:24 +03:00

216 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import aiohttp
import logging
from typing import Optional, Dict, Any
from config import (
MIKOPBX_HOST,
MIKOPBX_API_TOKEN,
MIKOPBX_ADMIN_LOGIN,
MIKOPBX_ADMIN_PASSWORD
)
logger = logging.getLogger(__name__)
class MikoPBXService:
def __init__(self):
self.host = MIKOPBX_HOST.rstrip('/')
self.api_token = MIKOPBX_API_TOKEN
self.admin_login = MIKOPBX_ADMIN_LOGIN
self.admin_password = MIKOPBX_ADMIN_PASSWORD
self.session = None
async def _get_session(self):
if self.session is None:
self.session = aiohttp.ClientSession()
return self.session
def _get_auth_headers(self) -> Dict[str, str]:
"""Возвращает заголовки авторизации (приоритет — Bearer Token)"""
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"X-Requested-With": "XMLHttpRequest"
}
if self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"
logger.debug("Using Bearer Token authentication")
elif self.admin_login and self.admin_password:
# Fallback на Basic Auth
import base64
credentials = f"{self.admin_login}:{self.admin_password}"
encoded = base64.b64encode(credentials.encode()).decode()
headers["Authorization"] = f"Basic {encoded}"
logger.debug("Using Basic Auth (fallback)")
return headers
async def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None):
"""Выполняет запрос к MikoPBX API"""
session = await self._get_session()
url = f"{self.host}{endpoint}"
headers = self._get_auth_headers()
try:
if method.upper() == "GET":
async with session.get(url, params=params, headers=headers) as resp:
if resp.status == 401:
logger.error("Unauthorized (401). Check your API token.")
return await resp.json()
else:
async with session.post(url, data=data, headers=headers) as resp:
if resp.status == 401:
logger.error("Unauthorized (401). Check your API token.")
return await resp.json()
except Exception as e:
logger.error(f"API request error: {e}")
return {"result": False, "messages": [str(e)]}
async def get_extension_template(self) -> Optional[Dict]:
"""Получить шаблон для создания нового сотрудника"""
response = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": ""}
)
if response.get("result"):
return response.get("data")
logger.error(f"Failed to get template: {response.get('messages')}")
return None
async def create_extension(self, number: str, secret: str = None, username: str = "", email: str = "") -> Dict[str, Any]:
"""Создать нового SIP пользователя"""
template = await self.get_extension_template()
if not template:
return {"success": False, "error": "Failed to get template from MikoPBX"}
template["number"] = number
if username:
template["user_username"] = username
if email:
template["user_email"] = email
template["type"] = "SIP"
template["show_in_phonebook"] = "1"
template["is_general_user_number"] = "1"
if secret:
template["sip_secret"] = secret
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=template
)
if response.get("result"):
created_data = response.get("data", {})
return {
"success": True,
"number": number,
"mikopbx_id": created_data.get("id") or created_data.get("sip_uniqid"),
"data": created_data
}
else:
return {
"success": False,
"error": response.get("messages", ["Unknown error"])
}
async def set_extension_secret(self, number: str, new_secret: str) -> Dict[str, Any]:
"""Обновить пароль у существующего номера"""
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": number}
)
if not current.get("result"):
return {"success": False, "error": "Extension not found"}
record = current.get("data", {})
record["sip_secret"] = new_secret
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=record
)
return {
"success": response.get("result", False),
"messages": response.get("messages", [])
}
async def change_extension_number(self, old_number: str, new_number: str) -> Dict[str, Any]:
"""Сменить номер (удалить старый + создать новый)"""
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": old_number}
)
if not current.get("result"):
return {"success": False, "error": "Extension not found"}
record = current.get("data", {})
old_secret = record.get("sip_secret", "")
# Удаляем старый
delete_resp = await self._make_request(
"POST",
"/pbxcore/api/extensions/deleteRecord",
data={"id": record.get("id", old_number)}
)
if not delete_resp.get("result"):
return {"success": False, "error": "Failed to delete old extension"}
# Создаём новый
record["number"] = new_number
record["id"] = ""
record.pop("sip_uniqid", None)
create_resp = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=record
)
if create_resp.get("result"):
return {
"success": True,
"old_number": old_number,
"new_number": new_number,
"secret": old_secret
}
return {"success": False, "error": "Failed to create new extension"}
async def delete_extension(self, number: str) -> Dict[str, Any]:
"""Удалить номер"""
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": number}
)
if not current.get("result"):
return {"success": False, "error": "Extension not found"}
record = current.get("data", {})
record_id = record.get("id") or number
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/deleteRecord",
data={"id": record_id}
)
return {
"success": response.get("result", False),
"messages": response.get("messages", [])
}
async def close(self):
if self.session:
await self.session.close()