Files
mikopbxbot/bot/services/mikopbx_service.py
2026-06-24 17:23:37 +03:00

172 lines
6.7 KiB
Python

import aiohttp
import logging
import json
from typing import Optional, Dict, Any
from config import (
MIKOPBX_HOST,
MIKOPBX_API_TOKEN,
MIKOPBX_ADMIN_LOGIN,
MIKOPBX_ADMIN_PASSWORD
)
logger = logging.getLogger(__name__)
# NOTE(review): forcing DEBUG here overrides whatever level the application
# configures for this module's logger; kept to preserve existing behavior.
logger.setLevel(logging.DEBUG)


class MikoPBXService:
    """Async client for the MikoPBX extensions REST API.

    Wraps a lazily created ``aiohttp.ClientSession`` and exposes helpers to
    create, read, update and delete extensions under
    ``/pbxcore/api/v3/extensions``. Every request helper returns a plain
    ``dict`` and never raises on transport or decoding errors; failures are
    reported as ``{"result": False, ...}``.
    """

    # Substrings that mark payload keys whose values must not be logged.
    _SENSITIVE_KEY_PARTS = ("secret", "password", "token")

    def __init__(self):
        """Read connection settings from the ``config`` module constants."""
        self.host = MIKOPBX_HOST.rstrip('/')
        self.api_token = MIKOPBX_API_TOKEN
        self.admin_login = MIKOPBX_ADMIN_LOGIN
        self.admin_password = MIKOPBX_ADMIN_PASSWORD
        # Created on first use by _get_session(); reset to None by close().
        self.session = None

    async def _get_session(self) -> "aiohttp.ClientSession":
        """Return the shared HTTP session, creating a new one if needed.

        A session that has already been closed is replaced, so the service
        stays usable after ``close()``.
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    def _get_auth_headers(self) -> Dict[str, str]:
        """Build request headers, including ``Authorization`` when possible.

        The bearer token takes precedence; HTTP Basic credentials are used
        only when no token is configured. With neither configured, no
        ``Authorization`` header is added.
        """
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        if self.api_token:
            headers["Authorization"] = f"Bearer {self.api_token}"
            logger.info(">>> Using Bearer Token authentication")
        elif self.admin_login and self.admin_password:
            import base64
            credentials = f"{self.admin_login}:{self.admin_password}"
            encoded = base64.b64encode(credentials.encode()).decode()
            headers["Authorization"] = f"Basic {encoded}"
            logger.info(">>> Using Basic Auth (fallback)")
        return headers

    @classmethod
    def _redact_for_log(cls, value: Any) -> Any:
        """Return a copy of ``value`` with sensitive dict values masked.

        Walks nested dicts and lists; any string key containing one of
        ``_SENSITIVE_KEY_PARTS`` (case-insensitive) has its value replaced
        with ``"***"``. The input object is not modified.
        """
        if isinstance(value, dict):
            redacted = {}
            for key, item in value.items():
                is_sensitive = isinstance(key, str) and any(
                    part in key.lower() for part in cls._SENSITIVE_KEY_PARTS
                )
                redacted[key] = "***" if is_sensitive else cls._redact_for_log(item)
            return redacted
        if isinstance(value, list):
            return [cls._redact_for_log(item) for item in value]
        return value

    @staticmethod
    def _parse_response_body(text: str) -> Dict[str, Any]:
        """Decode a response body into a dict.

        Returns the decoded JSON object, or ``{"result": False, "raw": text}``
        when the body is not valid JSON or is not a JSON object. Callers rely
        on ``.get`` being available on the result.
        """
        try:
            parsed = json.loads(text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return {"result": False, "raw": text}
        if not isinstance(parsed, dict):
            return {"result": False, "raw": text}
        return parsed

    async def _make_request(self, method: str, endpoint: str, json_data: Optional[Dict] = None, params: Optional[Dict] = None):
        """Send one HTTP request and return the decoded response dict.

        Args:
            method: HTTP verb, case-insensitive. It is sent as given (after
                upper-casing), so ``"PUT"`` really issues a PUT.
            endpoint: Path appended to the configured host.
            json_data: JSON body; sent for every verb except GET and DELETE.
            params: Query parameters; sent for GET only.

        Returns:
            The decoded JSON object, ``{"result": False, "raw": <text>}`` when
            the body cannot be decoded into a dict, or
            ``{"result": False, "messages": [<error>]}`` when the request
            itself raises.
        """
        session = await self._get_session()
        url = f"{self.host}{endpoint}"
        headers = self._get_auth_headers()
        verb = method.upper()

        logger.info(f"\n{'='*70}")
        logger.info(f">>> REQUEST: {verb} {url}")
        if params:
            logger.info(f">>> PARAMS: {params}")
        if json_data:
            # Mask secrets/passwords: the body may carry sip_secret.
            loggable_body = self._redact_for_log(json_data)
            logger.info(f">>> BODY:\n{json.dumps(loggable_body, indent=2, ensure_ascii=False)}")
        loggable_headers = {k: v for k, v in headers.items() if k != 'Authorization'}
        logger.info(f">>> HEADERS: {loggable_headers}")

        # Keep what each verb transmits unchanged: query string for GET,
        # nothing extra for DELETE, JSON body for everything else.
        request_kwargs: Dict[str, Any] = {"headers": headers}
        if verb == "GET":
            request_kwargs["params"] = params
        elif verb != "DELETE":
            request_kwargs["json"] = json_data

        try:
            async with session.request(verb, url, **request_kwargs) as resp:
                text = await resp.text()
                logger.info(f"<<< RESPONSE [{resp.status}]:\n{text[:3000]}")
                # Parse the text already read instead of resp.json(), which
                # rejects bodies served with a non-JSON content type.
                return self._parse_response_body(text)
        except Exception as e:
            # Boundary handler: report transport errors as a failed result.
            logger.error(f"!!! REQUEST ERROR: {e}")
            return {"result": False, "messages": [str(e)]}

    # ==================== EXTENSIONS ====================

    async def get_extension_template(self) -> Optional[Dict]:
        """Fetch a template for a new employee (extension).

        Issues ``GET /pbxcore/api/v3/extensions?id=`` and inspects the
        ``data`` field of the response.

        Returns:
            ``data`` itself when it is a dict; a shallow copy of the first
            element when it is a non-empty list whose first element is a
            dict; otherwise ``None``.
        """
        logger.info(">>> Trying to get extension template...")
        # Try the v3 API.
        response = await self._make_request(
            "GET",
            "/pbxcore/api/v3/extensions",
            params={"id": ""}
        )
        data = response.get("data")
        if isinstance(data, dict):
            logger.info(">>> Template received as dict")
            return data
        elif isinstance(data, list) and len(data) > 0:
            # NOTE(review): this reuses an existing record as the template,
            # so its identifiers/credentials may leak into the new payload;
            # confirm against the MikoPBX API before relying on it.
            logger.info(">>> Template received as list, using first item as template")
            return data[0].copy() if isinstance(data[0], dict) else None
        else:
            logger.warning(">>> Could not get template from API. Will use minimal structure.")
            return None

    async def create_extension(self, number: str, secret: Optional[str] = None, username: str = "", email: str = "") -> Dict[str, Any]:
        """Create an extension with the given number.

        Starts from the template returned by ``get_extension_template`` (or a
        minimal fallback), fills in the supplied fields and POSTs the result
        to ``/pbxcore/api/v3/extensions``.

        Args:
            number: Extension number, stored under ``"number"``.
            secret: Optional SIP secret, stored under ``"sip_secret"``.
            username: Optional user name, stored under ``"user_username"``.
            email: Optional e-mail, stored under ``"user_email"``.

        Returns:
            ``{"success": True, "number": ..., "data": ...}`` when the
            response has a truthy ``result``; otherwise
            ``{"success": False, "error": ...}`` with the response's
            ``messages``/``error`` or ``["Unknown error"]``.
        """
        logger.info(f"\n>>> Creating extension {number}...")
        template = await self.get_extension_template()
        if not template:
            logger.warning(">>> Using minimal fallback template")
            template = {
                "type": "SIP",
                "show_in_phonebook": "1",
                "is_general_user_number": "1",
            }

        # Fill in the caller-supplied data.
        template["number"] = number
        if username:
            template["user_username"] = username
        if email:
            template["user_email"] = email
        if secret:
            template["sip_secret"] = secret
        logger.info(">>> Final creation payload prepared")

        response = await self._make_request(
            "POST",
            "/pbxcore/api/v3/extensions",
            json_data=template
        )
        if response.get("result"):
            return {
                "success": True,
                "number": number,
                "data": response.get("data", {})
            }
        else:
            error_msg = response.get("messages") or response.get("error") or ["Unknown error"]
            return {
                "success": False,
                "error": error_msg
            }

    async def get_extension(self, number: str) -> Optional[Dict]:
        """Return the ``data`` of extension ``number``, or ``None`` on failure."""
        response = await self._make_request("GET", f"/pbxcore/api/v3/extensions/{number}")
        return response.get("data") if response.get("result") else None

    async def set_extension_secret(self, number: str, new_secret: str) -> Dict[str, Any]:
        """Replace the SIP secret of an existing extension.

        Reads the current record, sets ``"sip_secret"`` on it and sends the
        whole record back with PUT.

        Returns:
            ``{"success": False, "error": "Extension not found"}`` when the
            record cannot be read; otherwise ``{"success": ..., "messages":
            ...}`` taken from the PUT response.
        """
        current = await self.get_extension(number)
        if not current:
            return {"success": False, "error": "Extension not found"}
        current["sip_secret"] = new_secret
        response = await self._make_request("PUT", f"/pbxcore/api/v3/extensions/{number}", json_data=current)
        return {"success": response.get("result", False), "messages": response.get("messages", [])}

    async def delete_extension(self, number: str) -> Dict[str, Any]:
        """Delete extension ``number`` and report the API's result/messages."""
        response = await self._make_request("DELETE", f"/pbxcore/api/v3/extensions/{number}")
        return {"success": response.get("result", False), "messages": response.get("messages", [])}

    async def close(self):
        """Close the HTTP session, if one was created, and forget it."""
        if self.session is not None:
            await self.session.close()
            # Drop the reference so a later request opens a fresh session.
            self.session = None