Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,4 @@ dmypy.json

# Pyre type checker
.pyre/
.vscode/
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ repos:
- --skip="./.*,*.csv,*.json,*.md"
- --quiet-level=2
exclude_types: [csv, json]
- repo: https://gitlab.com/pycqa/flake8
- repo: https://github.com/pycqa/flake8
rev: 3.8.4
hooks:
- id: flake8
Expand All @@ -34,6 +34,6 @@ repos:
hooks:
- id: yamllint
- repo: https://github.com/PyCQA/isort
rev: 5.5.3
rev: 5.12.0
hooks:
- id: isort
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ The Sagemcom F@st series is used by multiple cable companies, where some cable c
| Sagemcom F@st 5370e | Telia | sha512 | |
| Sagemcom F@st 5566 | Bell (Home Hub 3000) | md5 | username: guest, password: "" |
| Sagemcom F@st 5689 | Bell (Home Hub 4000) | md5 | username: admin, password: "" |
| Sagemcom F@st 5689E | Bell (Giga Hub) | sha512 | username: admin, password: "" |
| Sagemcom F@st 5690 | Bell (Giga Hub) | sha512 | username: admin, password: "" |
| Sagemcom F@st 5655V2 | MásMóvil | md5 | |
| Sagemcom F@st 5657IL | | md5 | |
| Speedport Pro | Telekom | md5 | username: admin |
Expand Down
3 changes: 2 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sagemcom_api"
version = "1.0.8"
version = "1.0.9"
description = "Python client to interact with SagemCom F@st routers via internal API's."
authors = ["Mick Vleeshouwer <[email protected]>"]
license = "MIT"
Expand All @@ -20,6 +20,7 @@ packages = [
python = ">=3.9,<4.0"
aiohttp = "^3.7.3"
pyhumps = "^3.0.2"
requests = ">2.0"

[tool.poetry.dev-dependencies]
pytest = "^7.1"
Expand Down
89 changes: 75 additions & 14 deletions sagemcom_api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Dict, List, Optional, Type
import urllib.parse

from aiohttp import ClientSession, ClientTimeout
from aiohttp import ClientConnectionError, ClientSession, ClientTimeout
from aiohttp.connector import TCPConnector
import humps

Expand All @@ -33,14 +33,15 @@
AccessRestrictionException,
AuthenticationException,
BadRequestException,
LoginConnectionException,
LoginTimeoutException,
MaximumSessionCountException,
NonWritableParameterException,
UnauthorizedException,
UnknownException,
UnknownPathException,
)
from .models import Device, DeviceInfo, PortMapping
from .models import Device, DeviceInfo, PortMapping, SpeedTestResult


class SagemcomClient:
Expand All @@ -55,6 +56,7 @@ def __init__(
session: ClientSession = None,
ssl=False,
verify_ssl=True,
keep_keys=False,
):
"""
Create a SagemCom client.
Expand All @@ -69,6 +71,7 @@ def __init__(
self.username = username
self.authentication_method = authentication_method
self._password_hash = self.__generate_hash(password)
self.keep_keys = keep_keys

self.protocol = "https" if ssl else "http"

Expand Down Expand Up @@ -156,19 +159,20 @@ def __get_response(self, response, index=0):

return value

def __get_response_value(self, response, index=0):
def __get_response_value(self, response, index=0, keep_keys = None):
"""Retrieve response value from value."""
try:
value = self.__get_response(response, index)["value"]
except KeyError:
value = None

# Rewrite result to snake_case
value = humps.decamelize(value)
if (keep_keys is not None and not keep_keys) or (keep_keys is None and not self.keep_keys):
value = humps.decamelize(value)

return value

async def __api_request_async(self, actions, priority=False):
async def __api_request_async(self, actions, priority=False, **request_kwargs):
"""Build request to the internal JSON-req API."""
self.__generate_request_id()
self.__generate_nonce()
Expand All @@ -188,7 +192,9 @@ async def __api_request_async(self, actions, priority=False):
}

async with self.session.post(
api_host, data="req=" + json.dumps(payload, separators=(",", ":"))
api_host,
data="req=" + json.dumps(payload, separators=(",", ":")),
**request_kwargs,
) as response:

if response.status == 400:
Expand Down Expand Up @@ -272,6 +278,10 @@ async def login(self):
raise LoginTimeoutException(
"Request timed-out. This is mainly due to using the wrong encryption method."
) from exception
except ClientConnectionError as exception:
raise LoginConnectionException(
"Unable to connect to the device. Please check the host address."
) from exception

data = self.__get_response(response)

Expand All @@ -293,7 +303,7 @@ async def logout(self):
self._request_id = -1

async def get_value_by_xpath(
self, xpath: str, options: Optional[Dict] = {}
self, xpath: str, options: Optional[Dict] = {}, keep_keys = None
) -> Dict:
"""
Retrieve raw value from router using XPath.
Expand All @@ -309,11 +319,11 @@ async def get_value_by_xpath(
}

response = await self.__api_request_async([actions], False)
data = self.__get_response_value(response)
data = self.__get_response_value(response, keep_keys = keep_keys)

return data

