diff --git a/README.md b/README.md index 4ebc4b2..b98c234 100644 --- a/README.md +++ b/README.md @@ -13,14 +13,15 @@ Add one-time password authentication to your SSH server. The following instructions are based on ubuntu, but they can be adapted for other Linux distributions. +Depending on linux distribution Python 2 or Python 3 may be installed, ssh-otp supports both. Installation ------------ -Copy `ssh-otp` to `/usr/local/bin`: +Copy `ssh-otp` and `ssh-otp.py` to `/usr/local/bin`: sudo mkdir -p /usr/local/bin - sudo cp ssh-otp + sudo cp ssh-otp ssh-otp.py Add the following line in your `/etc/ssh/sshd_config`: @@ -34,7 +35,9 @@ And restart sshd: Enable ------ -Generate one-time password secret for current user: +If no one-time password has been generated the ssh-otp skips asking +for OTP. +If you generate a one-time password secret for current user with: ssh-otp setup @@ -43,7 +46,7 @@ and type in the displayed code on your authenticator to actually enable one-time password authentication on SSH conneciton. -You can find the configuration file at: +The generated configuration file will be available at: ~/.ssh/otp diff --git a/ssh-otp b/ssh-otp index 3ca56f5..374c8f3 100755 --- a/ssh-otp +++ b/ssh-otp @@ -1,236 +1,46 @@ -#!/usr/bin/env python +#!/bin/bash +# Smart Python launcher that tries python3, python, then python2 +# This script automatically finds the best available Python interpreter -from __future__ import with_statement -import sys -import os -import yaml -import argparse -import signal -import hmac -import hashlib -import struct -import base64 -import time -import socket -import urllib +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PYTHON_SCRIPT="$SCRIPT_DIR/ssh-otp.py" -class Action(object): - """ - Action base. - """ - - def __init__(self): - """ - Loads configuration from ~/.ssh/otp - """ - - self.config = { - 'debug': False, - 'enable': False, - 'secret': '', - 'timeout': 120, - 'delay': 3, - 'drift_backward': 1, - 'drift_forward': 1, - } - self.config_path = os.path.join(os.environ['HOME'], '.ssh', 'otp') - self.load() - - def load(self): - try: - with open(self.config_path, 'rb') as f: - self.config.update(yaml.load(f) or {}) - except IOError: - pass - - def save(self): - with open(self.config_path, 'wb') as f: - yaml.dump(self.config, f, default_flow_style=False) - os.chmod(self.config_path, 0600) - - def check(self, code): - drift_backward = max(0, self.config['drift_backward']) - drift_forward = max(0, self.config['drift_forward']) - - for drift in range(-drift_backward, drift_forward + 1): - if code == self.totp(self.config['secret'], drift=drift): - return True - return False - - def totp(self, key, length=6, hash=hashlib.sha1, period=30, drift=0): - counter = int(int(time.time()) / period) + drift - return self.hotp(key, counter, length=length, hash=hash, drift=0) - - def hotp(self, key, counter, length=6, hash=hashlib.sha1, drift=0): - counter = struct.pack('>Q', counter + drift) - key = base64.b32decode(key) - - digest = hmac.new(key, counter, hash).digest() - - offset = ord(digest[-1]) & 0xF - value = struct.unpack('>L', digest[offset:offset + 4])[0] & 0x7FFFFFFFL - code = ('%010d' % value)[-length:] - return code - -class Login(Action): - - def __init__(self): - super(Login, self).__init__() - - # dump environ for debugging - if self.config['debug']: - for name, value in os.environ.iteritems(): - sys.stderr.write('%s = %s\n' % (name, value)) - - # setup timeout - signal.signal(signal.SIGALRM, self.fail) - - def shell(self, command=''): - if command: - os.execl('/bin/bash', '/bin/bash', '-c', command) - else: - shell = os.environ['SHELL'] - os.execl(shell, shell, '-l') - assert False - - def success(self): - self.shell(os.environ.get('SSH_ORIGINAL_COMMAND', '')) - - def fail(self, *args, **kwargs): - os._exit(1) - - def run(self): - # if not enabled, then simply run shell - if not self.config['enable']: - self.success() - - # is the code set in environment? - code = os.environ.get('OTP', '') - if code: - if self.check(code): - self.success() - else: - self.fail() - - # setup timeout - signal.alarm(self.config['timeout']) - - # let user try until success or timeout - try: - while True: - # get code - sys.stderr.write('One-time password: ') - sys.stderr.flush() - - code = raw_input().strip() - if self.check(code): - break - - # delay - time.sleep(self.config['delay']) - sys.stderr.write('Incorrect code. Please try again.\n\n') - - except: - self.fail() - - # disable timeout - signal.alarm(0) - - self.success() - -class Setup(Action): - - def run(self): - sys.stdout.write('\n') - - if not self.config['secret']: - self.generate() - - sys.stdout.write('New one-time password secret\n') - sys.stdout.write('----------------------------\n') - else: - sys.stdout.write('Existing one-time password secret\n') - sys.stdout.write('---------------------------------\n') - - self.display() - - if self.config['enable']: - return - - self.test() - - def generate(self): - # 320 bit of entropy - secret = os.urandom(40) - secret = base64.b32encode(secret) - self.config['secret'] = secret - self.save() - - def display(self): - sys.stdout.write('Secret: %s\n' % self.config['secret']) - - otpauth_url = 'otpauth://totp' - otpauth_url += urllib.quote('/ssh %s@%s' % (os.environ['USER'], socket.gethostname())) - otpauth_url += '?' + urllib.urlencode({ - 'secret': self.config['secret'] - }) - - qrcode_url = 'https://chart.googleapis.com/chart?' + urllib.urlencode({ - 'cht': 'qr', - 'chs': '512x512', - 'chl': otpauth_url, - }) - sys.stdout.write('QRCode: %s\n' % qrcode_url) - sys.stdout.write('Status: %s\n' % ('Enabled' if self.config['enable'] else 'Disabled')) - sys.stdout.write('\n') - - def test(self): - sys.stdout.write('To enable one-time password, please setup your authenticator.\n') - - try: - while True: - # get code - sys.stderr.write('One-time password: ') - sys.stderr.flush() - - code = raw_input().strip() - if self.check(code): - break - - sys.stderr.write('Incorrect code. Please try again.\n\n') - - except: - sys.stdout.write('\nFailed to enable one-time password.\n') - sys.stdout.write('Please rerun setup to try again.\n') - sys.exit(1) - - self.config['enable'] = True - self.save() - - sys.stdout.write('Successful! One-time password is now enabled.\n') - sys.exit(0) - -class Reset(Action): - def run(self): - self.config['secret'] = '' - self.config['enable'] = False - self.save() - -ACTIONS = { - 'login': Login, - 'setup': Setup, - 'reset': Reset, +# Fallback function - single exit point +fallback_to_shell() { + echo "Warning: Allowing fallback access to prevent lockout" >&2 + exec /bin/bash -l } -def parse_args(): - parser = argparse.ArgumentParser(description='SSH One-time Password Authentication') - parser.add_argument('action', choices=ACTIONS.keys()) - return parser.parse_args() - -def main(args): - action_cls = ACTIONS[args.action] - action = action_cls() - action.run() - -if __name__ == '__main__': - main(parse_args()) +# Check if Python script exists and is readable +if [ ! -f "$PYTHON_SCRIPT" ]; then + echo "Error: Python script not found: $PYTHON_SCRIPT" >&2 + fallback_to_shell +fi + +if [ ! -r "$PYTHON_SCRIPT" ]; then + echo "Error: Python script not readable: $PYTHON_SCRIPT" >&2 + fallback_to_shell +fi + +# Try Python versions in order of preference +for python_cmd in python3 python python2; do + if command -v "$python_cmd" >/dev/null 2>&1; then + # Test if we can actually execute the Python script + if "$python_cmd" -c "import sys; sys.exit(0)" 2>/dev/null; then + # Try to run the actual script, capture any errors + if "$python_cmd" "$PYTHON_SCRIPT" --help >/dev/null 2>&1; then + # Script runs successfully, execute it with arguments + exec "$python_cmd" "$PYTHON_SCRIPT" "$@" + else + echo "Warning: Python script failed to run" >&2 + fallback_to_shell + fi + else + echo "Warning: Python interpreter '$python_cmd' failed basic test" >&2 + continue + fi + fi +done + +echo "Error: No working Python interpreter found (python3, python, or python2)" >&2 +fallback_to_shell diff --git a/ssh-otp.py b/ssh-otp.py new file mode 100644 index 0000000..2e6272f --- /dev/null +++ b/ssh-otp.py @@ -0,0 +1,331 @@ +#!/usr/bin/env python + +from __future__ import with_statement, print_function +import sys +import os +import re +import json +import argparse +import signal +import hmac +import hashlib +import struct +import base64 +import time +import socket + +# Use Python 3's built-in compatibility +try: + # Python 3 + from urllib.parse import quote, urlencode + input_func = input + oct_mode = 0o600 +except ImportError: + # Python 2 + from urllib import quote, urlencode + input_func = raw_input + oct_mode = 0o600 # Use 0o prefix for Python 3 compatibility + +class Action(object): + """ + Action base. + """ + + def __init__(self): + """ + Loads configuration from ~/.ssh/otp + """ + + self.config = { + 'debug': False, + 'enable': False, + 'secret': '', + 'timeout': 120, + 'delay': 3, + 'drift_backward': 1, + 'drift_forward': 1, + } + self.config_path = os.path.join(os.environ['HOME'], '.ssh', 'otp') + self.load() + + def yaml_load(self, stream): + """Minimal YAML loader that handles basic key-value pairs""" + try: + content = stream.read() + # Handle bytes to string conversion + if hasattr(content, 'decode'): + content = content.decode('utf-8') + if not content.strip(): + return {} + + # Simple YAML to JSON conversion for basic key-value pairs + result = {} + lines = content.split('\n') + + for line in lines: + line = line.strip() + if not line or line.startswith('#'): + continue + + # Match key: value pattern + match = re.match(r'^(\w+):\s*(.+)$', line) + if match: + key = match.group(1) + value_str = match.group(2).strip() + + # Convert value types + if value_str.lower() in ('true', 'false'): + value = value_str.lower() == 'true' + elif value_str.isdigit(): + value = int(value_str) + elif value_str.startswith('"') and value_str.endswith('"'): + value = value_str[1:-1] + else: + value = value_str + + result[key] = value + + return result + except Exception as e: + # Return empty dict on any error (including file not found) + return {} + + def yaml_dump(self, data, stream=None, default_flow_style=False): + """Minimal YAML dumper""" + lines = [] + for key, value in data.items(): + if isinstance(value, bool): + value_str = str(value).lower() + elif isinstance(value, str): + value_str = '"{}"'.format(value) + else: + value_str = str(value) + lines.append('{}: {}'.format(key, value_str)) + + text = '\n'.join(lines) + if stream is None: + return text + # Handle string to bytes conversion + if hasattr(stream, 'write'): + if hasattr(text, 'encode'): + stream.write(text.encode('utf-8')) + else: + stream.write(text) + + def load(self): + try: + with open(self.config_path, 'rb') as f: + self.config.update(self.yaml_load(f) or {}) + except (IOError, OSError): + pass + + def save(self): + with open(self.config_path, 'wb') as f: + self.yaml_dump(self.config, f, default_flow_style=False) + os.chmod(self.config_path, oct_mode) + + def check(self, code): + drift_backward = max(0, self.config['drift_backward']) + drift_forward = max(0, self.config['drift_forward']) + + for drift in range(-drift_backward, drift_forward + 1): + if code == self.totp(self.config['secret'], drift=drift): + return True + return False + + def totp(self, key, length=6, hash=hashlib.sha1, period=30, drift=0): + counter = int(int(time.time()) / period) + drift + return self.hotp(key, counter, length=length, hash=hash, drift=0) + + def hotp(self, key, counter, length=6, hash=hashlib.sha1, drift=0): + counter = struct.pack('>Q', counter + drift) + key = base64.b32decode(key) + + digest = hmac.new(key, counter, hash).digest() + + # Handle bytes indexing for Python 2/3 compatibility + try: + # Python 3: digest[-1] returns int directly + offset = digest[-1] & 0xF + except TypeError: + # Python 2: digest[-1] returns string, need ord() + offset = ord(digest[-1]) & 0xF + + value = struct.unpack('>L', digest[offset:offset + 4])[0] & 0x7FFFFFFF + code = ('%010d' % value)[-length:] + return code + +class Login(Action): + + def __init__(self): + super(Login, self).__init__() + + # dump environ for debugging + if self.config['debug']: + for name, value in os.environ.items(): + sys.stderr.write('%s = %s\n' % (name, value)) + + # setup timeout + signal.signal(signal.SIGALRM, self.fail) + + def shell(self, command=''): + if command: + os.execl('/bin/bash', '/bin/bash', '-c', command) + else: + shell = os.environ['SHELL'] + os.execl(shell, shell, '-l') + assert False + + def success(self): + self.shell(os.environ.get('SSH_ORIGINAL_COMMAND', '')) + + def fail(self, *args, **kwargs): + os._exit(1) + + def run(self): + # if not enabled, then simply run shell + if not self.config['enable']: + self.success() + + # is the code set in environment? + code = os.environ.get('OTP', '') + if code: + if self.check(code): + self.success() + else: + self.fail() + + # setup timeout + signal.alarm(self.config['timeout']) + + # let user try until success or timeout + try: + while True: + # get code + sys.stderr.write('One-time password: ') + sys.stderr.flush() + + code = input_func().strip() + if self.check(code): + break + + # delay + time.sleep(self.config['delay']) + sys.stderr.write('Incorrect code. Please try again.\n\n') + + except (KeyboardInterrupt, EOFError): + self.fail() + + # disable timeout + signal.alarm(0) + + self.success() + +class Setup(Action): + + def run(self): + sys.stdout.write('\n') + + if not self.config['secret']: + self.generate() + + sys.stdout.write('New one-time password secret\n') + sys.stdout.write('----------------------------\n') + else: + sys.stdout.write('Existing one-time password secret\n') + sys.stdout.write('---------------------------------\n') + + self.display() + + if self.config['enable']: + return + + self.test() + + def generate(self): + # 320 bit of entropy + secret = os.urandom(40) + secret = base64.b32encode(secret) + # Handle bytes to string conversion + if hasattr(secret, 'decode'): + secret = secret.decode('ascii') + self.config['secret'] = secret + self.save() + + def display(self): + sys.stdout.write('Secret: %s\n' % self.config['secret']) + + otpauth_url = 'otpauth://totp' + otpauth_url += quote('/ssh %s@%s' % (os.environ['USER'], socket.gethostname())) + otpauth_url += '?' + urlencode({ + 'secret': self.config['secret'] + }) + + # Use QR Server API (working alternative to Google Charts) + qrcode_url = 'https://api.qrserver.com/v1/create-qr-code/?' + urlencode({ + 'size': '512x512', + 'data': otpauth_url, + 'format': 'png' + }) + + sys.stdout.write('QRCode: %s\n' % qrcode_url) + sys.stdout.write('otpauth URL: %s\n' % otpauth_url) + sys.stdout.write('Status: %s\n' % ('Enabled' if self.config['enable'] else 'Disabled')) + sys.stdout.write('\n') + sys.stdout.write('Setup Instructions:\n') + sys.stdout.write('1. Open the QRCode URL in your browser to see the QR code\n') + sys.stdout.write('2. Scan the QR code with your authenticator app (Google Authenticator, Authy, etc.)\n') + sys.stdout.write('3. Or manually enter the secret: %s\n' % self.config['secret']) + sys.stdout.write('4. Test the setup by entering a code when prompted\n') + sys.stdout.write('\n') + + def test(self): + sys.stdout.write('To enable one-time password, please setup your authenticator.\n') + + try: + while True: + # get code + sys.stderr.write('One-time password: ') + sys.stderr.flush() + + code = input_func().strip() + if self.check(code): + break + + sys.stderr.write('Incorrect code. Please try again.\n\n') + + except (KeyboardInterrupt, EOFError): + sys.stdout.write('\nFailed to enable one-time password.\n') + sys.stdout.write('Please rerun setup to try again.\n') + sys.exit(1) + + self.config['enable'] = True + self.save() + + sys.stdout.write('Successful! One-time password is now enabled.\n') + sys.exit(0) + +class Reset(Action): + def run(self): + self.config['secret'] = '' + self.config['enable'] = False + self.save() + +ACTIONS = { + 'login': Login, + 'setup': Setup, + 'reset': Reset, +} + +def parse_args(): + parser = argparse.ArgumentParser(description='SSH One-time Password Authentication') + parser.add_argument('action', choices=ACTIONS.keys()) + return parser.parse_args() + +def main(args): + action_cls = ACTIONS[args.action] + action = action_cls() + action.run() + +if __name__ == '__main__': + main(parse_args()) \ No newline at end of file