-
Notifications
You must be signed in to change notification settings - Fork 62
Commit
Implement the GMP as an IO independent software stack. With this core protocol it will be possible to implement all kind of stacks for GMP easily.
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# SPDX-FileCopyrightText: 2024 Greenbone AG | ||
# | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
from ._connection import Connection | ||
from ._request import Request | ||
from ._response import Response, StatusError | ||
|
||
__all__ = ( | ||
"Connection", | ||
"Request", | ||
"Response", | ||
"StatusError", | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
# SPDX-FileCopyrightText: 2024 Greenbone AG | ||
# | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
from typing import AnyStr, Optional, Protocol | ||
|
||
from lxml import etree | ||
|
||
from gvm.errors import GvmError | ||
|
||
from ._request import Request | ||
from ._response import Response | ||
|
||
|
||
class XmlReader: | ||
""" | ||
Read a XML command until its closing element | ||
""" | ||
|
||
def start_xml(self) -> None: | ||
self._first_element: Optional[etree._Element] = None | ||
# act on start and end element events and | ||
# allow huge text data (for report content) | ||
self._parser = etree.XMLPullParser( | ||
events=("start", "end"), huge_tree=True | ||
) | ||
|
||
def is_end_xml(self) -> bool: | ||
for action, obj in self._parser.read_events(): | ||
if not self._first_element and action in "start": | ||
self._first_element = obj.tag # type: ignore | ||
|
||
if ( | ||
self._first_element | ||
and action in "end" | ||
and str(self._first_element) == str(obj.tag) # type: ignore | ||
): | ||
return True | ||
return False | ||
|
||
def feed_xml(self, data: AnyStr) -> None: | ||
try: | ||
self._parser.feed(data) | ||
except etree.ParseError as e: | ||
raise GvmError( | ||
f"Cannot parse XML response. Response data read {data!r}", | ||
e, | ||
) from None | ||
|
||
|
||
class InvalidStateError(GvmError): | ||
def __init__(self, message: str = "Invalid State", *args): | ||
super().__init__(message, *args) | ||
|
||
|
||
class State(Protocol): | ||
def __set_context__(self, context: "Context") -> None: ... | ||
Check notice Code scanning / CodeQL Statement has no effect Note
This statement has no effect.
|
||
def send(self, request: Request) -> bytes: ... | ||
Check notice Code scanning / CodeQL Statement has no effect Note
This statement has no effect.
|
||
def receive_data(self, data: bytes) -> Optional[Response]: ... | ||
Check notice Code scanning / CodeQL Statement has no effect Note
This statement has no effect.
|
||
def close(self) -> None: ... | ||
Check notice Code scanning / CodeQL Statement has no effect Note
This statement has no effect.
|
||
|
||
|
||
class Context(Protocol): | ||
def __set_state__(self, state: State) -> None: ... | ||
Check notice Code scanning / CodeQL Statement has no effect Note
This statement has no effect.
|
||
|
||
|
||
class AbstractState: | ||
_context: Context | ||
|
||
def __set_context__(self, context: Context) -> None: | ||
self._context = context | ||
|
||
def set_next_state(self, next_state: State) -> None: | ||
self._context.__set_state__(next_state) | ||
|
||
|
||
class InitialState(AbstractState): | ||
def send(self, request: Request) -> bytes: | ||
self.set_next_state(AwaitingResponseState(request)) | ||
return bytes(request) | ||
|
||
def receive_data(self, data: bytes) -> Optional[Response]: | ||
raise InvalidStateError() | ||
|
||
def close(self) -> None: | ||
# nothing to do | ||
return | ||
|
||
|
||
class AwaitingResponseState(AbstractState): | ||
def __init__(self, request: Request) -> None: | ||
self._request = request | ||
|
||
def send(self, request: Request) -> bytes: | ||
raise InvalidStateError() | ||
|
||
def close(self) -> None: | ||
self.set_next_state(InitialState()) | ||
|
||
def receive_data(self, data: bytes) -> Optional[Response]: | ||
next_state = ReceivingDataState(self._request) | ||
self.set_next_state(next_state) | ||
return next_state.receive_data(data) | ||
|
||
|
||
class ErrorState(AbstractState): | ||
message = ( | ||
"The connection is in an error state. Please close the connection." | ||
) | ||
|
||
def send(self, request: Request) -> bytes: | ||
raise InvalidStateError(self.message) | ||
|
||
def close(self) -> None: | ||
self.set_next_state(InitialState()) | ||
|
||
def receive_data(self, data: bytes) -> Optional[Response]: | ||
raise InvalidStateError(self.message) | ||
|
||
|
||
class ReceivingDataState(AbstractState): | ||
def __init__(self, request: Request) -> None: | ||
self._request = request | ||
self._data = bytearray() | ||
self._reader = XmlReader() | ||
self._reader.start_xml() | ||
|
||
def send(self, request: Request) -> bytes: | ||
raise InvalidStateError() | ||
|
||
def close(self) -> None: | ||
self.set_next_state(InitialState()) | ||
|
||
def receive_data(self, data: bytes) -> Optional[Response]: | ||
self._data += data | ||
try: | ||
self._reader.feed_xml(data) | ||
except GvmError as e: | ||
self.set_next_state(ErrorState()) | ||
raise e | ||
|
||
if not self._reader.is_end_xml(): | ||
return None | ||
|
||
self.set_next_state(InitialState()) | ||
return Response(data=bytes(self._data), request=self._request) | ||
|
||
|
||
class Connection: | ||
""" | ||
This is a [SansIO]() connection and not a socket connection | ||
It is responsible for | ||
""" | ||
|
||
def __init__(self) -> None: | ||
self.__set_state__(InitialState()) | ||
|
||
def send(self, request: Request) -> bytes: | ||
return self._state.send(request) | ||
|
||
def receive_data(self, data: bytes) -> Optional[Response]: | ||
return self._state.receive_data(data) | ||
|
||
def close(self) -> None: | ||
return self._state.close() | ||
|
||
def __set_state__(self, state: State) -> None: | ||
self._state = state | ||
self._state.__set_context__(self) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# SPDX-FileCopyrightText: 2024 Greenbone AG | ||
# | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
from typing import Protocol, runtime_checkable | ||
|
||
|
||
@runtime_checkable | ||
class Request(Protocol): | ||
def __bytes__(self) -> bytes: ... | ||
Check notice Code scanning / CodeQL Statement has no effect Note
This statement has no effect.
|
||
def __str__(self) -> str: ... | ||
Check notice Code scanning / CodeQL Statement has no effect Note
This statement has no effect.
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
# SPDX-FileCopyrightText: 2024 Greenbone AG | ||
# | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
from functools import cached_property | ||
from typing import Optional | ||
|
||
from typing_extensions import Self | ||
|
||
from gvm.errors import GvmError | ||
from gvm.xml import Element, parse_xml | ||
|
||
from ._request import Request | ||
|
||
|
||
class StatusError(GvmError): | ||
def __init__(self, message: str | None, *args, response: "Response"): | ||
super().__init__(message, *args) | ||
self.response = response | ||
self.request = response.request | ||
|
||
|
||
class Response: | ||
def __init__(self, *, request: Request, data: bytes) -> None: | ||
self._request = request | ||
self._data = data | ||
self.__xml: Optional[Element] = None | ||
|
||
def __root_element(self) -> Element: | ||
if self.__xml is None: | ||
self.__xml = self.xml() | ||
return self.__xml | ||
|
||
def xml(self) -> Element: | ||
return parse_xml(self.data) | ||
|
||
@property | ||
def data(self) -> bytes: | ||
return self._data | ||
|
||
@property | ||
def request(self) -> Request: | ||
return self._request | ||
|
||
@cached_property | ||
def status_code(self) -> Optional[int]: | ||
root = self.__root_element() | ||
try: | ||
status = root.attrib["status"] | ||
return int(status) | ||
except (KeyError, ValueError): | ||
return None | ||
|
||
@property | ||
def is_success(self) -> bool: | ||
status = self.status_code | ||
return status is not None and 200 <= status <= 299 | ||
|
||
def raise_for_status(self) -> Self: | ||
if self.is_success: | ||
return self | ||
raise StatusError( | ||
f"Invalid status code {self.status_code}", response=self | ||
) | ||
|
||
def __bytes__(self) -> bytes: | ||
return self._data | ||
|
||
def __str__(self) -> str: | ||
return self._data.decode() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# SPDX-FileCopyrightText: 2024 Greenbone AG | ||
# | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
from ._auth import Authentication | ||
from ._port_list import PortList, PortRangeType | ||
from ._resource_names import ResourceNames, ResourceType | ||
from ._version import Version | ||
|
||
__all__ = ( | ||
"Authentication", | ||
"PortList", | ||
"PortRangeType", | ||
"Version", | ||
"ResourceNames", | ||
"ResourceType", | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# SPDX-FileCopyrightText: 2024 Greenbone AG | ||
# | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
from gvm.errors import RequiredArgument | ||
from gvm.xml import XmlCommand | ||
|
||
from .._request import Request | ||
|
||
|
||
class Authentication: | ||
|
||
@classmethod | ||
def authenticate(cls, username: str, password: str) -> Request: | ||
"""Authenticate to gvmd. | ||
The generated authenticate command will be send to server. | ||
Afterwards the response is read, transformed and returned. | ||
Args: | ||
username: Username | ||
password: Password | ||
""" | ||
cmd = XmlCommand("authenticate") | ||
|
||
if not username: | ||
raise RequiredArgument( | ||
function=cls.authenticate.__name__, argument="username" | ||
) | ||
|
||
if not password: | ||
raise RequiredArgument( | ||
function=cls.authenticate.__name__, argument="password" | ||
) | ||
|
||
credentials = cmd.add_element("credentials") | ||
credentials.add_element("username", username) | ||
credentials.add_element("password", password) | ||
return cmd | ||
|
||
@staticmethod | ||
def describe_auth() -> Request: | ||
"""Describe authentication methods | ||
Returns a list of all used authentication methods if such a list is | ||
available. | ||
""" | ||
return XmlCommand("describe_auth") | ||
|
||
@classmethod | ||
def modify_auth( | ||
cls, group_name: str, auth_conf_settings: dict[str, str] | ||
) -> Request: | ||
"""Modifies an existing authentication. | ||
Arguments: | ||
group_name: Name of the group to be modified. | ||
auth_conf_settings: The new auth config. | ||
""" | ||
if not group_name: | ||
raise RequiredArgument( | ||
function=cls.modify_auth.__name__, argument="group_name" | ||
) | ||
if not auth_conf_settings: | ||
raise RequiredArgument( | ||
function=cls.modify_auth.__name__, | ||
argument="auth_conf_settings", | ||
) | ||
|
||
cmd = XmlCommand("modify_auth") | ||
group = cmd.add_element("group", attrs={"name": str(group_name)}) | ||
|
||
for key, value in auth_conf_settings.items(): | ||
auth_conf = group.add_element("auth_conf_setting") | ||
auth_conf.add_element("key", key) | ||
auth_conf.add_element("value", value) | ||
|
||
return cmd |