This commit is contained in:
2026-06-24 17:16:44 +03:00
parent 90a4d02b79
commit 95eba2b071
2 changed files with 57 additions and 82 deletions

View File

@@ -24,27 +24,24 @@ class MikoPBXService:
return self.session
def _get_auth_headers(self) -> Dict[str, str]:
"""Возвращает заголовки авторизации (приоритет — Bearer Token)"""
"""Возвращает заголовки авторизации (Bearer Token в приоритете)"""
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"X-Requested-With": "XMLHttpRequest"
"Content-Type": "application/json",
"Accept": "application/json"
}
if self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"
logger.debug("Using Bearer Token authentication")
elif self.admin_login and self.admin_password:
# Fallback на Basic Auth
import base64
credentials = f"{self.admin_login}:{self.admin_password}"
encoded = base64.b64encode(credentials.encode()).decode()
headers["Authorization"] = f"Basic {encoded}"
logger.debug("Using Basic Auth (fallback)")
return headers
async def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None):
"""Выполняет запрос к MikoPBX API"""
async def _make_request(self, method: str, endpoint: str, json_data: Optional[Dict] = None, params: Optional[Dict] = None):
"""Выполняет запрос к MikoPBX REST API v3"""
session = await self._get_session()
url = f"{self.host}{endpoint}"
headers = self._get_auth_headers()
@@ -52,88 +49,87 @@ class MikoPBXService:
try:
if method.upper() == "GET":
async with session.get(url, params=params, headers=headers) as resp:
if resp.status == 401:
logger.error("Unauthorized (401). Check your API token.")
return await resp.json()
elif method.upper() == "DELETE":
async with session.delete(url, headers=headers) as resp:
return await resp.json()
else:
async with session.post(url, data=data, headers=headers) as resp:
if resp.status == 401:
logger.error("Unauthorized (401). Check your API token.")
async with session.post(url, json=json_data, headers=headers) as resp:
return await resp.json()
except Exception as e:
logger.error(f"API request error: {e}")
return {"result": False, "messages": [str(e)]}
# ==================== EXTENSIONS ====================
async def get_extension_template(self) -> Optional[Dict]:
"""Получить шаблон для создания нового сотрудника"""
response = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": ""}
)
"""Получить шаблон нового сотрудника (API v3)"""
response = await self._make_request("GET", "/pbxcore/api/v3/extensions")
if response.get("result"):
return response.get("data")
logger.error(f"Failed to get template: {response.get('messages')}")
return None
async def create_extension(self, number: str, secret: str = None, username: str = "", email: str = "") -> Dict[str, Any]:
"""Создать нового SIP пользователя"""
"""Создать нового SIP сотрудника (API v3)"""
template = await self.get_extension_template()
if not template:
return {"success": False, "error": "Failed to get template from MikoPBX"}
return {"success": False, "error": "Failed to get template"}
# Заполняем шаблон
template["number"] = number
if username:
template["user_username"] = username
if email:
template["user_email"] = email
template["type"] = "SIP"
template["show_in_phonebook"] = "1"
template["is_general_user_number"] = "1"
if secret:
template["sip_secret"] = secret
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=template
"/pbxcore/api/v3/extensions",
json_data=template
)
if response.get("result"):
created_data = response.get("data", {})
return {
"success": True,
"success": True,
"number": number,
"mikopbx_id": created_data.get("id") or created_data.get("sip_uniqid"),
"data": created_data
"data": response.get("data", {})
}
else:
return {
"success": False,
"success": False,
"error": response.get("messages", ["Unknown error"])
}
async def set_extension_secret(self, number: str, new_secret: str) -> Dict[str, Any]:
"""Обновить пароль у существующего номера"""
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": number}
async def get_extension(self, number: str) -> Optional[Dict]:
"""Получить сотрудника по номеру"""
response = await self._make_request(
"GET",
f"/pbxcore/api/v3/extensions/{number}"
)
if not current.get("result"):
if response.get("result"):
return response.get("data")
return None
async def set_extension_secret(self, number: str, new_secret: str) -> Dict[str, Any]:
"""Обновить пароль"""
current = await self.get_extension(number)
if not current:
return {"success": False, "error": "Extension not found"}
record = current.get("data", {})
record["sip_secret"] = new_secret
current["sip_secret"] = new_secret
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=record
"PUT",
f"/pbxcore/api/v3/extensions/{number}",
json_data=current
)
return {
@@ -142,38 +138,31 @@ class MikoPBXService:
}
async def change_extension_number(self, old_number: str, new_number: str) -> Dict[str, Any]:
"""Сменить номер (удалить старый + создать новый)"""
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": old_number}
)
if not current.get("result"):
"""Сменить номер (удалить + создать заново)"""
current = await self.get_extension(old_number)
if not current:
return {"success": False, "error": "Extension not found"}
record = current.get("data", {})
old_secret = record.get("sip_secret", "")
old_secret = current.get("sip_secret", "")
# Удаляем старый
delete_resp = await self._make_request(
"POST",
"/pbxcore/api/extensions/deleteRecord",
data={"id": record.get("id", old_number)}
"DELETE",
f"/pbxcore/api/v3/extensions/{old_number}"
)
if not delete_resp.get("result"):
return {"success": False, "error": "Failed to delete old extension"}
# Создаём новый
record["number"] = new_number
record["id"] = ""
record.pop("sip_uniqid", None)
current["number"] = new_number
current.pop("id", None)
current.pop("sip_uniqid", None)
create_resp = await self._make_request(
"POST",
"/pbxcore/api/extensions/saveRecord",
data=record
"/pbxcore/api/v3/extensions",
json_data=current
)
if create_resp.get("result"):
@@ -187,25 +176,11 @@ class MikoPBXService:
return {"success": False, "error": "Failed to create new extension"}
async def delete_extension(self, number: str) -> Dict[str, Any]:
"""Удалить номер"""
current = await self._make_request(
"GET",
"/pbxcore/api/extensions/getRecord",
params={"id": number}
)
if not current.get("result"):
return {"success": False, "error": "Extension not found"}
record = current.get("data", {})
record_id = record.get("id") or number
"""Удалить сотрудника"""
response = await self._make_request(
"POST",
"/pbxcore/api/extensions/deleteRecord",
data={"id": record_id}
"DELETE",
f"/pbxcore/api/v3/extensions/{number}"
)
return {
"success": response.get("result", False),
"messages": response.get("messages", [])