import base64
import json
import logging
from typing import Any, Dict, Optional, Tuple

import aiohttp

from config import (
    MIKOPBX_HOST,
    MIKOPBX_API_TOKEN,
    MIKOPBX_ADMIN_LOGIN,
    MIKOPBX_ADMIN_PASSWORD,
)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Substrings that mark a payload key as sensitive; matching values are masked
# before a request body is written to the log.
_SENSITIVE_KEY_MARKERS = ("secret", "password", "token")


def _redact_for_log(value: Any) -> Any:
    """Return a copy of *value* with sensitive dict values replaced by ``"***"``.

    Dicts and lists are walked recursively; every other value is returned
    unchanged. The input object is never mutated.
    """
    if isinstance(value, dict):
        return {
            key: (
                "***"
                if any(marker in str(key).lower() for marker in _SENSITIVE_KEY_MARKERS)
                else _redact_for_log(item)
            )
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [_redact_for_log(item) for item in value]
    return value


class MikoPBXService:
    """Async client for the ``/pbxcore/api/v3/employees`` endpoints.

    Connection settings are read from the ``config`` module. One
    ``aiohttp.ClientSession`` is created lazily and reused until ``close()``
    is called.
    """

    def __init__(self):
        self.host = MIKOPBX_HOST.rstrip('/')
        self.api_token = MIKOPBX_API_TOKEN
        self.admin_login = MIKOPBX_ADMIN_LOGIN
        self.admin_password = MIKOPBX_ADMIN_PASSWORD
        self.session = None

    async def _get_session(self):
        """Return the shared HTTP session, creating a new one if needed."""
        # A session that was closed (e.g. via close()) cannot be reused, so it
        # is replaced instead of being handed out again.
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    def _get_auth_headers(self) -> Dict[str, str]:
        """Build the request headers, including ``Authorization`` when possible.

        A Bearer token is used when ``api_token`` is set; otherwise Basic auth
        is built from the admin login/password. With neither configured, no
        ``Authorization`` header is sent.
        """
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

        if self.api_token:
            headers["Authorization"] = f"Bearer {self.api_token}"
            logger.info(">>> Using Bearer Token authentication")
        elif self.admin_login and self.admin_password:
            credentials = f"{self.admin_login}:{self.admin_password}"
            encoded = base64.b64encode(credentials.encode()).decode()
            headers["Authorization"] = f"Basic {encoded}"
            logger.info(">>> Using Basic Auth (fallback)")

        return headers

    async def _make_request(
        self,
        method: str,
        endpoint: str,
        json_data: Optional[Dict] = None,
        params: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """Send one HTTP request and return the decoded JSON object.

        Args:
            method: HTTP verb; upper-cased before use.
            endpoint: Path appended to the configured host.
            json_data: Optional JSON body.
            params: Optional query-string parameters.

        Returns:
            The response JSON when it is an object. Otherwise a failure dict:
            ``{"result": False, "raw": <text>}`` when the body is not a JSON
            object, or ``{"result": False, "messages": [<error>]}`` when the
            request itself raised.
        """
        session = await self._get_session()
        url = f"{self.host}{endpoint}"
        headers = self._get_auth_headers()
        method = method.upper()

        logger.info("\n%s", "=" * 70)
        logger.info(">>> REQUEST: %s %s", method, url)
        if params:
            logger.info(">>> PARAMS: %s", params)
        if json_data:
            # Mask credentials such as sip_secret so they never reach the log.
            logger.info(
                ">>> BODY:\n%s",
                json.dumps(_redact_for_log(json_data), indent=2, ensure_ascii=False),
            )

        try:
            async with session.request(
                method, url, params=params, json=json_data, headers=headers
            ) as resp:
                text = await resp.text()
                # NOTE(review): the raw response text is logged unmasked and may
                # contain secrets returned by the server - confirm this is acceptable.
                logger.info("<<< RESPONSE [%s]:\n%s", resp.status, text[:3000])

                try:
                    parsed = await resp.json()
                except (aiohttp.ContentTypeError, ValueError):
                    # Wrong content type or malformed JSON: hand back the raw text.
                    return {"result": False, "raw": text}

                # Every caller uses .get() on the result, so a non-object JSON
                # value (list, null, ...) is reported as a failure instead.
                if not isinstance(parsed, dict):
                    return {"result": False, "raw": text}
                return parsed
        except Exception as e:
            logger.error("!!! REQUEST ERROR: %s", e)
            return {"result": False, "messages": [str(e)]}

    # ==================== EMPLOYEES (correct v3 endpoint) ====================

    async def create_extension(
        self, number: str, secret: str, username: str = "", email: str = ""
    ) -> Dict[str, Any]:
        """Create a new employee (SIP extension) via POST /pbxcore/api/v3/employees.

        Args:
            number: Internal extension number to create.
            secret: SIP secret for the extension.
            username: Display name; defaults to ``"User <number>"`` when empty.
            email: Optional e-mail; sent only when non-empty.

        Returns:
            ``{"success": True, "number": ..., "data": ...}`` on success, or
            ``{"success": False, "error": [...]}`` when the number is already
            taken or the API reports a failure.
        """
        logger.info("\n>>> Creating employee/extension %s via /v3/employees ...", number)

        # Refuse to create a duplicate when the number is already in use.
        existing = await self.get_employee_by_number(number)
        if existing:
            return {
                "success": False,
                "error": [f"Internal number {number} is already occupied"],
            }

        payload = {
            "number": number,
            "user_username": username or f"User {number}",
            "sip_secret": secret,
        }

        if email:
            payload["user_email"] = email

        # Optional sensible defaults
        payload["sip_enableRecording"] = True
        payload["sip_dtmfmode"] = "auto"
        payload["sip_transport"] = "udp,tcp"
        payload["fwd_ringlength"] = 45

        logger.info(">>> Sending create employee request...")

        response = await self._make_request(
            "POST",
            "/pbxcore/api/v3/employees",
            json_data=payload,
        )

        if response.get("result"):
            logger.info(">>> SUCCESS: Employee created!")
            return {
                "success": True,
                "number": number,
                "data": response.get("data", {}),
            }

        error = response.get("messages") or response.get("error") or ["Unknown error"]
        logger.error(">>> Creation failed: %s", error)
        return {
            "success": False,
            "error": error,
        }

    def _extract_employee_id(self, employee: Dict) -> Optional[str]:
        """Return the employee's ``id`` as a string, or None if missing/empty."""
        emp_id = employee.get("id")
        if emp_id is None or emp_id == "":
            return None
        return str(emp_id)

    async def _prepare_update(self, employee: Dict) -> Tuple[Optional[str], Dict]:
        """Resolve the ID and the most complete record for a PUT update.

        Returns ``(None, employee)`` when the record has no usable ID.
        Otherwise returns the ID together with the full record fetched by ID,
        falling back to *employee* when that fetch yields nothing, so the PUT
        does not drop fields the list entry may be missing.
        """
        real_id = self._extract_employee_id(employee)
        if not real_id:
            return None, employee
        full_record = await self.get_full_employee(real_id)
        return real_id, full_record or employee

    async def get_employee_by_number(self, number: str) -> Optional[Dict]:
        """Return the employee record whose ``number`` matches, else None.

        The full employee list is fetched and searched. None is returned both
        when no record matches and when the list request fails.
        """
        response = await self._make_request("GET", "/pbxcore/api/v3/employees")
        if not response.get("result"):
            return None

        # "data" may be absent or null; treat both as an empty list.
        employees = response.get("data") or []
        if not isinstance(employees, list):
            return None

        # Compare as strings so an integer number on either side still matches.
        wanted = str(number)
        for emp in employees:
            if not isinstance(emp, dict):
                continue
            emp_number = emp.get("number")
            if emp_number is not None and str(emp_number) == wanted:
                return emp
        return None

    async def get_extension(self, number: str) -> Optional[Dict]:
        """Get employee by number (alias of ``get_employee_by_number``)."""
        return await self.get_employee_by_number(number)

    async def get_full_employee(self, employee_id) -> Optional[Dict]:
        """Get the full employee record by ID (used for PUT updates).

        Returns None when the request fails or ``data`` is not a dict.
        """
        response = await self._make_request(
            "GET", f"/pbxcore/api/v3/employees/{employee_id}"
        )
        data = response.get("data") if response.get("result") else None
        if data and isinstance(data, dict):
            return data
        return None

    async def set_extension_secret(self, number: str, new_secret: str) -> Dict[str, Any]:
        """Replace the SIP secret of the employee with the given number.

        Returns ``{"success": <bool>, "messages": [...]}`` from the PUT, or
        ``{"success": False, "error": <str>}`` when the employee or its ID
        cannot be resolved.
        """
        current = await self.get_employee_by_number(number)
        if not current:
            return {"success": False, "error": "Extension not found"}

        real_id, record = await self._prepare_update(current)
        if not real_id:
            return {"success": False, "error": "Employee ID is missing"}

        record["sip_secret"] = new_secret

        response = await self._make_request(
            "PUT",
            f"/pbxcore/api/v3/employees/{real_id}",
            json_data=record,
        )
        return {
            "success": response.get("result", False),
            "messages": response.get("messages", []),
        }

    async def change_extension_number(self, old_number: str, new_number: str) -> Dict[str, Any]:
        """Change an employee's extension number.

        Finds the employee by the old number, verifies the new number is free,
        updates the ``number`` field and sends a PUT.

        Returns ``{"success": True, "data": ...}`` on success, otherwise
        ``{"success": False, "error": ...}``.
        """
        current = await self.get_employee_by_number(old_number)
        if not current:
            return {
                "success": False,
                "error": f"Extension {old_number} not found in MikoPBX",
            }

        # Check if the new number is already occupied
        existing_new = await self.get_employee_by_number(new_number)
        if existing_new:
            return {
                "success": False,
                "error": f"Number {new_number} is already occupied in MikoPBX",
            }

        real_id, record = await self._prepare_update(current)
        if not real_id:
            return {"success": False, "error": "Employee ID is missing"}

        record["number"] = new_number

        response = await self._make_request(
            "PUT",
            f"/pbxcore/api/v3/employees/{real_id}",
            json_data=record,
        )

        if response.get("result"):
            logger.info(">>> SUCCESS: Number changed from %s to %s", old_number, new_number)
            return {"success": True, "data": response.get("data", {})}

        error = response.get("messages") or response.get("error") or ["Unknown error"]
        logger.error(">>> Failed to change number: %s", error)
        return {"success": False, "error": error}

    async def delete_extension(self, number: str) -> Dict[str, Any]:
        """Delete the employee with the given extension number.

        The employee's ID is looked up first because the DELETE endpoint is
        addressed by ID, not by number.
        """
        employee = await self.get_employee_by_number(number)
        if not employee:
            return {"success": False, "error": "Employee not found"}

        real_id = self._extract_employee_id(employee)
        if not real_id:
            return {"success": False, "error": "Employee ID is missing"}

        logger.info(">>> Deleting employee with real ID: %s", real_id)
        response = await self._make_request(
            "DELETE",
            f"/pbxcore/api/v3/employees/{real_id}",
        )

        return {
            "success": response.get("result", False),
            "messages": response.get("messages", []),
        }

    async def close(self):
        """Close the HTTP session, if one is open, and forget it."""
        if self.session is not None and not self.session.closed:
            await self.session.close()
        # Drop the reference so a later request builds a fresh session.
        self.session = None