171 lines
6.4 KiB
Python
171 lines
6.4 KiB
Python
import aiohttp
|
|
import logging
|
|
import json
|
|
from typing import Optional, Dict, Any
|
|
from config import (
|
|
MIKOPBX_HOST,
|
|
MIKOPBX_API_TOKEN,
|
|
MIKOPBX_ADMIN_LOGIN,
|
|
MIKOPBX_ADMIN_PASSWORD
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
class MikoPBXService:
|
|
def __init__(self):
|
|
self.host = MIKOPBX_HOST.rstrip('/')
|
|
self.api_token = MIKOPBX_API_TOKEN
|
|
self.admin_login = MIKOPBX_ADMIN_LOGIN
|
|
self.admin_password = MIKOPBX_ADMIN_PASSWORD
|
|
self.session = None
|
|
|
|
async def _get_session(self):
|
|
if self.session is None:
|
|
self.session = aiohttp.ClientSession()
|
|
return self.session
|
|
|
|
def _get_auth_headers(self) -> Dict[str, str]:
|
|
headers = {
|
|
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
|
|
"Accept": "application/json",
|
|
"X-Requested-With": "XMLHttpRequest"
|
|
}
|
|
if self.api_token:
|
|
headers["Authorization"] = f"Bearer {self.api_token}"
|
|
logger.info(">>> Using Bearer Token authentication")
|
|
elif self.admin_login and self.admin_password:
|
|
import base64
|
|
credentials = f"{self.admin_login}:{self.admin_password}"
|
|
encoded = base64.b64encode(credentials.encode()).decode()
|
|
headers["Authorization"] = f"Basic {encoded}"
|
|
logger.info(">>> Using Basic Auth (fallback)")
|
|
return headers
|
|
|
|
async def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None):
|
|
session = await self._get_session()
|
|
url = f"{self.host}{endpoint}"
|
|
headers = self._get_auth_headers()
|
|
|
|
logger.info(f"\n{'='*70}")
|
|
logger.info(f">>> REQUEST: {method.upper()} {url}")
|
|
if params:
|
|
logger.info(f">>> PARAMS: {params}")
|
|
if data:
|
|
logger.info(f">>> BODY: {data}")
|
|
|
|
try:
|
|
if method.upper() == "GET":
|
|
async with session.get(url, params=params, headers=headers) as resp:
|
|
text = await resp.text()
|
|
logger.info(f"<<< RESPONSE [{resp.status}]:\n{text[:2500]}")
|
|
try:
|
|
return await resp.json()
|
|
except:
|
|
return {"result": False, "raw": text}
|
|
else:
|
|
async with session.post(url, data=data, headers=headers) as resp:
|
|
text = await resp.text()
|
|
logger.info(f"<<< RESPONSE [{resp.status}]:\n{text[:2500]}")
|
|
try:
|
|
return await resp.json()
|
|
except:
|
|
return {"result": False, "raw": text}
|
|
except Exception as e:
|
|
logger.error(f"!!! REQUEST ERROR: {e}")
|
|
return {"result": False, "messages": [str(e)]}
|
|
|
|
# ==================== EXTENSIONS (legacy but working) ====================
|
|
|
|
async def get_extension_template(self) -> Optional[Dict]:
|
|
"""Получаем шаблон через старый, но рабочий эндпоинт"""
|
|
logger.info(">>> Getting extension template via legacy endpoint...")
|
|
response = await self._make_request(
|
|
"GET",
|
|
"/pbxcore/api/extensions/getRecord",
|
|
params={"id": ""}
|
|
)
|
|
if response.get("result"):
|
|
logger.info(">>> Template received successfully")
|
|
return response.get("data")
|
|
logger.error(f">>> Failed to get template: {response.get('messages')}")
|
|
return None
|
|
|
|
async def create_extension(self, number: str, secret: str = None, username: str = "", email: str = "") -> Dict[str, Any]:
|
|
logger.info(f"\n>>> Creating extension {number}...")
|
|
|
|
template = await self.get_extension_template()
|
|
if not template:
|
|
return {"success": False, "error": "Failed to get template from MikoPBX"}
|
|
|
|
# Заполняем шаблон
|
|
template["number"] = number
|
|
if username:
|
|
template["user_username"] = username
|
|
if email:
|
|
template["user_email"] = email
|
|
template["type"] = "SIP"
|
|
template["show_in_phonebook"] = "1"
|
|
template["is_general_user_number"] = "1"
|
|
|
|
if secret:
|
|
template["sip_secret"] = secret
|
|
|
|
logger.info(f">>> Sending saveRecord request...")
|
|
|
|
response = await self._make_request(
|
|
"POST",
|
|
"/pbxcore/api/extensions/saveRecord",
|
|
data=template
|
|
)
|
|
|
|
if response.get("result"):
|
|
logger.info(">>> Extension created successfully!")
|
|
return {
|
|
"success": True,
|
|
"number": number,
|
|
"data": response.get("data", {})
|
|
}
|
|
else:
|
|
error = response.get("messages", ["Unknown error"])
|
|
logger.error(f">>> Creation failed: {error}")
|
|
return {
|
|
"success": False,
|
|
"error": error
|
|
}
|
|
|
|
async def get_extension(self, number: str) -> Optional[Dict]:
|
|
response = await self._make_request(
|
|
"GET",
|
|
"/pbxcore/api/extensions/getRecord",
|
|
params={"id": number}
|
|
)
|
|
return response.get("data") if response.get("result") else None
|
|
|
|
async def set_extension_secret(self, number: str, new_secret: str) -> Dict[str, Any]:
|
|
current = await self.get_extension(number)
|
|
if not current:
|
|
return {"success": False, "error": "Extension not found"}
|
|
current["sip_secret"] = new_secret
|
|
response = await self._make_request(
|
|
"POST",
|
|
"/pbxcore/api/extensions/saveRecord",
|
|
data=current
|
|
)
|
|
return {"success": response.get("result", False), "messages": response.get("messages", [])}
|
|
|
|
async def delete_extension(self, number: str) -> Dict[str, Any]:
|
|
current = await self.get_extension(number)
|
|
if not current:
|
|
return {"success": False, "error": "Extension not found"}
|
|
record_id = current.get("id") or number
|
|
response = await self._make_request(
|
|
"POST",
|
|
"/pbxcore/api/extensions/deleteRecord",
|
|
data={"id": record_id}
|
|
)
|
|
return {"success": response.get("result", False), "messages": response.get("messages", [])}
|
|
|
|
async def close(self):
|
|
if self.session:
|
|
await self.session.close() |