-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Tobias Wolf <[email protected]>
- Loading branch information
1 parent
14541a6
commit b417dc3
Showing
5 changed files
with
134 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
from socket import AF_INET, IPPROTO_TCP, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket | ||
|
||
from threading import Event, RLock | ||
|
||
from paramiko import ( | ||
AUTH_FAILED, | ||
AUTH_SUCCESSFUL, | ||
OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED, | ||
OPEN_SUCCEEDED, | ||
AutoAddPolicy, | ||
RSAKey, | ||
ServerInterface, | ||
SSHClient, | ||
Transport, | ||
) | ||
|
||
class MockSSHServer(ServerInterface): | ||
"""An ssh server accepting the pre-generated key.""" | ||
|
||
ssh_username = "pytest" | ||
ssh_key = RSAKey.generate(4096) | ||
|
||
def __init__(self): | ||
self._channel = None | ||
self._client = None | ||
self._command = None | ||
self.event = Event() | ||
self._server_transport = None | ||
self._thread_lock = RLock() | ||
|
||
def __del__(self): | ||
self.close() | ||
|
||
@property | ||
def client(self): | ||
with self._thread_lock: | ||
if self._client is None: | ||
connection_event = Event() | ||
|
||
server_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) | ||
server_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | ||
server_socket.bind(("127.0.0.1", 0)) | ||
server_socket.listen() | ||
|
||
server_address = server_socket.getsockname() | ||
|
||
client_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) | ||
client_socket.connect(server_address) | ||
|
||
(transport_socket, _) = server_socket.accept() | ||
|
||
self._server_transport = Transport(transport_socket) | ||
self._server_transport.add_server_key(self.__class__.ssh_key) | ||
self._server_transport.start_server(connection_event, self) | ||
|
||
self._client = SSHClient() | ||
self._client.set_missing_host_key_policy(AutoAddPolicy()) | ||
|
||
self._client.connect( | ||
server_address[0], | ||
server_address[1], | ||
username=self.__class__.ssh_username, | ||
pkey=self.__class__.ssh_key, | ||
sock=client_socket | ||
) | ||
|
||
connection_event.wait() | ||
|
||
return self._client | ||
|
||
def check_channel_request(self, kind, chanid): | ||
if kind == "session": | ||
return OPEN_SUCCEEDED | ||
return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED | ||
|
||
def check_auth_password(self, username, password): | ||
return AUTH_FAILED | ||
|
||
def check_auth_publickey(self, username, key): | ||
return ( | ||
AUTH_SUCCESSFUL | ||
if username == self.__class__.ssh_username and key == self.__class__.ssh_key else | ||
AUTH_FAILED | ||
) | ||
|
||
def check_channel_exec_request(self, channel, command): | ||
if self.event.is_set(): | ||
return False | ||
|
||
self.event.set() | ||
|
||
self._channel = channel | ||
self._command = command | ||
|
||
return True | ||
|
||
def close(self): | ||
if self._server_transport is not None: | ||
self._server_transport.close() | ||
self._server_transport = None | ||
|
||
def get_allowed_auths(self, username): | ||
return "publickey" if username == self.__class__.ssh_username else "" | ||
|
||
def handle_exec_request(self, _callable): | ||
if not callable(_callable): | ||
raise RuntimeError("Handler function given is invalid") | ||
|
||
_callable(self._command, self._channel) | ||
|
||
self._channel = None | ||
self._client = None | ||
|
||
self.event.clear() |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
import pytest | ||
|
||
from rookify.modules.example import ExampleHandler | ||
from rookify.modules.module import ModuleException | ||
|
||
def test_preflight_check(): | ||
with pytest.raises(ModuleException): | ||
ExampleHandler({}, {}).preflight_check() |