async def get_values_by_xpaths(self, xpaths, options: Optional[Dict] = {}) -> Dict:
async def get_values_by_xpaths(self, xpaths, options: Optional[Dict] = {}, keep_keys = None) -> Dict:
"""
Retrieve raw values from router using XPath.

Expand All @@ -331,7 +341,7 @@ async def get_values_by_xpaths(self, xpaths, options: Optional[Dict] = {}) -> Di
]

response = await self.__api_request_async(actions, False)
values = [self.__get_response_value(response, i) for i in range(len(xpaths))]
values = [self.__get_response_value(response, i, keep_keys = keep_keys) for i in range(len(xpaths))]
data = dict(zip(xpaths.keys(), values))

return data
Expand Down Expand Up @@ -361,7 +371,7 @@ async def set_value_by_xpath(
async def get_device_info(self) -> DeviceInfo:
"""Retrieve information about Sagemcom F@st device."""
try:
data = await self.get_value_by_xpath("Device/DeviceInfo")
data = await self.get_value_by_xpath("Device/DeviceInfo", keep_keys = False)
return DeviceInfo(**data.get("device_info"))
except UnknownPathException:
data = await self.get_values_by_xpaths(
Expand All @@ -380,7 +390,7 @@ async def get_device_info(self) -> DeviceInfo:

async def get_hosts(self, only_active: Optional[bool] = False) -> List[Device]:
"""Retrieve hosts connected to Sagemcom F@st device."""
data = await self.get_value_by_xpath("Device/Hosts/Hosts")
data = await self.get_value_by_xpath("Device/Hosts/Hosts", keep_keys = False)
devices = [Device(**d) for d in data]

if only_active:
Expand All @@ -391,11 +401,30 @@ async def get_hosts(self, only_active: Optional[bool] = False) -> List[Device]:

async def get_port_mappings(self) -> List[PortMapping]:
"""Retrieve configured Port Mappings on Sagemcom F@st device."""
data = await self.get_value_by_xpath("Device/NAT/PortMappings")
data = await self.get_value_by_xpath("Device/NAT/PortMappings", keep_keys = False)
port_mappings = [PortMapping(**p) for p in data]

return port_mappings

async def get_logs(self) -> List[str]:
"""
Retrieve system logs.
"""

actions = {
"id": 0,
"method": "getVendorLogDownloadURI",
"xpath": urllib.parse.quote("Device/DeviceInfo/VendorLogFiles/VendorLogFile[@uid='1']"),
}

response = await self.__api_request_async([actions], False)
log_path = response["reply"]["actions"][0]["callbacks"][0]["parameters"]["uri"]

log_uri = f"{self.protocol}://{self.host}{log_path}"
response = await self.session.get(log_uri, timeout=10)

return await response.text()

async def reboot(self):
"""Reboot Sagemcom F@st device."""
action = {
Expand All @@ -405,6 +434,38 @@ async def reboot(self):
}

response = await self.__api_request_async([action], False)
data = self.__get_response_value(response)
data = self.__get_response_value(response, keep_keys = False)

return data

async def run_speed_test(self, block_traffic: bool = False):
"""Run Speed Test on Sagemcom F@st device."""
actions = [
{
"id": 0,
"method": "speedTestClient",
"xpath": "Device/IP/Diagnostics/SpeedTest",
"parameters": {"BlockTraffic": block_traffic},
}
]
return await self.__api_request_async(actions, False, timeout=100)

async def get_speed_test_results(self):
"""Retrieve Speed Test results from Sagemcom F@st device."""
ret = await self.get_value_by_xpath("Device/IP/Diagnostics/SpeedTest")
history = ret["speed_test"]["history"]
if history:
timestamps = (int(k) for k in history["timestamp"].split(","))
server_address = history["selected_server_address"].split(",")
block_traffic = history["block_traffic"].split(",")
latency = history["latency"].split(",")
upload = (float(k) for k in history["upload"].split(","))
download = (float(k) for k in history["download"].split(","))
results = [
SpeedTestResult(*data)
for data in zip(
timestamps, server_address, block_traffic, latency, upload, download
)
]
return results
return []
6 changes: 6 additions & 0 deletions sagemcom_api/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ class LoginTimeoutException(Exception):
pass


class LoginConnectionException(Exception):
"""Raised when a connection error is encountered during login."""

pass


class NonWritableParameterException(Exception):
"""Raised when provided parameter is not writable."""

Expand Down
28 changes: 28 additions & 0 deletions sagemcom_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import dataclasses
from dataclasses import dataclass
import time
from typing import Any, List, Optional


Expand Down Expand Up @@ -162,3 +163,30 @@ def __init__(self, **kwargs):
def id(self):
"""Return unique ID for port mapping."""
return self.uid


@dataclass
class SpeedTestResult:
"""Representation of a speedtest result."""

timestamp: str
selected_server_address: str
block_traffic: bool
latency: str
upload: str
download: str

def __post_init__(self):
"""Process data after init."""
# Convert timestamp to datetime object.
self.timestamp = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(self.timestamp)
)
self.block_traffic = bool(self.block_traffic)

def __str__(self) -> str:
"""Return string representation of speedtest result."""
return (
f"timestamp: {self.timestamp}, latency: {self.latency}, "
f"upload: {self.upload}, download: {self.download}"
)