import aiohttp import logging from typing import Optional, Dict, Any from config import MIKOPBX_HOST, MIKOPBX_ADMIN_LOGIN, MIKOPBX_ADMIN_PASSWORD logger = logging.getLogger(__name__) class MikoPBXService: def __init__(self): self.host = MIKOPBX_HOST.rstrip('/') self.admin_login = MIKOPBX_ADMIN_LOGIN self.admin_password = MIKOPBX_ADMIN_PASSWORD self.session = None self.auth_cookies = None async def _get_session(self): if self.session is None: self.session = aiohttp.ClientSession() return self.session async def login(self) -> bool: """Login to MikoPBX admin panel to get session cookie""" session = await self._get_session() try: url = f"{self.host}/admin-cabinet/session/start" data = { "login": self.admin_login, "password": self.admin_password } async with session.post(url, data=data) as resp: if resp.status == 200: self.auth_cookies = resp.cookies logger.info("Successfully logged into MikoPBX") return True else: logger.error(f"Login failed: {resp.status}") return False except Exception as e: logger.error(f"Login error: {e}") return False async def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None): """Make authenticated request to MikoPBX API""" session = await self._get_session() if not self.auth_cookies: await self.login() url = f"{self.host}{endpoint}" headers = { "X-Requested-With": "XMLHttpRequest", "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8" } cookies = {} if self.auth_cookies: for key, cookie in self.auth_cookies.items(): cookies[key] = cookie.value try: if method.upper() == "GET": async with session.get(url, params=params, cookies=cookies, headers=headers) as resp: return await resp.json() else: async with session.post(url, data=data, cookies=cookies, headers=headers) as resp: return await resp.json() except Exception as e: logger.error(f"API request error: {e}") return {"result": False, "messages": [str(e)]} async def get_extension_template(self) -> Optional[Dict]: """Get empty template for new extension""" response = await self._make_request( "GET", "/pbxcore/api/extensions/getRecord", params={"id": ""} ) if response.get("result"): return response.get("data") return None async def create_extension(self, number: str, secret: str = None, username: str = "", email: str = "") -> Dict[str, Any]: """Create new SIP extension with optional custom secret""" template = await self.get_extension_template() if not template: return {"success": False, "error": "Failed to get template"} # Fill template with our data template["number"] = number if username: template["user_username"] = username if email: template["user_email"] = email # Ensure SIP type and basic settings template["type"] = "SIP" template["show_in_phonebook"] = "1" template["is_general_user_number"] = "1" # Set custom secret if provided if secret: template["sip_secret"] = secret response = await self._make_request( "POST", "/pbxcore/api/extensions/saveRecord", data=template ) if response.get("result"): created_data = response.get("data", {}) return { "success": True, "number": number, "mikopbx_id": created_data.get("id") or created_data.get("sip_uniqid"), "data": created_data } else: return { "success": False, "error": response.get("messages", ["Unknown error"]) } async def update_extension(self, extension_id: str, data: Dict) -> Dict[str, Any]: """Update existing extension""" # First get current record current = await self._make_request( "GET", "/pbxcore/api/extensions/getRecord", params={"id": extension_id} ) if not current.get("result"): return {"success": False, "error": "Extension not found"} record = current.get("data", {}) record.update(data) response = await self._make_request( "POST", "/pbxcore/api/extensions/saveRecord", data=record ) return { "success": response.get("result", False), "messages": response.get("messages", []) } async def change_extension_number(self, old_number: str, new_number: str) -> Dict[str, Any]: """Change extension number (delete old + create new with same secret)""" # Get current record by number (we'll search via getList if available, else use template approach) # For simplicity we get template and update the number template = await self.get_extension_template() if not template: return {"success": False, "error": "Failed to get template"} # We need the existing record. Since there's no direct "get by number" we use a workaround: # Try to find by attempting getRecord with number as ID (MikoPBX sometimes accepts it) current = await self._make_request( "GET", "/pbxcore/api/extensions/getRecord", params={"id": old_number} ) if current.get("result"): record = current.get("data", {}) old_secret = record.get("sip_secret", "") # Delete old extension first delete_response = await self._make_request( "POST", "/pbxcore/api/extensions/deleteRecord", data={"id": record.get("id", old_number)} ) # Create new one with same settings but new number record["number"] = new_number record["id"] = "" # new record record.pop("sip_uniqid", None) create_response = await self._make_request( "POST", "/pbxcore/api/extensions/saveRecord", data=record ) if create_response.get("result"): return { "success": True, "old_number": old_number, "new_number": new_number, "secret": old_secret } return {"success": False, "error": "Could not change number"} async def set_extension_secret(self, number: str, new_secret: str) -> Dict[str, Any]: """Update secret for existing extension""" current = await self._make_request( "GET", "/pbxcore/api/extensions/getRecord", params={"id": number} ) if not current.get("result"): return {"success": False, "error": "Extension not found"} record = current.get("data", {}) record["sip_secret"] = new_secret response = await self._make_request( "POST", "/pbxcore/api/extensions/saveRecord", data=record ) return { "success": response.get("result", False), "messages": response.get("messages", []) } async def get_all_extensions(self) -> list: """Get list of all extensions (if supported by MikoPBX)""" # Note: MikoPBX may have /pbxcore/api/extensions/getList endpoint response = await self._make_request( "GET", "/pbxcore/api/extensions/getList" ) if response.get("result"): return response.get("data", []) return [] async def delete_extension(self, number: str) -> Dict[str, Any]: """Delete extension""" current = await self._make_request( "GET", "/pbxcore/api/extensions/getRecord", params={"id": number} ) if not current.get("result"): return {"success": False, "error": "Extension not found"} record = current.get("data", {}) record_id = record.get("id") or number response = await self._make_request( "POST", "/pbxcore/api/extensions/deleteRecord", data={"id": record_id} ) return { "success": response.get("result", False), "messages": response.get("messages", []) } async def get_extension_by_number(self, number: str) -> Optional[Dict]: """Try to find extension by number""" # This is a simplified approach - in real scenario we would use getList # For now return None, or implement getList return None async def close(self): if self.session: await self.session.close()