diff --git a/str-twincoding/.gitignore b/str-twincoding/.gitignore
index de3a1206d..94fed7b0a 100644
--- a/str-twincoding/.gitignore
+++ b/str-twincoding/.gitignore
@@ -4,8 +4,11 @@ venv
 file_1MB.dat
 *.encoded
 *.dat
-recovered
 a.py
+a.sh
+a.svg
+recovered
 twin_coding_nogf.py
-
-
+output.html
+repository
+providers.jsonc
diff --git a/str-twincoding/README-in.md b/str-twincoding/README-in.md
new file mode 100644
index 000000000..40080fa7b
--- /dev/null
+++ b/str-twincoding/README-in.md
@@ -0,0 +1,75 @@
+
+## Orchid Storage Project
+
+_Orchid Storage is a **work in progress** - Orchid is an open source project. Help
+us build a truly decentralized storage system._
+
+***TODO: Link to whitepaper***
+
+This repository contains work in progress on the file encoding CLI and server framework.
+
+![monitor](docs/monitor.png "Monitor")
+
+A key aspect of the Orchid Storage project is the development of an efficient encoding scheme that minimizes
+bandwidth costs incurred during migration of distributed data through providers over time.
+
+**Twin Coding** is a hybrid encoding scheme that works with any two linear coding schemes and combines
+them to achieve a space-bandwidth tradeoff, minimizing the amount of data that must be transferred
+between storage nodes in order to recover a lost shard of data. In contrast to a traditional
+erasure scheme, in which restoration of a lost node requires a full reconstruction of the original
+file, Twin Coding allows for the recovery of a lost data shard with data transfer totalling exactly
+the size of the lost data shard, with no additional transfer overhead.
+For example, with k = 3, repairing one lost shard of a 1 MB file moves only about 0.33 MB
+(one shard), where a conventional erasure code would transfer roughly the full 1 MB.
+
+This repository contains an implementation of Twin Coding, as well as a command line API for encoding
+files, decoding files with erasures, and optimally recovering lost shards. There is also a provider
+server framework and a monitor application for observing file availability across providers.
+
+See [`twin_coding.py`](encoding/twin_coding.py) for an explanation of the algorithm, example code, and a link to the original paper.
+
+
+## Installation
+
+```
+# Create a virtual environment
+python3 -m venv venv
+```
+
+```
+# Activate the virtual environment
+# For macOS and Linux:
+source venv/bin/activate
+# For Windows:
+.\venv\Scripts\activate
+```
+
+```
+# Install the dependencies
+pip install -r requirements.txt
+```
+
+## Example Usage
+```
+INSERT_USAGE
+```
+
+## Encoding CLI Examples
+
+See also [`examples.sh`](examples/examples.sh)
+
+```
+INSERT_EXAMPLES
+```
+
+## CLI Docs
+
+INSERT_STORAGE_DOCS
+
+## Server Docs
+```
+INSERT_SERVER_DOCS
+```
+
+## Monitor Docs
+```
+INSERT_MONITOR_DOCS
+```
diff --git a/str-twincoding/README.md b/str-twincoding/README.md
index b61d267bd..e041d6dc6 100644
--- a/str-twincoding/README.md
+++ b/str-twincoding/README.md
@@ -1,17 +1,30 @@
-## About
+## Orchid Storage Project
 
-Twin Coding is a hybrid encoding scheme that works with any two linear coding schemes and combines
+_Orchid Storage is a **work in progress** - Orchid is an open source project. Help
+us build a truly decentralized storage system._
+
+***TODO: Link to whitepaper***
+
+This repository contains work in progress on the file encoding CLI and server framework.
+
+![monitor](docs/monitor.png "Monitor")
+
+A key aspect of the Orchid Storage project is the development of an efficient encoding scheme that minimizes
+bandwidth costs incurred during migration of distributed data through providers over time.
+
+**Twin Coding** is a hybrid encoding scheme that works with any two linear coding schemes and combines
 them to achieve a space-bandwidth tradeoff, minimizing the amount of data that must be transferred
 between storage nodes in order to recover a lost shard of data. In contrast to a traditional
 erasure scheme, in which restoration of a lost node requires a full reconstruction of the original
 file, Twin Coding allows for the recovery of a lost data shard with data transfer totalling exactly
 the size of the lost data shard, with no additional transfer overhead.
+For example, with k = 3, repairing one lost shard of a 1 MB file moves only about 0.33 MB
+(one shard), where a conventional erasure code would transfer roughly the full 1 MB.
 
 This repository contains an implementation of Twin Coding, as well as a command line API for encoding
-files, decoding files with erasures, and optimally recovering lost shards.
+files, decoding files with erasures, and optimally recovering lost shards. There is also a provider
+server framework and a monitor application for observing file availability across providers.
 
-See `twin_coding.py` for an explanation of the algorithm, example code, and a link to the original paper.
+See [`twin_coding.py`](encoding/twin_coding.py) for an explanation of the algorithm, example code, and a link to the original paper.
 
 
 ## Installation
@@ -34,53 +47,289 @@ source venv/bin/activate
 pip install -r requirements.txt
 ```
 
-## Usage
+## Example Usage
+```
+# Generate some test files
+test-content.sh
+
+# Import a file into the default local repository with default encoding
+storage.sh import data/foo_file.dat
+
+# List the repository
+storage.sh repo list
+
+# Start a test provider server cluster
+test-cluster.sh start 5001 5002 5003 5004 5005
+
+# Confirm that the test servers are running
+test-cluster.sh list
+
+# "Discover" these providers, adding them to our known provider list
+# This will normally be done via the directory service and performed at file push time.
+test-discover.sh 5001 5002 5003 5004 5005
+
+# Start the monitor application (in another window)
+monitor.sh --update 1
+
+# Push the file by name
+storage.sh push foo_file.dat
 
-See also `examples.sh`.
+# TODO:
+# Monitor file availability while:
+# Observing resilient upload progress
+# Killing servers and prompting efficient rebuilds
+# Shut down the servers
+test-cluster.sh stop
 ```
-# Generate some random data
-dd if=/dev/urandom of="file_1KB.dat" bs=1K count=1
+## Encoding CLI Examples
 
+See also [`examples.sh`](examples/examples.sh)
+
+```
 # Encode a file, writing n files for each of the two node types to a ".encoded" directory.
-./storage.sh encode \
-  --path "file_1KB.dat" \
+encoded_file_path=$(storage.sh repo --path "$repository" file_path --file "$file")
+storage.sh encode \
+    --path "$file" \
+    --output_path "$encoded_file_path" \
     --encoding0 reed_solomon --k0 3 --n0 5 \
     --encoding1 reed_solomon --k1 3 --n1 5 \
     --overwrite
 
+# This import command is equivalent to the above encode, using the default repository path and encoding type.
+storage.sh import "$file"
+
+# List files in the repository.
+storage.sh repo --path "$repository" list
+
 # Decode a file from an encoded storage directory, tolerant of missing files (erasures).
-./storage.sh decode \
-  --encoded "file_1KB.dat.encoded" \
-  --recovered "recovered.dat" \
+recovered_file=$(storage.sh repo --path "$repository" tmp_file_path --file "recovered_${file}")
+storage.sh decode \
+    --encoded "$encoded_file_path" \
+    --recovered "$recovered_file" \
     --overwrite
 
 # Compare the original and decoded files.
-cmp -s "file_1KB.dat" "recovered.dat" && echo "Passed" || echo "Failed" +cmp -s "$file" "$recovered_file" && echo "Passed" || echo "Failed" -# Generate shard recovery files for restoration of node type 1 index 0, using 3 (k) type 0 -# node sources (helper nodes), -for helper_node in 0 1 2 +# Prepare node recovery: Generate shard recovery source files for restoration of +# node type 1 index 0, using 3 (k) type 0 node sources (helper nodes), +recover_node_type=1 +recover_node_index=0 +for helper_node_index in 0 1 2 do -./storage.sh generate_recovery_file \ - --recover_node_index 0 \ - --recover_encoding reed_solomon --k 3 --n 5 \ - --data_path "file_1KB.dat.encoded/type0_node${helper_node}.dat" \ - --output_path "recover_type1_node0/recover_${helper_node}.dat" \ - --overwrite + helper_node_type=0 + helper_shard_file=$(storage.sh repo --path "$repository" shard_path \ + --file "$file" --node_type $helper_node_type --node_index $helper_node_index) + recovery_source_file=$(storage.sh repo --path "$repository" recovery_file_path \ + --file "$file" --recover_node_type $recover_node_type --recover_node_index $recover_node_index \ + --helper_node_index "$helper_node_index") + storage.sh generate_recovery_file \ + --recover_node_type $recover_node_type \ + --recover_node_index $recover_node_index \ + --recover_encoding reed_solomon --k 3 --n 5 \ + --data_path "$helper_shard_file" \ + --output_path "$recovery_source_file" \ + --overwrite done -# Recover the shard for node type 1 index 0 from the k (3) recovery files. -./storage.sh recover_node \ + +# Complete node recovery: Recover the shard for node type 1 index 0 from the k (3) recovery files. +recovered_shard_file=$(storage.sh repo --path "$repository" tmp_file_path \ + --file "recovered_${file}_type${recover_node_type}_node${recover_node_index}.dat") +storage.sh recover_node \ --k 3 --n 5 --encoding reed_solomon \ - --files_dir "recover_type1_node0" \ - --output_path "recovered_type1_0.dat" \ + --recover_node_type $recover_node_type \ + --recover_node_index $recover_node_index \ + --files_dir "$encoded_file_path" \ + --output_path "$recovered_shard_file" \ --overwrite # Compare the original and recovered data shards. -cmp -s "file_1KB.dat.encoded/type1_node0.dat" "recovered_type1_0.dat" && echo "Passed" || echo "Failed" +original_shard_file=$(storage.sh repo --path "$repository" shard_path \ + --file "$file" --node_type 1 --node_index 0) +cmp -s "$original_shard_file" "$recovered_shard_file" && echo "Passed" || echo "Failed" + +``` + +## CLI Docs +###`repo` ``` +usage: storage repo [-h] [--path PATH] REPO_COMMAND ... +positional arguments: + REPO_COMMAND Repository path commands available. + list List files in the repository. + file_path Get the path to an encoded file. + shard_path Get the path to a shard of the encoded file. + recovery_file_path + Get the path for a recovery file. + tmp_file_path Get the path for a temporary file. + +options: + -h, --help show this help message and exit + --path PATH Path to the repository. +None +``` +###`encode` +``` +usage: storage encode [-h] --path PATH --output_path OUTPUT_PATH --k0 K0 --n0 + N0 --k1 K1 --n1 N1 [--encoding0 ENCODING0] + [--encoding1 ENCODING1] [--overwrite] + +options: + -h, --help show this help message and exit + --path PATH Path to the file to encode. + --output_path OUTPUT_PATH + Output path for the encoded file. + --k0 K0 k value for node type 0. + --n0 N0 n value for node type 0. + --k1 K1 k value for node type 1. + --n1 N1 n value for node type 1. 
+  --encoding0 ENCODING0
+                        Encoding for node type 0.
+  --encoding1 ENCODING1
+                        Encoding for node type 1.
+  --overwrite           Overwrite existing files.
+```
+### `decode`
+```
+usage: storage decode [-h] --encoded ENCODED --recovered RECOVERED
+                      [--overwrite]
+
+options:
+  -h, --help            show this help message and exit
+  --encoded ENCODED     Path to the encoded file.
+  --recovered RECOVERED
+                        Path to the recovered file.
+  --overwrite           Overwrite existing files.
+```
+### `generate_recovery_file`
+```
+usage: storage generate_recovery_file [-h] --recover_node_type
+                                      RECOVER_NODE_TYPE --recover_node_index
+                                      RECOVER_NODE_INDEX
+                                      [--recover_encoding RECOVER_ENCODING]
+                                      --k K --n N --data_path DATA_PATH
+                                      --output_path OUTPUT_PATH [--overwrite]
+
+options:
+  -h, --help            show this help message and exit
+  --recover_node_type RECOVER_NODE_TYPE
+                        Type of the recovering node.
+  --recover_node_index RECOVER_NODE_INDEX
+                        Index of the recovering node.
+  --recover_encoding RECOVER_ENCODING
+                        Encoding for the recovering node.
+  --k K                 k value for the recovering node.
+  --n N                 n value for the recovering node.
+  --data_path DATA_PATH
+                        Path to the source node data.
+  --output_path OUTPUT_PATH
+                        Path to the output recovery file.
+  --overwrite           Overwrite existing files.
+```
+### `recover_node`
+```
+usage: storage recover_node [-h] --recover_node_type RECOVER_NODE_TYPE
+                            --recover_node_index RECOVER_NODE_INDEX --k K --n
+                            N [--encoding ENCODING] --files_dir FILES_DIR
+                            --output_path OUTPUT_PATH [--overwrite]
+
+options:
+  -h, --help            show this help message and exit
+  --recover_node_type RECOVER_NODE_TYPE
+                        Type of the recovering node.
+  --recover_node_index RECOVER_NODE_INDEX
+                        Index of the recovering node.
+  --k K                 k value for node type.
+  --n N                 n value for node type.
+  --encoding ENCODING   Encoding for node type.
+  --files_dir FILES_DIR
+                        Path to the recovery files.
+  --output_path OUTPUT_PATH
+                        Path to the recovered file.
+  --overwrite           Overwrite existing files.
+```
+### `import`
+```
+usage: storage import [-h] [--repo REPO] [--k0 K0] [--n0 N0] [--k1 K1]
+                      [--n1 N1] [--encoding0 ENCODING0]
+                      [--encoding1 ENCODING1] [--overwrite]
+                      path
+
+positional arguments:
+  path                  Path to the file to import.
+
+options:
+  -h, --help            show this help message and exit
+  --repo REPO           Path to the repository.
+  --k0 K0               k value for node type 0.
+  --n0 N0               n value for node type 0.
+  --k1 K1               k value for node type 1.
+  --n1 N1               n value for node type 1.
+  --encoding0 ENCODING0
+                        Encoding for node type 0.
+  --encoding1 ENCODING1
+                        Encoding for node type 1.
+  --overwrite           Overwrite existing files.
+```
+### `push`
+```
+usage: storage push [-h] [--repo REPO] [--servers [SERVERS ...]] [--validate]
+                    file
+
+positional arguments:
+  file                  Name of the file in the repository.
+
+options:
+  -h, --help            show this help message and exit
+  --repo REPO           Path to the repository.
+  --servers [SERVERS ...]
+                        List of server names or urls to push to.
+  --validate            After push, download and reconstruct the file.
+```
+
+## Server Docs
+```
+Using default repository: $STRHOME/repository
+usage: server_cli.py [-h] [--config CONFIG] [--interface INTERFACE]
+                     [--port PORT] [--repository_dir REPOSITORY_DIR]
+                     [--auth_key AUTH_KEY] [--show_console]
+
+Flask server with argument parsing
+
+options:
+  -h, --help            show this help message and exit
+  --config CONFIG       server config file
+  --interface INTERFACE
+                        Interface the server listens on
+  --port PORT           Port the server listens on
+  --repository_dir REPOSITORY_DIR
+                        Directory to store repository files
+  --auth_key AUTH_KEY   Authentication key to validate requests
+  --show_console        Flag to show console logs
+```
+
+## Monitor Docs
+```
+usage: monitor_cli.py [-h] [--providers PROVIDERS] [--debug] [--update UPDATE]
+
+Process command line arguments.
+
+options:
+  -h, --help            show this help message and exit
+  --providers PROVIDERS
+                        Providers file path
+  --debug               Show debug
+  --update UPDATE       Update view with polling period seconds
+```
diff --git a/str-twincoding/README.md.in b/str-twincoding/README.md.in
deleted file mode 100644
index f2bf35d82..000000000
--- a/str-twincoding/README.md.in
+++ /dev/null
@@ -1,44 +0,0 @@
-
-## About
-
-Twin Coding is a hybrid encoding scheme that works with any two linear coding schemes and combines
-them to achieve a space-bandwidth tradeoff, minimizing the amount of data that must be transferred
-between storage nodes in order to recover a lost shard of data. In contrast to a traditional
-erasure scheme, in which restoration of a lost node requires a full reconstruction of the original
-file, Twin Coding allows for the recovery of a lost data shard with data transfer totalling exactly
-the size of the lost data shard, with no additional transfer overhead.
-
-This repository contains an implementation of Twin Coding, as well as a command line API for encoding
-files, decoding files with erasures, and optimally recovering lost shards.
-
-See `twin_coding.py` for an explanation of the algorithm, example code, and a link to the original paper.
-
-
-## Installation
-
-```
-# Create a virtual environment
-python3 -m venv venv
-```
-
-```
-# Activate the virtual environment
-# For macOS and Linux:
-source venv/bin/activate
-# For Windows:
-.\venv\Scripts\activate
-```
-
-```
-# Install the dependencies
-pip install -r requirements.txt
-```
-
-## Usage
-
-See also `examples.sh`.
-
-```
-INSERT_USAGE
-```
-
diff --git a/str-twincoding/build_readme.sh b/str-twincoding/build_readme.sh
index d676b20b0..66b303bd7 100755
--- a/str-twincoding/build_readme.sh
+++ b/str-twincoding/build_readme.sh
@@ -1 +1,30 @@
-sed '/INSERT_USAGE/r examples.sh' README.md.in | sed '/INSERT_USAGE/d' > README.md
+#!/bin/bash
+
+source "$(dirname "$0")/env.sh"
+
+# examples
+tmp=/tmp/readme.$$
+sed -n '/# START_EXAMPLES/,/# END_EXAMPLES/p' examples/examples.sh | sed '/# START_EXAMPLES/d; /# END_EXAMPLES/d' > $tmp
+sed "/INSERT_EXAMPLES/r $tmp" README-in.md | sed '/INSERT_EXAMPLES/d' > README.md
+
+# usage
+sed "/INSERT_USAGE/r docs/usage.sh" README.md | sed '/INSERT_USAGE/d' > out.md
+mv out.md README.md
+
+# storage cli
+storage.sh docs > $tmp
+sed "/INSERT_STORAGE_DOCS/r $tmp" README.md | sed '/INSERT_STORAGE_DOCS/d' > out.md
+mv out.md README.md
+
+# server cli
+server.sh --help > $tmp
+sed "/INSERT_SERVER_DOCS/r $tmp" README.md | sed '/INSERT_SERVER_DOCS/d' > out.md
+mv out.md README.md
+
+# monitor cli
+monitor.sh --help > $tmp
+sed "/INSERT_MONITOR_DOCS/r $tmp" README.md | sed '/INSERT_MONITOR_DOCS/d' > out.md
+mv out.md README.md
+
+rm $tmp
+
diff --git a/str-twincoding/clean.sh b/str-twincoding/clean.sh
index 7cfce6d79..090ad4559 100755
--- a/str-twincoding/clean.sh
+++ b/str-twincoding/clean.sh
@@ -1,3 +1,3 @@
-rm -rf *.encoded recovered.dat recover_type1_node0 recovered_type1_0.dat file_1KB.dat file_1MB.dat
+rm -rf *.encoded recovered.dat file_1KB.dat file_1MB.dat repository encoding/repository server/repository
diff --git a/str-twincoding/config.py b/str-twincoding/config.py
deleted file mode 100644
index 824240fae..000000000
--- a/str-twincoding/config.py
+++ /dev/null
@@ -1,92 +0,0 @@
-import commentjson as json
-from pydantic import BaseModel
-
-config_str = """
-{
-    "config_version": "1.0",
-
-    # Console
-    "console": {
-        "public_url": "http://localhost:8080/",
-        "port": 8080,
-    },
-
-    "encoder": {
-        # num_cores: 4,
-    },
-
-    # Cluster
-    "cluster": {
-        "name": "testnet",
-        "type0": {
-            "encoding": "reed_solomon",
-            "k": 3,
-            "n": 5
-        },
-        "type1": {
-            "encoding": "reed_solomon",
-            "k": 3,
-            "n": 5
-        },
-    },
-}
-"""
-
-
-class NodeType(BaseModel):
-    encoding: str
-    k: int
-    n: int
-    transpose: bool = None
-
-
-class NodeType0(NodeType):
-    transpose: bool = False
-
-
-class NodeType1(NodeType):
-    transpose: bool = True
-
-
-class Console(BaseModel):
-    public_url: str
-    port: int
-
-
-class Cluster(BaseModel):
-    name: str
-    type0: NodeType0
-    type1: NodeType1
-
-
-class Config(BaseModel):
-    config_version: str
-    console: Console
-    cluster: Cluster
-
-
-class EncodedFileConfig(BaseModel):
-    name: str
-    type0: NodeType0
-    type1: NodeType1
-    file_length: int
-    # file_hash: str
-
-
-def load_config(file_path):
-    # with open(file_path, 'r') as f:
-    #     config_dict = json.load(f)
-    print("Loading internal config")
-    config_dict = json.loads(config_str)
-    return Config(**config_dict)
-
-
-def load_file_config(path) -> EncodedFileConfig:
-    with open(path, 'rb') as f:
-        config_dict = json.load(f)
-    return EncodedFileConfig(**config_dict)
-
-
-if __name__ == '__main__':
-    config = load_config('config.json')
-    print(f"config version: {config.config_version}")
diff --git a/str-twincoding/docs/.gitignore b/str-twincoding/docs/.gitignore
new file mode 100644
index 000000000..318ef5dc1
--- /dev/null
+++ b/str-twincoding/docs/.gitignore
@@ -0,0 +1 @@
+monitor.org.png
diff --git a/str-twincoding/docs/monitor.png b/str-twincoding/docs/monitor.png
new file mode 100644
index 000000000..56d618d87
Binary files /dev/null and b/str-twincoding/docs/monitor.png differ
diff --git a/str-twincoding/docs/usage.sh b/str-twincoding/docs/usage.sh
new file mode 100644
index 000000000..64e60933b
--- /dev/null
+++ b/str-twincoding/docs/usage.sh
@@ -0,0 +1,32 @@
+# Generate some test files
+test-content.sh
+
+# Import a file into the default local repository with default encoding
+storage.sh import data/foo_file.dat
+
+# List the repository
+storage.sh repo list
+
+# Start a test provider server cluster
+test-cluster.sh start 5001 5002 5003 5004 5005
+
+# Confirm that the test servers are running
+test-cluster.sh list
+
+# "Discover" these providers, adding them to our known provider list
+# This will normally be done via the directory service and performed at file push time.
+test-discover.sh 5001 5002 5003 5004 5005
+
+# Start the monitor application (in another window)
+monitor.sh --update 1
+
+# Push the file by name
+storage.sh push foo_file.dat
+
+# TODO:
+# Monitor file availability while:
+# Observing resilient upload progress
+# Killing servers and prompting efficient rebuilds
+
+# Shut down the servers
+test-cluster.sh stop
diff --git a/str-twincoding/chunks.py b/str-twincoding/encoding/chunks.py
similarity index 88%
rename from str-twincoding/chunks.py
rename to str-twincoding/encoding/chunks.py
index f85bb7a91..8aae00f15 100644
--- a/str-twincoding/chunks.py
+++ b/str-twincoding/encoding/chunks.py
@@ -1,9 +1,13 @@
-import math
+import io
 import os
 import time
+from typing import Optional
+
+import math
 import numpy as np
 from tqdm import tqdm
 
+
 # Note: We will parallelize these in a future update. Lots of opportunity here to read chunks
 # Note: in batches and perform encoding/decoding in parallel.
@@ -93,3 +97,16 @@ def update_pbar(self, ci: int, num_files: int, pbar: tqdm, start: float):
         rate = ci * self.chunk_size * num_files / (time.time() - start)
         pbar.set_postfix({"Rate": f"{rate / (1024 * 1024):.4f}MB/s"}, refresh=True)
         pbar.update(1)
+
+
+def open_output_file(output_path: str, overwrite: bool) -> Optional[io.BufferedWriter]:
+    if not overwrite and os.path.exists(output_path):
+        print(f"Output file already exists: {output_path}.")
+        return None
+
+    # Make intervening directories if needed
+    directory = os.path.dirname(output_path)
+    if directory:
+        os.makedirs(directory, exist_ok=True)
+
+    return io.BufferedWriter(open(output_path, 'wb'))
diff --git a/str-twincoding/file_decoder.py b/str-twincoding/encoding/file_decoder.py
similarity index 79%
rename from str-twincoding/file_decoder.py
rename to str-twincoding/encoding/file_decoder.py
index 2ec318723..120b8614d 100644
--- a/str-twincoding/file_decoder.py
+++ b/str-twincoding/encoding/file_decoder.py
@@ -1,6 +1,5 @@
 import filecmp
 import os
-import re
 import time
 import uuid
 from collections import OrderedDict
@@ -8,10 +7,12 @@
 import numpy as np
 from tqdm import tqdm
 
-from chunks import ChunksReader
-from config import NodeType, load_file_config
-from twin_coding import rs_generator_matrix
-from util import open_output_file, assert_rs
+from storage.config import NodeType, EncodedFileConfig
+from storage.util import assert_rs
+from storage.repository import Repository
+
+from encoding.chunks import ChunksReader, open_output_file
+from encoding.twin_coding import rs_generator_matrix
 
 
 # Decode a set of erasure-coded files supplied as a file map or from a storage-encoded directory.
@@ -53,9 +54,9 @@ def __init__(self,
     # at least k files of the same type.
     @staticmethod
     def from_encoded_dir(path: str, output_path: str = None, overwrite: bool = False):
-        file_config = load_file_config(f'{path}/config.json')
+        file_config = EncodedFileConfig.load(os.path.join(path, 'config.json'))
         assert file_config.type0.k == file_config.type1.k, "Config node types must have the same k."
-        recover_from_files = FileDecoder.map_files(path, k=file_config.type0.k)
+        recover_from_files = FileDecoder.get_threshold_files(path, k=file_config.type0.k)
         if os.path.basename(list(recover_from_files)[0]).startswith("type0_"):
             node_type = file_config.type0
         else:
             node_type = file_config.type1
@@ -72,21 +73,13 @@
     # Map the files in a file store encoded directory. At least k files of the same type must be present
     # to succeed. Returns a map of the first k files of either type found.
-    @staticmethod
-    def map_files(files_dir: str, k: int) -> dict[str, int]:
-        type0_files, type1_files = {}, {}
-        for filename in os.listdir(files_dir):
-            match = re.match(r'type([01])_node(\d+).dat', filename)
-            if not match:
-                continue
-            type_no, index_no = int(match.group(1)), int(match.group(2))
-            files = type0_files if type_no == 0 else type1_files
-            files[os.path.join(files_dir, filename)] = index_no
-
+    @classmethod
+    def get_threshold_files(cls, files_dir: str, k: int) -> dict[str, int]:
+        type0_files, type1_files = Repository.map_files(files_dir)
         if len(type0_files) >= k:
-            return OrderedDict(sorted(type0_files.items(), key=lambda x: x[1])[:k])
+            return OrderedDict(list(type0_files.items())[:k])
         elif len(type1_files) >= k:
-            return OrderedDict(sorted(type1_files.items(), key=lambda x: x[1])[:k])
+            return OrderedDict(list(type1_files.items())[:k])
         else:
             raise ValueError(
                 f"Insufficient files in {files_dir} to recover: {len(type0_files)} type 0 files, "
@@ -131,13 +124,17 @@ def close(self):
 
 if __name__ == '__main__':
-    file = 'file_1KB.dat'
-    encoded = f'{file}.encoded'
-    recovered = 'recovered.dat'
+    repo = Repository('./repository')
+    filename = 'file_1KB.dat'
+    original_file = repo.tmp_file_path(filename)
+    encoded_file = repo.file_path(filename)
+    print(repo.status_str(filename))
+
+    recovered_file = repo.tmp_file_path(f'recovered_{filename}')
     decoder = FileDecoder.from_encoded_dir(
-        path=encoded,
-        output_path=recovered,
+        path=encoded_file,
+        output_path=recovered_file,
         overwrite=True
     )
     decoder.decode()
-    print("Passed" if filecmp.cmp(file, recovered) else "Failed")
+    print("Passed" if filecmp.cmp(original_file, recovered_file) else "Failed")
diff --git a/str-twincoding/file_encoder.py b/str-twincoding/encoding/file_encoder.py
similarity index 81%
rename from str-twincoding/file_encoder.py
rename to str-twincoding/encoding/file_encoder.py
index b6a5c681a..e9d508ee7 100644
--- a/str-twincoding/file_encoder.py
+++ b/str-twincoding/encoding/file_encoder.py
@@ -1,12 +1,13 @@
 import os
 from contextlib import ExitStack
 import galois
-from chunks import ChunkReader
-from config import EncodedFileConfig, NodeType0, NodeType1
-from twin_coding import rs_generator_matrix, Code, twin_code
+from encoding.chunks import ChunkReader
+from encoding.twin_coding import rs_generator_matrix, Code, twin_code
+from storage.config import EncodedFileConfig, NodeType0, NodeType1
+from storage.repository import Repository
+from storage.util import assert_rs
 from tqdm import tqdm
 import time
-from util import assert_rs
 
 
 # Erasure code a file into two sets of shards, one for each node type in the twin coding scheme.
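The encoder consumes its input in chunks of k x k symbols (`chunk_size = self.k ** 2` in the constructor below). As a back-of-the-envelope sketch of that arithmetic -- assuming one byte per GF(2^8) symbol purely for illustration; the actual field and padding rules live in `file_encoder.py` and `twin_coding.py`:

```
import math

# Hypothetical numbers matching the examples: k=3, n=5, a 1 KB input file.
k, n = 3, 5
file_size = 1024                    # e.g. file_1KB.dat

chunk_size = k * k                  # bytes consumed per chunk (one k x k matrix)
num_chunks = math.ceil(file_size / chunk_size)   # 114 chunks for 1 KB

# Each of the 2n nodes stores k symbols per chunk, so a shard is roughly
# file_size / k (plus padding from the final partial chunk).
shard_size = num_chunks * k         # 342 bytes
print(num_chunks, shard_size)

# Repairing one lost node: k helpers each send 1 symbol per chunk,
# so total transfer equals one shard -- not the whole file.
repair_bytes = num_chunks * k       # == shard_size
```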
@@ -24,22 +25,23 @@ class FileEncoder(ChunkReader):
     def __init__(self,
                  node_type0: NodeType0,
                  node_type1: NodeType1,
-                 path: str,
+                 input_file: str,
                  output_path: str = None,
                  overwrite: bool = False):
         assert_rs(node_type0)
         assert_rs(node_type1)
         assert node_type0.k == node_type1.k, "The two node types must have the same k."
+        assert node_type0.n > node_type0.k and node_type1.n > node_type1.k, "The node type must have n > k."
         self.node_type0 = node_type0
         self.node_type1 = node_type1
         self.k = node_type0.k
-        self.path = path
-        self.output_dir = output_path or path + '.encoded'
+        self.path = input_file
+        self.output_dir = output_path or input_file + '.encoded'
         self.overwrite = overwrite
         chunk_size = self.k ** 2
-        super().__init__(path=path, chunk_size=chunk_size)
+        super().__init__(path=input_file, chunk_size=chunk_size)
 
     # Initialize the output directory that will hold the erasure-encoded chunks.
     def init_output_dir(self) -> bool:
@@ -109,11 +111,21 @@ def close(self):
 
 if __name__ == '__main__':
-    path = 'file_1KB.dat'
+    repo = Repository('./repository')
+
+    # Random test file
+    filename = 'file_1KB.dat'
+    file = repo.tmp_file_path(filename)
+    # if the file doesn't exist create it
+    if not os.path.exists(file):
+        with open(file, "wb") as f:
+            f.write(os.urandom(1024))
+
     encoder = FileEncoder(
         node_type0=NodeType0(k=3, n=5, encoding='reed_solomon'),
         node_type1=NodeType1(k=3, n=5, encoding='reed_solomon'),
-        path=path,
+        input_file=file,
+        output_path=repo.file_path(filename, expected=False),
         overwrite=True
    )
    encoder.encode()
diff --git a/str-twincoding/node_recovery_client.py b/str-twincoding/encoding/node_recovery_client.py
similarity index 65%
rename from str-twincoding/node_recovery_client.py
rename to str-twincoding/encoding/node_recovery_client.py
index 8797042ba..07644df66 100644
--- a/str-twincoding/node_recovery_client.py
+++ b/str-twincoding/encoding/node_recovery_client.py
@@ -1,15 +1,18 @@
 import filecmp
 import os
+import re
 import time
 import uuid
 from collections import OrderedDict
 import galois
 import numpy as np
 from tqdm import tqdm
-from chunks import ChunksReader
-from config import NodeType
-from twin_coding import rs_generator_matrix
-from util import assert_rs, open_output_file
+
+from encoding.chunks import ChunksReader, open_output_file
+from encoding.twin_coding import rs_generator_matrix
+from storage.config import NodeType, NodeType1
+from storage.repository import Repository
+from storage.util import assert_rs
 
 
 # Consume recovery files from k nodes of the opposite type to recover a lost node's data.
@@ -39,12 +42,18 @@ def __init__(self,
     # Map recovery files in a directory. Exactly k recovery files should be present.
     @staticmethod
-    def map_files(files_dir: str, k: int) -> dict[str, int]:
+    def map_files(files_dir: str,
+                  recover_node_type: int,
+                  recover_node_index: int,
+                  k: int) -> dict[str, int]:
         files = {}
-
+        prefix = f"recover_type{recover_node_type}_node{recover_node_index}"
         for filename in os.listdir(files_dir):
-            if filename.startswith("recover_") and filename.endswith(".dat"):
-                index = int(filename.split("_")[1].split(".")[0])
+            if filename.startswith(prefix) and filename.endswith(".dat"):
+                match = re.search(r"from(\d+)", filename)
+                index = int(match.group(1)) if match else None
+                if index is None:
+                    continue
                 files[os.path.join(files_dir, filename)] = index
 
         assert len(files) == k, "Exactly k recovery files must be present."
@@ -76,16 +85,27 @@ def recover_node(self):
 
 if __name__ == '__main__':
+    file = 'file_1KB.dat'
+    repo = Repository('./repository')
+
     # Use recovery files generated for type 1 node index 0 to recover the lost data shard.
-    recovery_files_dir = 'recover_type1_node0'
-    recovered = 'recovered_type1_node0.dat'
+    recovery_files_dir = repo.file_path(file)
+    recover_node_type = 1
+    recover_node_index = 0
+    recovered_shard = repo.tmp_file_path(
+        f'recovered_{file}_type{recover_node_type}_node{recover_node_index}.dat')
+
     NodeRecoveryClient(
-        recovery_source_node_type=NodeType(k=3, n=5, encoding='reed_solomon'),
-        file_map=NodeRecoveryClient.map_files(files_dir=recovery_files_dir, k=3),
-        output_path=recovered,
+        recovery_source_node_type=NodeType1(k=3, n=5, encoding='reed_solomon'),
+        file_map=NodeRecoveryClient.map_files(
+            files_dir=recovery_files_dir,
+            recover_node_type=recover_node_type,
+            recover_node_index=recover_node_index,
+            k=3),
+        output_path=recovered_shard,
         overwrite=True
     ).recover_node()
 
-    compare_file = 'file_1KB.dat.encoded/type1_node0.dat'
-    print("Passed" if filecmp.cmp(compare_file, recovered) else "Failed")
+    original_shard = repo.shard_path(file, node_type=1, node_index=0)
+    print("Passed" if filecmp.cmp(original_shard, recovered_shard) else "Failed")
     ...
diff --git a/str-twincoding/node_recovery_source.py b/str-twincoding/encoding/node_recovery_source.py
similarity index 66%
rename from str-twincoding/node_recovery_source.py
rename to str-twincoding/encoding/node_recovery_source.py
index 5fe407eb2..d0b83ddef 100644
--- a/str-twincoding/node_recovery_source.py
+++ b/str-twincoding/encoding/node_recovery_source.py
@@ -2,10 +2,10 @@
 import galois
 from tqdm import tqdm
-from chunks import ChunkReader
-from config import NodeType, NodeType1
-from twin_coding import rs_generator_matrix
-from util import open_output_file
+from storage.config import NodeType, NodeType1
+from storage.repository import Repository
+from encoding.chunks import ChunkReader, open_output_file
+from encoding.twin_coding import rs_generator_matrix
 
 
 # Generate a node recovery file for a specified node of the opposite type.
@@ -52,13 +52,32 @@ def generate(self):
 
 if __name__ == '__main__':
-    # Use a type 0 node source to generate recovery files for recovering node type 1 index 0.
-    for i in range(3):
+    file = 'file_1KB.dat'
+    repo = Repository('./repository')
+
+    # The recovering node
+    recover_node_encoding = NodeType1(k=3, n=5, encoding='reed_solomon')
+    recover_node_index = 0
+
+    # Use three helper nodes of type 0 to generate recovery files for a node of type 1.
+    helper_node_type = 0
+    for helper_node_index in range(3):
+        # The input path of the helper node's shard
+        helper_shard_path = repo.shard_path(
+            file, node_type=helper_node_type, node_index=helper_node_index)
+        # The output path of the recovery file
+        recovery_file_path = repo.recovery_file_path(
+            file,
+            recover_node_type=recover_node_encoding.type,
+            recover_node_index=recover_node_index,
+            helper_node_index=helper_node_index,
+            expected=False)
+
         NodeRecoverySource(
-            recover_node_type=NodeType1(k=3, n=5, encoding='reed_solomon'),
-            recover_node_index=0,
-            data_path=f'file_1KB.dat.encoded/type0_node{i}.dat',
-            output_path=f'recover_type1_node0/recover_{i}.dat',
+            recover_node_type=recover_node_encoding,
+            recover_node_index=recover_node_index,
+            data_path=helper_shard_path,
+            output_path=recovery_file_path,
            overwrite=True
        ).generate()
    ...
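Together, `node_recovery_source.py` (one inner product per chunk from each helper) and `node_recovery_client.py` (solve a k x k system to rebuild the lost shard) implement the Twin Coding repair step. A minimal, self-contained sketch of that math -- assuming Vandermonde Reed-Solomon generators over GF(2^8) and the same generator for both node types, which are illustration-only assumptions; see `twin_coding.py` for the real construction:

```
import galois
import numpy as np

GF = galois.GF(2**8)
k, n = 3, 5

# k x n Vandermonde generator matrix: column j is (1, a_j, a_j^2, ...).
alphas = GF(list(range(1, n + 1)))
G = GF.Zeros((k, n))
for i in range(k):
    for j in range(n):
        G[i, j] = alphas[j] ** i

M = GF.Random((k, k), seed=1)            # one k x k message chunk

# Type 0 node i stores M.T @ g_i; type 1 node j stores M @ g_j.
type0 = [M.T @ G[:, i] for i in range(n)]
type1 = [M @ G[:, j] for j in range(n)]

# Repair type 1 node j: each of k type 0 helpers sends ONE symbol,
# the inner product of its own shard with g_j.
j = 0
helpers = [0, 1, 2]
syms = GF([int(type0[i] @ G[:, j]) for i in helpers])

# s_i = g_i^T (M g_j), so solving G_I^T x = s yields the lost shard.
G_I = G[:, helpers]                      # k x k Vandermonde block, invertible
recovered = np.linalg.solve(G_I.T, syms)
assert np.array_equal(recovered, type1[j])
```

Note that the helpers transfer k symbols in total -- exactly the size of the lost shard, matching the bandwidth claim in the README.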
diff --git a/str-twincoding/tests.py b/str-twincoding/encoding/tests.py
similarity index 77%
rename from str-twincoding/tests.py
rename to str-twincoding/encoding/tests.py
index 37714241e..c6d371201 100644
--- a/str-twincoding/tests.py
+++ b/str-twincoding/encoding/tests.py
@@ -1,11 +1,8 @@
-import os
 import runpy
 
 #
 # See also `examples.sh`
 #
-with open("file_1KB.dat", "wb") as f:
-    f.write(os.urandom(1024))
 
 runpy.run_module('file_encoder', run_name='__main__')
 runpy.run_module('file_decoder', run_name='__main__')
diff --git a/str-twincoding/twin_coding.py b/str-twincoding/encoding/twin_coding.py
similarity index 100%
rename from str-twincoding/twin_coding.py
rename to str-twincoding/encoding/twin_coding.py
diff --git a/str-twincoding/twin_coding_batched.py b/str-twincoding/encoding/twin_coding_batched.py
similarity index 100%
rename from str-twincoding/twin_coding_batched.py
rename to str-twincoding/encoding/twin_coding_batched.py
diff --git a/str-twincoding/env.sh b/str-twincoding/env.sh
new file mode 100755
index 000000000..9f466baac
--- /dev/null
+++ b/str-twincoding/env.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+# Source this file into your shell
+export STRHOME=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $STRHOME/venv/bin/activate
+export PYTHONPATH="$STRHOME"
+export PATH=$PATH:"$STRHOME"
diff --git a/str-twincoding/examples.sh b/str-twincoding/examples.sh
deleted file mode 100644
index 9e8f811f4..000000000
--- a/str-twincoding/examples.sh
+++ /dev/null
@@ -1,43 +0,0 @@
-
-# Generate some random data
-dd if=/dev/urandom of="file_1KB.dat" bs=1K count=1
-
-# Encode a file, writing n files for each of the two node types to a ".encoded" directory.
-./storage.sh encode \
-  --path "file_1KB.dat" \
-  --encoding0 reed_solomon --k0 3 --n0 5 \
-  --encoding1 reed_solomon --k1 3 --n1 5 \
-  --overwrite
-
-# Decode a file from an encoded storage directory, tolerant of missing files (erasures).
-./storage.sh decode \
-  --encoded "file_1KB.dat.encoded" \
-  --recovered "recovered.dat" \
-  --overwrite
-
-# Compare the original and decoded files.
-cmp -s "file_1KB.dat" "recovered.dat" && echo "Passed" || echo "Failed"
-
-
-# Generate shard recovery files for restoration of node type 1 index 0, using 3 (k) type 0
-# node sources (helper nodes),
-for helper_node in 0 1 2
-do
-./storage.sh generate_recovery_file \
-  --recover_node_index 0 \
-  --recover_encoding reed_solomon --k 3 --n 5 \
-  --data_path "file_1KB.dat.encoded/type0_node${helper_node}.dat" \
-  --output_path "recover_type1_node0/recover_${helper_node}.dat" \
-  --overwrite
-done
-
-# Recover the shard for node type 1 index 0 from the k (3) recovery files.
-./storage.sh recover_node \
-  --k 3 --n 5 --encoding reed_solomon \
-  --files_dir "recover_type1_node0" \
-  --output_path "recovered_type1_0.dat" \
-  --overwrite
-
-# Compare the original and recovered data shards.
-cmp -s "file_1KB.dat.encoded/type1_node0.dat" "recovered_type1_0.dat" && echo "Passed" || echo "Failed" - diff --git a/str-twincoding/examples/.gitignore b/str-twincoding/examples/.gitignore new file mode 100644 index 000000000..b2c8bf4f3 --- /dev/null +++ b/str-twincoding/examples/.gitignore @@ -0,0 +1,2 @@ +data +repository diff --git a/str-twincoding/examples/clean.sh b/str-twincoding/examples/clean.sh new file mode 100755 index 000000000..8b808bbc8 --- /dev/null +++ b/str-twincoding/examples/clean.sh @@ -0,0 +1 @@ +rm -rf data diff --git a/str-twincoding/examples/examples.sh b/str-twincoding/examples/examples.sh new file mode 100755 index 000000000..009c0093f --- /dev/null +++ b/str-twincoding/examples/examples.sh @@ -0,0 +1,79 @@ +#!/bin/bash +set -euo pipefail + +source "$(dirname "$(readlink -f "$0")")/../env.sh" + +# The repository dir +repository="$STRHOME/repository" +mkdir -p "$repository" + +# Generate some random data +file="file_1KB.dat" +[ -f "$file" ] || dd if=/dev/urandom of="$file" bs=1K count=1 + +# START_EXAMPLES +# Encode a file, writing n files for each of the two node types to a ".encoded" directory. +encoded_file_path=$(storage.sh repo --path "$repository" file_path --file "$file") +storage.sh encode \ + --path "$file" \ + --output_path "$encoded_file_path" \ + --encoding0 reed_solomon --k0 3 --n0 5 \ + --encoding1 reed_solomon --k1 3 --n1 5 \ + --overwrite + +# This import command is equivalent to the above encode, usign the default repository path and encoding type. +storage.sh import "$file" + +# List files in the repository. +storage.sh repo --path "$repository" list + +# Decode a file from an encoded storage directory, tolerant of missing files (erasures). +recovered_file=$(storage.sh repo --path "$repository" tmp_file_path --file "recovered_${file}") +storage.sh decode \ + --encoded "$encoded_file_path" \ + --recovered "$recovered_file" \ + --overwrite + +# Compare the original and decoded files. +cmp -s "$file" "$recovered_file" && echo "Passed" || echo "Failed" + + +# Prepare node recovery: Generate shard recovery source files for restoration of +# node type 1 index 0, using 3 (k) type 0 node sources (helper nodes), +recover_node_type=1 +recover_node_index=0 +for helper_node_index in 0 1 2 +do + helper_node_type=0 + helper_shard_file=$(storage.sh repo --path "$repository" shard_path \ + --file "$file" --node_type $helper_node_type --node_index $helper_node_index) + recovery_source_file=$(storage.sh repo --path "$repository" recovery_file_path \ + --file "$file" --recover_node_type $recover_node_type --recover_node_index $recover_node_index \ + --helper_node_index "$helper_node_index") + storage.sh generate_recovery_file \ + --recover_node_type $recover_node_type \ + --recover_node_index $recover_node_index \ + --recover_encoding reed_solomon --k 3 --n 5 \ + --data_path "$helper_shard_file" \ + --output_path "$recovery_source_file" \ + --overwrite +done + + +# Complete node recovery: Recover the shard for node type 1 index 0 from the k (3) recovery files. +recovered_shard_file=$(storage.sh repo --path "$repository" tmp_file_path \ + --file "recovered_${file}_type${recover_node_type}_node${recover_node_index}.dat") +storage.sh recover_node \ + --k 3 --n 5 --encoding reed_solomon \ + --recover_node_type $recover_node_type \ + --recover_node_index $recover_node_index \ + --files_dir "$encoded_file_path" \ + --output_path "$recovered_shard_file" \ + --overwrite + +# Compare the original and recovered data shards. 
+original_shard_file=$(storage.sh repo --path "$repository" shard_path \
+  --file "$file" --node_type 1 --node_index 0)
+cmp -s "$original_shard_file" "$recovered_shard_file" && echo "Passed" || echo "Failed"
+
+# END_EXAMPLES
diff --git a/str-twincoding/examples/test-cluster.sh b/str-twincoding/examples/test-cluster.sh
new file mode 100755
index 000000000..4f890e6b2
--- /dev/null
+++ b/str-twincoding/examples/test-cluster.sh
@@ -0,0 +1,118 @@
+#!/bin/bash
+
+source "$(dirname "$0")/../env.sh"
+
+# STRHOME should have been set by env.sh above
+if [ -z "$STRHOME" ]; then
+  echo "STRHOME is not set."
+  exit 1
+fi
+app="$STRHOME/server/server_cli.py"
+data="$STRHOME/examples/data"
+mkdir -p "$data"
+
+# Function to start Flask servers
+start_servers() {
+  echo "Starting servers..."
+  for port in "$@"
+  do
+    repo="$data/$port/repo"
+    log="$data/$port/log"
+    mkdir -p "$repo"
+    python "$app" --port $port --repository "$repo" > "$log" 2>&1 &
+    echo "Started Flask server on port $port with repository $repo"
+  done
+}
+
+# Function to stop a specific server
+stop_server() {
+  echo "Stopping server on port $1..."
+  pid=$(lsof -i :$1 -t)
+  if [ -z "$pid" ]; then
+    echo "No server found on port $1"
+  else
+    kill -9 $pid
+    echo "Stopped server with PID $pid"
+  fi
+}
+
+list_all() {
+  echo "All instances of $app"
+
+  # Print header
+  printf "%-10s %-10s %-10s\n" "PID" "PORT" "TIME"
+
+  # Find all PIDs for the given process name and extract relevant information
+  ps auxw | grep "$app" | grep -v grep | awk '{
+    pid = $2;
+    time = $10;
+    command = $11;
+    for (i = 12; i <= NF; i++) command = command " " $i;
+    port = "N/A";
+
+    # Extract port if present in the command
+    if (match(command, /--port[= ]+[0-9]+/)) {
+      port = substr(command, RSTART+7, RLENGTH-7);
+    }
+
+    printf "%-10s %-10s %-10s\n", pid, port, time
+  }'
+}
+
+stop_all() {
+  echo "Killing all instances of $app..."
+
+  # Find all PIDs for the given process name
+  pids=$(ps auxw | grep "$app" | grep -v grep | awk '{print $2}')
+
+  # Check if any PIDs were found
+  if [ -z "$pids" ]; then
+    echo "No processes found with the name $app."
+    return
+  fi
+
+  # Kill the processes
+  for pid in $pids
+  do
+    kill -9 $pid
+    echo "Stopped process $pid"
+  done
+}
+
+
+# Check command line arguments
+if [ $# -eq 0 ]
+then
+  echo "No arguments provided. Usage: ./script.sh start|stop|list|stop-all [ports...]"
+  exit 1
+fi
+
+# Main logic
+case $1 in
+  start)
+    shift
+    start_servers "$@"
+    ;;
+  stop)
+    shift
+    if [ $# -eq 0 ]; then
+      stop_all
+    else
+      for port in "$@"
+      do
+        stop_server $port
+      done
+    fi
+    ;;
+  list)
+    list_all
+    ;;
+  stop-all)
+    stop_all
+    ;;
+  *)
+    echo "Invalid command. Usage: ./script.sh start|stop|list|stop-all [ports...]"
+    exit 1
+    ;;
+esac
+
diff --git a/str-twincoding/examples/test-content.sh b/str-twincoding/examples/test-content.sh
new file mode 100755
index 000000000..34bd88023
--- /dev/null
+++ b/str-twincoding/examples/test-content.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+set -euo pipefail
+
+source "$(dirname "$0")/../env.sh"
+data="$STRHOME/examples/data"
+mkdir -p "$data"
+
+function add_file {
+  file=$1
+  name=$(basename $file)
+  echo "Generating file $name in data"
+  [ -f "$file" ] || dd if=/dev/urandom of="$file" bs=1K count=1 status=none
+}
+
+# Add a few test files to the examples data dir
+add_file "$data/foo_file.dat"
+add_file "$data/bar_file.dat"
+add_file "$data/baz_file.dat"
+
diff --git a/str-twincoding/examples/test-discover.sh b/str-twincoding/examples/test-discover.sh
new file mode 100755
index 000000000..feb2895d6
--- /dev/null
+++ b/str-twincoding/examples/test-discover.sh
@@ -0,0 +1,47 @@
+#!/bin/bash
+#
+# {
+#   "providers": [
+#     { "name": "5001", "url": "http://localhost:5001" },
+#     { "name": "5002", "url": "http://localhost:5002" },
+#     { "name": "5003", "url": "http://localhost:5003" }
+#   ]
+# }
+#
+source "$(dirname "$0")/../env.sh"
+providers_file="$STRHOME/providers.jsonc"
+
+# if no args, print help
+if [ $# -eq 0 ]; then
+  echo "Usage: $0 <port> [<port> ...]"
+  exit 1
+fi
+
+# validate that all args are numbers
+for port in "$@"
+do
+  if ! [[ "$port" =~ ^[0-9]+$ ]]; then
+    echo "Invalid port: $port"
+    exit 1
+  fi
+done
+
+# if the file exists, bail
+if [ -f "$providers_file" ]; then
+  echo "File exists: $providers_file"
+  exit 1
+fi
+
+# Generate the file
+echo '{' > "$providers_file"
+echo '  "providers": [' >> "$providers_file"
+for port in "$@"
+do
+  echo '    { "name": "'"$port"'", "url": "http://localhost:'"$port"'" },' >> "$providers_file"
+done
+# jsonc is tolerant of trailing commas
+#sed -i '$ s/,$//' "$providers_file"
+echo '  ]' >> "$providers_file"
+echo '}' >> "$providers_file"
+
+echo "Generated file: $providers_file"
diff --git a/str-twincoding/monitor.sh b/str-twincoding/monitor.sh
new file mode 100755
index 000000000..2b0c8c692
--- /dev/null
+++ b/str-twincoding/monitor.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+source "$(dirname "$0")/env.sh"
+python "$STRHOME/monitor/monitor_cli.py" "$@"
diff --git a/str-twincoding/monitor/file_table.py b/str-twincoding/monitor/file_table.py
new file mode 100644
index 000000000..144ae4479
--- /dev/null
+++ b/str-twincoding/monitor/file_table.py
@@ -0,0 +1,103 @@
+import time
+from typing import List
+
+from pydantic import BaseModel
+from rich.box import *
+from rich.console import Console
+from rich.layout import Layout
+from rich.live import Live
+from rich.table import Table
+
+from server_table import ServerTable, ServerStatusView, example_servers
+
+
+class FileStatusView(BaseModel):
+    name: str
+    availability: float = 0
+    encoding: str
+    auth: float = 0
+    payments: float = 0
+
+
+class FileTable:
+    def __init__(self,
+                 file: FileStatusView,
+                 servers: List[ServerStatusView] = None,
+                 ):
+        self.file = file
+        self.servers = servers or []
+        ...
+
+    @staticmethod
+    def header_table():
+        # table = Table(expand=True, show_lines=False, show_edge=False, box=SIMPLE_HEAVY)
+        table = Table(expand=True, show_lines=False, show_edge=False, box=MINIMAL_HEAVY_HEAD)
+
+        table.add_column("File", style="magenta")
+        table.add_column("Availability", style="green")
+        table.add_column("Encoding", style="blue")
+        table.add_column("Auth", style="cyan")
+        table.add_column("Payments", style="cyan")
+
+        return table
+
+    def file_row(self, file: FileStatusView):
+        table = self.header_table()
+        # table.show_header = False
+
+        # progress_meter = Status.create_progress_meter(cpu_usage)
+        table.add_row(
+            file.name,
+            f'{file.availability:.2f}',
+            file.encoding,
+            f'{file.auth:.2f}',
+            f'{file.payments:.2f}',
+        )
+
+        return table
+
+    def get(self):
+        # Create the main table with a single column
+        main_table = Table(expand=True, show_header=False, box=ROUNDED, show_lines=False)
+        main_table.add_column("Main Column", justify="left")
+
+        # file row
+        file_row = self.file_row(self.file)
+        main_table.add_row(file_row)
+
+        # server row
+        main_table.add_row()
+        server_row = ServerTable(self.servers, show_edge=False,
+                                 hide_cols={'status', 'stake', 'payments'}).get()
+        main_table.add_row(server_row)
+
+        return main_table
+
+    def output(self):
+        Console(record=True, force_terminal=True).print(self.get())
+
+    def run(self):
+        console = Console(record=True, force_terminal=True)
+        layout = Layout()
+        layout.split(Layout(name="main"))
+
+        def update(): layout['main'].update(self.get())
+
+        update()
+        with Live(layout, console=console, refresh_per_second=1) as live:
+            # while True:
+            for _ in range(3):
+                update()
+                # await asyncio.sleep(1)
+                time.sleep(1)
+
+
+example_file = FileStatusView(
+    name="file1.log",
+    availability=0.0,
+    encoding="UTF-8",
+    auth=0.0,
+    payments=0.0,
+)
+
+if __name__ == '__main__':
+    FileTable(example_file, example_servers).output()
+    # FileTable(example_files, example_servers).run()
diff --git a/str-twincoding/monitor/monitor_cli.py b/str-twincoding/monitor/monitor_cli.py
new file mode 100644
index 000000000..d83d9f5f0
--- /dev/null
+++ b/str-twincoding/monitor/monitor_cli.py
@@ -0,0 +1,288 @@
+import asyncio
+import json
+import os
+import random
+from asyncio import Task
+from datetime import datetime
+from typing import Dict, List, Optional
+
+from aiohttp import ClientSession, ClientTimeout
+from icecream import ic
+from rich.box import *
+from rich.console import Console
+from rich.layout import Layout
+from rich.live import Live
+from rich.table import Table
+
+from monitor.file_table import FileStatusView, FileTable
+from monitor.monitor_config import ServerStatus, MonitorConfig
+from server.server_config import Server, ServerFileStatus, ServerFile
+from server_table import ServerStatusView, ServerTable
+from storage.util import summarize_ranges
+
+
+class Monitor:
+    def __init__(self,
+                 providers_config: Optional[str] = None,
+                 providers: Optional[List[Server]] = None,
+                 polling_period: int = 0,
+                 timeout: int = 30,
+                 simulate_latency: int = 0,
+                 debug: bool = False
+                 ):
+
+        # config or servers
+        assert providers_config or providers
+        assert not (providers_config and providers)
+        self.providers_config_last_update: Optional[datetime] = None
+        self.providers_config = providers_config
+        self.servers = providers
+
+        self.polling_period = polling_period
+        self.timeout = timeout
+        self.server_status: Dict[Server, ServerStatus] = {}
+        self.file_status: Dict[Server, List[ServerFileStatus]] = {}
+        self.simulate_latency = simulate_latency
+        self.task_map: Dict[Server, Task] = {}
+        self.debug = debug
+
+    async def fetch(self, session: ClientSession, server: Server, after_seconds: int = 0):
+        url = f'{server.url}/list'
+        self.log(f"\nFetching {(server.name or '') + server.url}")
+
+        if after_seconds > 0:
+            await asyncio.sleep(after_seconds)
+
+        if self.simulate_latency > 0:
+            await asyncio.sleep(random.random() * self.simulate_latency)
+
+        def clear_status():
+            self.server_status[server] = ServerStatus.UNKNOWN
+            self.file_status.pop(server, None)
+
+        try:
+            async with session.post(url, data={'auth_key': server.auth_token}) as response:
+                # server status
+                if response.status != 200:
+                    clear_status()
+                    self.log("error: ", response.status, response.reason)
+                    return
+                self.server_status[server] = ServerStatus.OK
+                self.log("response OK")
+
+                # file status
+                text = await response.text()
+                # self.log("text: ", text)
+                from_json = json.loads(text)
+                status = [ServerFileStatus(**file) for file in from_json]
+                self.log("status: ", status)
+                self.file_status[server] = status
+        except Exception as e:
+            self.log(f"Exception: {e}")
+            clear_status()
+
+    # Update the list of servers from the server config if required
+    def update_server_config(self):
+        if not self.providers_config:
+            return self.servers
+        config_current_mod = datetime.fromtimestamp(os.path.getmtime(self.providers_config))
+        if not self.servers or self.providers_config_last_update is None or config_current_mod > self.providers_config_last_update:
+            config = MonitorConfig.load(self.providers_config)
+            self.servers = config.providers
+            self.providers_config_last_update = config_current_mod
+
+    # ensure that all servers are being polled
+    def update_task_map(self, session: ClientSession):
+        for server in self.servers:
+            # TODO: We don't currently clear tasks for servers that are removed from the config
+            if server not in self.task_map:
+                task: Task = asyncio.create_task(self.fetch(session, server))
+                self.task_map[server] = task
+
+    # Reschedule completed tasks
+    def schedule_tasks(self, session: ClientSession, delay: int = 0):
+        for server, task in self.task_map.items():
+            if task.done():
+                self.task_map[server] = asyncio.create_task(
+                    self.fetch(session, server, after_seconds=delay))
+
+    async def poll_servers_loop(self):
+        async with ClientSession(timeout=ClientTimeout(total=self.timeout)) as session:
+            if self.polling_period > 0:
+                while True:
+                    self.update_server_config()
+                    self.update_task_map(session)
+                    period = self.polling_period if self.server_status else 0
+                    self.schedule_tasks(session, period)
+                    await asyncio.sleep(1)
+            else:
+                self.update_server_config()
+                self.update_task_map(session)
+                self.schedule_tasks(session)
+                await asyncio.gather(*self.task_map.values())
+
+    # Generate the table of known providers
+    def get_provider_table(self) -> Table:
+        servers = [
+            ServerStatusView(name=server.url, status=self.server_status[server])
+            for server in self.servers if server in self.server_status
+        ]
+        return ServerTable(servers=servers, hide_cols={'shards', 'last_validated'}).get()
+
+    # Generate a table for each file that we found
+    def get_file_tables(self) -> Table:
+        self.log("server status:", self.server_status)
+        self.log("file status:", self.file_status)
+
+        # Map of distinct files across all servers results
+        distinct_files = {file_status.file: file_status
+                          for server in self.file_status for file_status in self.file_status[server]}
+
+        def find_servers_with_file(file: ServerFile) -> List[Server]:
+            return [server for server in self.file_status if
+                    file in [status.file for status in self.file_status[server]]]
+
+        def find_file_status_in_server_list(file: ServerFile, server: Server) -> Optional[ServerFileStatus]:
+            try:
+                return [status for status in self.file_status[server] if status.file == file][0]
+            except IndexError:
+                return None
+
+        # Invert the server data to a per-file map: {file: {server: file_status}}
+        file_status_map = {file:
+            {
+                server: find_file_status_in_server_list(file, server)
+                for server in find_servers_with_file(file)
+            }
+            for file in distinct_files
+        }
+        self.log("file status map:", file_status_map)
+
+        file_tables = [self.create_file_table(file, file_status_map[file]) for file in distinct_files]
+
+        # Combine the file tables into a single table
+        combined_file_table = Table(expand=True, show_header=False, show_lines=False, show_edge=False, pad_edge=False)
+        combined_file_table.add_column("main", justify="left")
+        for file_table in file_tables:
+            combined_file_table.add_row(file_table)
+        return combined_file_table
+
+    # Create a file table for a single file
+    def create_file_table(self, file: ServerFile, server_map: Dict[Server, ServerFileStatus]):
+
+        # calculate availability
+        distinct_shards0 = list({shard for server in server_map for shard in server_map[server].shards0})
+        distinct_shards1 = list({shard for server in server_map for shard in server_map[server].shards1})
+        count0 = len(distinct_shards0)
+        availability0 = count0 / file.k0 if count0 >= file.k0 else 0
+        count1 = len(distinct_shards1)
+        availability1 = count1 / file.k1 if count1 >= file.k1 else 0
+        availability = availability0 + availability1
+
+        server_views = []
+        for server in server_map:
+            status: ServerFileStatus = server_map[server]
+            shards0 = f"type0: {summarize_ranges(status.shards0)}"
+            shards1 = f"type1: {summarize_ranges(status.shards1)}"
+            if status.shards0 and status.shards1:
+                shards = f"{shards0} | {shards1}"
+            else:
+                shards = shards0 if status.shards0 else shards1
+
+            view = ServerStatusView(
+                name=server.url,
+                status=self.server_status[server],
+                shards=shards,
+            )
+            server_views.append(view)
+
+        file_view = FileStatusView(name=file.name,
+                                   availability=availability,
+                                   encoding=file.encoding_str(),
+                                   auth=0.0, payments=0.0)
+        return FileTable(file=file_view, servers=server_views).get()
+
+    async def draw_screen_loop(self):
+        if self.debug:
+            return
+
+        console = Console(record=True, force_terminal=True)
+
+        def create_layout():
+            _layout = Layout()
+            height = 3 + len(self.servers) * 2  # header + server rows with lines
+            _layout.split(
+                Layout(name="servers", size=height),
+                Layout(name="files")
+            )
+            return _layout
+
+        def update():
+            layout['servers'].update(self.get_provider_table())
+            layout['files'].update(self.get_file_tables())
+
+        if self.live:
+            layout = create_layout()
+            layout_server_len = len(self.servers)
+            update()
+            with Live(layout, console=console) as live:
+                while True:
+                    if len(self.servers) != layout_server_len:
+                        layout = create_layout()
+                        live.update(layout)
+                        layout_server_len = len(self.servers)
+                    update()
+                    await asyncio.sleep(1 if self.server_status else 0.1)
+        else:
+            console.print(self.get_provider_table())
+            console.print(self.get_file_tables())
+
+    async def main(self):
+        if self.live:
+            await asyncio.gather(
+                asyncio.create_task(self.poll_servers_loop()),
+                asyncio.create_task(self.draw_screen_loop())
+            )
+        else:
+            await self.poll_servers_loop()
+            await self.draw_screen_loop()
+
+    def run(self):
+        asyncio.run(self.main())
+
+    @property
+    def live(self):
+        return self.polling_period > 0
+
+    def log(self, *args):
+        if self.debug:
+            print(*args)
+
+
+if __name__ == '__main__':
+    import argparse
+
+    parser = argparse.ArgumentParser(description='Process command line arguments.')
+    parser.add_argument('--providers', type=str, help='Providers file path')
+    parser.add_argument('--debug', action='store_true', help='Show debug')
+    parser.add_argument('--update', type=int, help='Update view with polling period seconds')
+    args = parser.parse_args()
+
+    providers = args.providers
+    if not args.providers:
+        STRHOME = os.environ.get('STRHOME') or '.'
+        providers = os.path.join(STRHOME, 'providers.jsonc')
+        print(f"Using default providers file: {providers}")
+
+    monitor = Monitor(
+        providers_config=providers,
+        polling_period=args.update or 0,
+        debug=args.debug or False,
+        simulate_latency=0, timeout=3,
+        # servers=[
+        #     Server(name="1", url='http://localhost:8080'),
+        #     Server(name="2", url='http://localhost:8080'),
+        #     Server(name="3", url='http://localhost:8080'),
+        # ]
+    )
+    monitor.run()
diff --git a/str-twincoding/monitor/monitor_config.py b/str-twincoding/monitor/monitor_config.py
new file mode 100644
index 000000000..7472e2171
--- /dev/null
+++ b/str-twincoding/monitor/monitor_config.py
@@ -0,0 +1,15 @@
+from enum import Enum
+from typing import List
+
+from server.server_config import Server
+from storage.config import ModelBase
+
+
+class ServerStatus(Enum):
+    OK = "OK"
+    UNKNOWN = "UNKNOWN"
+    NA = "-"
+
+
+class MonitorConfig(ModelBase):
+    providers: List[Server]
diff --git a/str-twincoding/monitor/server_table.py b/str-twincoding/monitor/server_table.py
new file mode 100644
index 000000000..dc06cef84
--- /dev/null
+++ b/str-twincoding/monitor/server_table.py
@@ -0,0 +1,115 @@
+from datetime import datetime
+from enum import Enum
+from typing import List, Dict, Set
+
+from pydantic import BaseModel
+from rich.box import SIMPLE, MINIMAL_HEAVY_HEAD, ROUNDED
+from rich.console import Console
+from rich.table import Table
+
+from monitor.monitor_config import ServerStatus
+
+
+class ServerStatusView(BaseModel):
+    name: str
+    status: ServerStatus = ServerStatus.UNKNOWN
+    stake: float = 0
+    auth: float = 0
+    payments: float = 0
+    shards: str = None
+    validated: datetime = None
+
+
+class ServerTable:
+    def __init__(self,
+                 servers: List[ServerStatusView] = None,
+                 show_edge: bool = True,
+                 hide_cols: Set[str] = None
+                 ):
+        self.servers = servers or []
+        self.show_edge = show_edge
+        self.hide_cols = hide_cols or {}
+        ...
+
+    def get(self):
+        # table = Table(expand=True, show_lines=False, title="Known Providers", title_justify="left")
+        table = Table(
+            expand=True,
+            # title="Known Providers" if self.standalone else None,
+            show_edge=self.show_edge,
+            show_lines=True,
+            box=ROUNDED,
+        )
+
+        table.add_column("Provider", style="magenta")
+        if "status" not in self.hide_cols:
+            table.add_column("Status", style="green")
+        if "stake" not in self.hide_cols:
+            table.add_column("Stake", style="red")
+        if "shards" not in self.hide_cols:
+            table.add_column("Shards", style="blue")
+        if "auth" not in self.hide_cols:
+            table.add_column("Auth", style="cyan")
+        if "payments" not in self.hide_cols:
+            table.add_column("Payments", style="cyan")
+        if "last_validated" not in self.hide_cols:
+            table.add_column("Last Validated", style="blue")
+
+        # Add rows - you can replace this with actual server status data
+        for server in self.servers:
+            # progress_meter = Status.create_progress_meter(cpu_usage)
+            # table.add_row(
+            #     server.name,
+            #     server.status.name,
+            #     f'{server.stake:.2f}',
+            #     f'{server.shards if server.shards else "-"}',
+            #     f'{server.auth:.2f}',
+            #     f'{server.payments:.2f}',
+            #     f'{server.validated.strftime("%Y-%m-%d %H:%M:%S") if server.validated else "-"}'
+            # )
+
+            row_data = [server.name]
+            if "status" not in self.hide_cols:
+                row_data.append(server.status.name)
+            if "stake" not in self.hide_cols:
+                row_data.append(f'{server.stake:.2f}')
+            if "shards" not in self.hide_cols:
+                row_data.append(f'{server.shards if server.shards else "-"}')
+            if "auth" not in self.hide_cols:
+                row_data.append(f'{server.auth:.2f}')
+            if "payments" not in self.hide_cols:
+                row_data.append(f'{server.payments:.2f}')
+            if "last_validated" not in self.hide_cols:
+                row_data.append(f'{server.validated.strftime("%Y-%m-%d %H:%M:%S") if server.validated else "-"}')
+
+            table.add_row(*row_data)
+
+        return table
+
+    def output(self):
+        Console(record=True, force_terminal=True).print(self.get())
+
+
+example_servers = [
+    ServerStatusView(
+        name="server1.example.com",
+        status=ServerStatus.OK,
+        stake=0.0, auth=0.0, payments=0.0, hosting=0,
+        validated=datetime.now()
+    ),
+    ServerStatusView(
+        name="server2.example.com",
+        status=ServerStatus.OK,
+        stake=0.0, auth=0.0, payments=0.0, hosting=0,
+        validated=datetime.now()
+    ),
+    ServerStatusView(
+        name="server3.example.com",
+        status=ServerStatus.OK,
+        stake=0.0, auth=0.0, payments=0.0, hosting=0,
+        validated=datetime.now()
+    ),
+]
+
+if __name__ == '__main__':
+    ServerTable(example_servers).output()
diff --git a/str-twincoding/requirements.txt b/str-twincoding/requirements.txt
index d0e4f9ba7..580dfcf43 100644
--- a/str-twincoding/requirements.txt
+++ b/str-twincoding/requirements.txt
@@ -3,4 +3,7 @@ commentjson~=0.9.0
 pydantic~=2.4.2
 numpy==1.25.0 # galois requires 1.25.0?
 galois~=0.3.6
-tqdm~=4.66.1
\ No newline at end of file
+tqdm~=4.66.1
+rich==13.6.0
+aiohttp==3.8.6
+icecream==2.1.3
\ No newline at end of file
diff --git a/str-twincoding/server.conf b/str-twincoding/server.conf
deleted file mode 100644
index 6caa6d3c4..000000000
--- a/str-twincoding/server.conf
+++ /dev/null
@@ -1,19 +0,0 @@
-{
-  "config_version": "1.0",
-  "console_url": "http://localhost:8080/",
-
-  # Cluster definition
-  "cluster": {
-    "name": "testnet",
-    "type0": {
-      "encoding": "reed_solomon",
-      "k": 3,
-      "n": 5,
-    },
-    "type1": {
-      "encoding": "reed_solomon",
-      "k": 3,
-      "n": 5,
-    },
-  },
-}
diff --git a/str-twincoding/server.py b/str-twincoding/server.py
deleted file mode 100644
index 4437a36cc..000000000
--- a/str-twincoding/server.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from flask import Flask
-from config import load_config
-
-app = Flask(__name__)
-
-
-@app.route('/')
-def hello_world():
-    return 'Hello, World!'
-
-
-if __name__ == '__main__':
-    config = load_config('config.json')
-    print(f"config version: {config.config_version}")
-    app.run(port=config.console.port)
-
diff --git a/str-twincoding/server.sh b/str-twincoding/server.sh
new file mode 100755
index 000000000..70d2bf822
--- /dev/null
+++ b/str-twincoding/server.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+source "$(dirname "$0")/env.sh"
+
+# if "--repository" is not specified, use the default repository
+if [[ "$*" != *"--repository"* ]]; then
+  echo "Using default repository: $STRHOME/repository"
+  set -- "$@" --repository "$STRHOME/repository"
+fi
+
+python "$STRHOME/server/server_cli.py" "$@"
diff --git a/str-twincoding/server/cluster.jsonc b/str-twincoding/server/cluster.jsonc
new file mode 100644
index 000000000..8b9a825e8
--- /dev/null
+++ b/str-twincoding/server/cluster.jsonc
@@ -0,0 +1,13 @@
+{
+  "config_version": "1.0",
+  "name": "testnet",
+  "servers": [
+    { "url": "http://localhost:8101" },
+    { "url": "http://localhost:8102" },
+    { "url": "http://localhost:8103" },
+    { "url": "http://localhost:8104" },
+    { "url": "http://localhost:8105" },
+    { "url": "http://localhost:8106" },
+    { "url": "http://localhost:8107" },
+  ],
+}
diff --git a/str-twincoding/server/server.jsonc b/str-twincoding/server/server.jsonc
new file mode 100644
index 000000000..85af43196
--- /dev/null
+++ b/str-twincoding/server/server.jsonc
@@ -0,0 +1,4 @@
+{
+  "config_version": "1.0",
+
+}
diff --git a/str-twincoding/server/server_api.py b/str-twincoding/server/server_api.py
new file mode 100644
index 000000000..723078b74
--- /dev/null
+++ b/str-twincoding/server/server_api.py
@@ -0,0 +1,66 @@
+# HTTP API endpoints for the storage server.
+import json
+
+from flask import Blueprint, jsonify, request
+from server_args import app
+from server_config import ServerFileStatus, ServerFile
+
+bp = Blueprint('my_blueprint', __name__, url_prefix='/')
+
+
+# Placeholder for authorization logic and payment system.
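+# A request is currently authorized when its Authorization header matches the
+# server's configured auth_key, or when no auth_key is configured at all (open
+# access). A real deployment would replace this check with per-client
+# credentials and payment validation.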
+def is_authorized(request):
+    return request.headers.get('Authorization') == app.auth_key or (not app.auth_key)
+
+
+@bp.route('/')
+def hello_world():
+    return 'Storage Server'
+
+
+@bp.route('/health', methods=['POST', 'GET'])
+def health_check():
+    if not is_authorized(request):
+        return jsonify({"error": "Unauthorized"}), 401
+    return jsonify({"status": "OK"})
+
+
+@bp.route('/list', methods=['POST', 'GET'])
+def list_files():
+    if not is_authorized(request):
+        return jsonify({"error": "Unauthorized"}), 401
+    try:
+        print([map_file(file).model_dump() for file in app.repository.list()])
+        return jsonify([map_file(file).model_dump() for file in app.repository.list()])
+    except Exception as e:
+        return jsonify({"error": str(e)}), 500
+
+
+def map_file(filename):
+    type0_files, type1_files = app.repository.map(filename)
+    t0 = list(type0_files.values())
+    t1 = list(type1_files.values())
+    config = app.repository.file_config(filename)
+    return ServerFileStatus(
+        file=ServerFile(name=filename,
+                        encoding0=config.type0.encoding,
+                        k0=config.type0.k,
+                        n0=config.type0.n,
+                        encoding1=config.type1.encoding,
+                        k1=config.type1.k,
+                        n1=config.type1.n),
+        shards0=t0,
+        shards1=t1,
+    )
+
+
+if __name__ == '__main__':
+    file = app.repository.list()[0]
+    type0_files, type1_files = app.repository.map(file)
+    config = app.repository.file_config(file)
+    to_json = json.dumps([map_file(file).model_dump() for file in app.repository.list()])
+    from_json = json.loads(to_json)
+    for file in from_json:
+        fs = ServerFileStatus(**file)
+        print(fs.file.name)
+    ...
diff --git a/str-twincoding/server/server_args.py b/str-twincoding/server/server_args.py
new file mode 100644
index 000000000..a773f35d0
--- /dev/null
+++ b/str-twincoding/server/server_args.py
@@ -0,0 +1,38 @@
+from server.server_config import ServerConfig
+import argparse
+
+from storage.repository import Repository
+from storage.util import dump_docs_md
+
+
+class App:
+    def __init__(self):
+        # Define the command line argument parser
+        parser = argparse.ArgumentParser(description="Flask server with argument parsing")
+        parser.add_argument("--config", type=str, help="server config file")
+        parser.add_argument("--interface", type=str, help="Interface the server listens on")
+        parser.add_argument("--port", type=int, help="Port the server listens on")
+        parser.add_argument("--repository_dir", type=str, help="Directory to store repository files")
+        parser.add_argument("--auth_key", type=str, help="Authentication key to validate requests")
+        parser.add_argument("--show_console", action='store_true', help="Flag to show console logs")
+
+        args = parser.parse_args()
+
+        # Config file
+        config = ServerConfig.load(args.config) if args.config else ServerConfig()
+        print(f"config version: {config.config_version}")
+
+        # Merge params
+        self.interface = args.interface or config.interface
+        self.port = args.port or config.port or 8080
+
+        # Repository
+        self.repository = Repository(args.repository_dir) if args.repository_dir else Repository.default()
+
+        self.auth_key = args.auth_key or config.auth_key
+        if not self.auth_key:
+            print("WARNING: No client auth key provided. Requests will not be validated.")
+        self.show_console = args.show_console or False
+
+
+app: App = App()
diff --git a/str-twincoding/server/server_cli.py b/str-twincoding/server/server_cli.py
new file mode 100644
index 000000000..8a4872bc0
--- /dev/null
+++ b/str-twincoding/server/server_cli.py
@@ -0,0 +1,8 @@
+from flask import Flask
+from server_api import bp
+from server_args import app
+
+if __name__ == '__main__':
+    flask = Flask(__name__)
+    flask.register_blueprint(bp)
+    flask.run(host=app.interface, port=app.port, debug=app.show_console)
diff --git a/str-twincoding/server/server_config.py b/str-twincoding/server/server_config.py
new file mode 100644
index 000000000..b88ef31df
--- /dev/null
+++ b/str-twincoding/server/server_config.py
@@ -0,0 +1,65 @@
+from typing import Optional, List
+from storage.config import ModelBase
+
+
+class Server(ModelBase):
+    name: Optional[str] = None  # for testing
+    url: str
+    # An auth token for the server-client pair allows listing files.
+    # We could also support stateless discovery by hash.
+    auth_token: Optional[str] = None
+
+    class Config:
+        frozen = True  # hashable
+    ...
+
+
+class Cluster(ModelBase):
+    name: str
+    servers: List[Server]
+
+
+class ServerConfig(ModelBase):
+    config_version: Optional[str] = None
+    interface: Optional[str] = None
+    port: Optional[int] = 8080
+    auth_key: Optional[str] = None
+    repository_dir: Optional[str] = None
+    cluster: Optional[Cluster] = None
+
+
+class ServerFile(ModelBase):
+    name: str
+    encoding0: str
+    k0: int
+    n0: int
+    encoding1: str
+    k1: int
+    n1: int
+
+    def encoding_str(self):
+        encoding0 = f'{self.encoding0}:{self.k0}/{self.n0}'
+        encoding1 = f'{self.encoding1}:{self.k1}/{self.n1}'
+        return encoding0 if encoding0 == encoding1 else f"{encoding0}|{encoding1}"
+
+    class Config:
+        frozen = True  # hashable
+
+
+class ServerFileStatus(ModelBase):
+    file: ServerFile
+    shards0: List[int]
+    shards1: List[int]
+
+    class Config:
+        frozen = True  # hashable
+
+
+if __name__ == '__main__':
+    # config = Cluster.load('cluster.jsonc')
+    # print(config)
+    # print(f"config version: {config.config_version}")
+
+    file = ServerFile(name='test', encoding0='erasure', k0=2, n0=3, encoding1='erasure', k1=2, n1=3)
+    file2 = ServerFile(name='test', encoding0='erasure', k0=2, n0=3, encoding1='erasure', k1=2, n1=3)
+    print(hash(file) == hash(file2))
diff --git a/str-twincoding/storage.py b/str-twincoding/storage.py
deleted file mode 100644
index 7e8c53d64..000000000
--- a/str-twincoding/storage.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import argparse
-from config import NodeType, NodeType0, NodeType1
-from file_decoder import FileDecoder
-from file_encoder import FileEncoder
-from node_recovery_client import NodeRecoveryClient
-from node_recovery_source import NodeRecoverySource
-
-
-#
-# Command line interface for node encoding and recovery.
-#
-
-def encode_file(args):
-    FileEncoder(
-        node_type0=NodeType0(k=args.k0, n=args.n0, encoding=args.encoding0),
-        node_type1=NodeType1(k=args.k1, n=args.n1, encoding=args.encoding1),
-        path=args.path,
-        overwrite=args.overwrite
-    ).encode()
-
-
-def decode_file(args):
-    FileDecoder.from_encoded_dir(
-        path=args.encoded,
-        output_path=args.recovered,
-        overwrite=args.overwrite
-    ).decode()
-
-
-def generate_recovery_file(args):
-    NodeRecoverySource(
-        recover_node_type=NodeType(k=args.k, n=args.n, encoding=args.recover_encoding),
-        recover_node_index=args.recover_node_index,
-        data_path=args.data_path,
-        output_path=args.output_path,
-        overwrite=args.overwrite
-    ).generate()
-
-
-def recover_node(args):
-    NodeRecoveryClient(
-        recovery_source_node_type=NodeType(k=args.k, n=args.n, encoding=args.encoding),
-        file_map=NodeRecoveryClient.map_files(files_dir=args.files_dir, k=args.k),
-        output_path=args.output_path,
-        overwrite=args.overwrite
-    ).recover_node()
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="File encoding and decoding utility")
-    subparsers = parser.add_subparsers(metavar='COMMAND', help="Sub-commands available.")
-
-    # Encoding
-    parser_encode = subparsers.add_parser('encode', help="Encode a file.")
-    parser_encode.add_argument('--path', required=True, help="Path to the file to encode.")
-    parser_encode.add_argument('--k0', type=int, required=True, help="k value for node type 0.")
-    parser_encode.add_argument('--n0', type=int, required=True, help="n value for node type 0.")
-    parser_encode.add_argument('--k1', type=int, required=True, help="k value for node type 1.")
-    parser_encode.add_argument('--n1', type=int, required=True, help="n value for node type 1.")
-    parser_encode.add_argument('--encoding0', default='reed_solomon', help="Encoding for node type 0.")
-    parser_encode.add_argument('--encoding1', default='reed_solomon', help="Encoding for node type 1.")
-    parser_encode.add_argument('--overwrite', action='store_true', help="Overwrite existing files.")
-    parser_encode.set_defaults(func=encode_file)
-
-    # Decoding
-    parser_decode = subparsers.add_parser('decode', help="Decode an encoded file.")
-    parser_decode.add_argument('--encoded', required=True, help="Path to the encoded file.")
-    parser_decode.add_argument('--recovered', required=True, help="Path to the recovered file.")
-    parser_decode.add_argument('--overwrite', action='store_true', help="Overwrite existing files.")
-    parser_decode.set_defaults(func=decode_file)
-
-    # Recovery Generation
-    parser_gen_rec = subparsers.add_parser('generate_recovery_file', help="Generate recovery files.")
-    parser_gen_rec.add_argument('--recover_node_index', type=int, required=True, help="Index of the recovering node.")
-    parser_gen_rec.add_argument('--recover_encoding', default='reed_solomon', help="Encoding for the recovering node.")
-    parser_gen_rec.add_argument('--k', type=int, required=True, help="k value for the recovering node.")
-    parser_gen_rec.add_argument('--n', type=int, required=True, help="n value for the recovering node.")
-    parser_gen_rec.add_argument('--data_path', required=True, help="Path to the source node data.")
-    parser_gen_rec.add_argument('--output_path', required=True, help="Path to the output recovery file.")
-    parser_gen_rec.add_argument('--overwrite', action='store_true', help="Overwrite existing files.")
-    parser_gen_rec.set_defaults(func=generate_recovery_file)
-
-    # Node Recovery
-    parser_rec_node = subparsers.add_parser('recover_node', help="Recover a node from recovery files.")
-    parser_rec_node.add_argument('--k', type=int, required=True, help="k value for node type.")
-    parser_rec_node.add_argument('--n', type=int, required=True, help="n value for node type.")
-    parser_rec_node.add_argument('--encoding', default='reed_solomon', help="Encoding for node type.")
-    parser_rec_node.add_argument('--files_dir', required=True, help="Path to the recovery files.")
-    parser_rec_node.add_argument('--output_path', required=True, help="Path to the recovered file.")
-    parser_rec_node.add_argument('--overwrite', action='store_true', help="Overwrite existing files.")
-    parser_rec_node.set_defaults(func=recover_node)
-
-    args = parser.parse_args()
-    if hasattr(args, 'func'):
-        args.func(args)
-    else:
-        parser.print_help()
diff --git a/str-twincoding/storage.sh b/str-twincoding/storage.sh
index 5c6e5a5f2..234934ef4 100755
--- a/str-twincoding/storage.sh
+++ b/str-twincoding/storage.sh
@@ -1,3 +1,3 @@
 #!/bin/bash
-source "$(dirname "$0")/venv/bin/activate"
-python "$(dirname "$0")/storage.py" "$@"
+source "$(dirname "$0")/env.sh"
+python "$STRHOME/storage/storage_cli.py" "$@"
diff --git a/str-twincoding/storage/config.jsonc b/str-twincoding/storage/config.jsonc
new file mode 100644
index 000000000..64efe68e1
--- /dev/null
+++ b/str-twincoding/storage/config.jsonc
@@ -0,0 +1,8 @@
+{
+  "config_version": "1.0",
+
+  // Cluster definition
+  "cluster": {
+    "name": "testnet",
+  },
+}
diff --git a/str-twincoding/storage/config.py b/str-twincoding/storage/config.py
new file mode 100644
index 000000000..5e16ef913
--- /dev/null
+++ b/str-twincoding/storage/config.py
@@ -0,0 +1,57 @@
+import commentjson as json
+from pydantic import BaseModel
+
+
+class ModelBase(BaseModel):
+    @classmethod
+    def load(cls, file_path):
+        with open(file_path, 'r') as f:
+            return cls(**json.load(f))
+
+
+#
+# File config
+#
+
+class NodeType(ModelBase):
+    type: int
+    encoding: str
+    k: int
+    n: int
+
+    @property
+    def transpose(self):
+        return self.type == 1
+
+
+class NodeType0(NodeType):
+    # transpose: bool = False
+    type: int = 0
+
+
+class NodeType1(NodeType):
+    # transpose: bool = True
+    type: int = 1
+
+
+class EncodedFileConfig(ModelBase):
+    name: str
+    type0: NodeType0
+    type1: NodeType1
+    file_length: int
+    # file_hash: str
+
+
+if __name__ == '__main__':
+    # config = load_config('config.jsonc')
+    # print(config)
+    # print(f"config version: {config.config_version}")
+
+    node = NodeType0(encoding='reed_solomon', k=3, n=5)
+    print(node, node.transpose)
+    node = NodeType(type=0, encoding='reed_solomon', k=3, n=5)
+    print(node, node.transpose)
+    node = NodeType1(encoding='reed_solomon', k=3, n=5)
+    print(node, node.transpose)
+    node = NodeType(type=1, encoding='reed_solomon', k=3, n=5)
+    print(node, node.transpose)
diff --git a/str-twincoding/storage/repository.py b/str-twincoding/storage/repository.py
new file mode 100644
index 000000000..0ddb84d91
--- /dev/null
+++ b/str-twincoding/storage/repository.py
@@ -0,0 +1,105 @@
+import os
+import re
+from collections import OrderedDict
+from typing import Any
+
+from storage.config import EncodedFileConfig
+from storage.util import *
+
+
+class Repository:
+    @staticmethod
+    def default() -> 'Repository':
+        STRHOME = os.environ.get('STRHOME') or '.'
+        path = os.path.join(STRHOME, 'repository')
+        print(f"Using default repository path: {path}")
+        return Repository(path=path)
+
+    def __init__(self, path: str, init: bool = True):
+        self._path = path
+        if init and not os.path.exists(path):
+            os.makedirs(os.path.join(self._path, 'tmp'))
+        ...
+
+    # Get the path to an encoded file.
+    # 'file' must be a simple filename (not a path)
+    def file_path(self, file: str, expected: bool = True) -> str:
+        assert os.path.basename(file) == file, "file must be a filename only with no path."
+        path = os.path.join(self._path, f'{file}.encoded')
+        if expected:
+            assert os.path.exists(path), f"Encoded file not found: {path}"
+        return path
+
+    def file_config(self, file: str) -> EncodedFileConfig:
+        path = self.file_path(file)
+        return EncodedFileConfig.load(os.path.join(path, 'config.json'))
+
+    # Get the path to a shard of the encoded file by node type and index
+    def shard_path(self, file: str, node_type: int, node_index: int, expected: bool = True) -> str:
+        assert_node_type(node_type)
+        path = f'{self.file_path(file)}/type{node_type}_node{node_index}.dat'
+        if expected:
+            assert os.path.exists(path), f"Shard not found: {path}"
+        return path
+
+    def recovery_file_path(self, file: str,
+                           recover_node_type: int, recover_node_index: int,
+                           helper_node_index: int, expected: bool = True) -> str:
+        assert_node_type(recover_node_type)
+        path = (f'{self.file_path(file)}/recover'
+                f'_type{recover_node_type}'
+                f'_node{recover_node_index}'
+                f'_from{helper_node_index}.dat')
+        if expected:
+            assert os.path.exists(path), f"Recovery file not found: {path}"
+        return path
+
+    def tmp_file_path(self, file: str):
+        return os.path.join(self._path, 'tmp', f'{file}')
+
+    # Map the files in an encoded file directory.
+    # This returns two ordered dicts, one for type 0 files and one for type 1 files, filename -> node index
+    def map(self, file: str) -> tuple[dict[str, int], dict[str, int]]:
+        return Repository.map_files(self.file_path(file))
+
+    # Produce a compact string summarizing the availability of a file.
+    def status_str(self, file: str) -> str:
+        type0_files, type1_files = self.map(file)
+        # config = load_file_config(f'{self.path(file)}/config.json')
+        return (f"{file}: Availability: Type 0 shards: {summarize_ranges(list(type0_files.values()))}, "
+                f"Type 1 shards: {summarize_ranges(list(type1_files.values()))}")
+
+    # Generate a list of the files in the repository.
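+    # (Names are derived by stripping the '.encoded' directory suffix.)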
+    def list(self) -> List[str]:
+        encoded = [f for f in os.listdir(self._path) if f.endswith('.encoded')]
+        return [f[:-8] for f in encoded]
+
+    # Return maps of type 0 and type 1 files, file path -> node index
+    @staticmethod
+    def map_files(files_dir: str) -> tuple[dict[str, int], dict[str, int]]:
+        type0_files: dict[str, int]
+        type0_files, type1_files = {}, {}
+        for filename in os.listdir(files_dir):
+            match = re.match(r'type([01])_node(\d+)\.dat', filename)
+            if not match:
+                continue
+            type_no, index_no = int(match.group(1)), int(match.group(2))
+            files = type0_files if type_no == 0 else type1_files
+            files[os.path.join(files_dir, filename)] = index_no
+
+        return (OrderedDict(sorted(type0_files.items(), key=lambda x: x[1])),
+                OrderedDict(sorted(type1_files.items(), key=lambda x: x[1])))
+
+
+if __name__ == '__main__':
+    repo = Repository('./repository')
+
+    files = repo.list()
+    print("Files in repository:")
+    for file in files:
+        print(' ', repo.status_str(file))
+
+    # print()
+    # file = 'file_1KB.dat'
+    # print(repo.file_path(file))
+    # print(repo.shard_path(file, node_type=0, node_index=0))
diff --git a/str-twincoding/storage/storage_cli.py b/str-twincoding/storage/storage_cli.py
new file mode 100644
index 000000000..8076c002b
--- /dev/null
+++ b/str-twincoding/storage/storage_cli.py
@@ -0,0 +1,245 @@
+import argparse
+import os
+
+from icecream import ic
+
+from storage.config import NodeType, NodeType0, NodeType1
+from storage.repository import Repository
+from encoding.file_decoder import FileDecoder
+from encoding.file_encoder import FileEncoder
+from encoding.node_recovery_client import NodeRecoveryClient
+from encoding.node_recovery_source import NodeRecoverySource
+from storage.util import dump_docs_md
+
+
+#
+# Command line interface for node encoding and recovery.
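+#
+# Example usage via the storage.sh wrapper (assuming STRHOME is set by env.sh):
+#   storage.sh import data/foo_file.dat
+#   storage.sh repo list
+#   storage.sh push foo_file.dat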
+#
+
+def repo_list(args):
+    repo = Repository(path=args.path) if args.path else Repository.default()
+    files = repo.list()
+    for file in files:
+        print(repo.status_str(file))
+
+
+def repo_file_path(args):
+    repo = Repository(path=args.path) if args.path else Repository.default()
+    print(repo.file_path(file=args.file, expected=False))
+
+
+def repo_shard_path(args):
+    repo = Repository(path=args.path) if args.path else Repository.default()
+    print(repo.shard_path(file=args.file, node_type=args.node_type, node_index=args.node_index,
+                          expected=False))
+
+
+def repo_recovery_file_path(args):
+    repo = Repository(path=args.path) if args.path else Repository.default()
+    print(repo.recovery_file_path(
+        file=args.file,
+        recover_node_type=args.recover_node_type,
+        recover_node_index=args.recover_node_index,
+        helper_node_index=args.helper_node_index,
+        expected=False))
+
+
+def repo_tmp_file_path(args):
+    repo = Repository(path=args.path) if args.path else Repository.default()
+    print(repo.tmp_file_path(file=args.file))
+
+
+def encode_file(args):
+    FileEncoder(
+        node_type0=NodeType0(k=args.k0, n=args.n0, encoding=args.encoding0),
+        node_type1=NodeType1(k=args.k1, n=args.n1, encoding=args.encoding1),
+        input_file=args.path,
+        output_path=args.output_path,
+        overwrite=args.overwrite
+    ).encode()
+
+
+# Import command: Similar to encode_file but defaults to repository paths
+def import_file(args):
+    # Default the file path
+    filename = os.path.basename(args.path)
+    repo = Repository(path=args.repo) if args.repo else Repository.default()
+
+    output_path = repo.file_path(file=filename, expected=False)
+    FileEncoder(
+        node_type0=NodeType0(k=args.k0, n=args.n0, encoding=args.encoding0),
+        node_type1=NodeType1(k=args.k1, n=args.n1, encoding=args.encoding1),
+        input_file=args.path,
+        output_path=output_path,
+        overwrite=args.overwrite
+    ).encode()
+
+
+def push_file(args):
+    ic(args.validate)
+    assert not args.validate, "Validation not implemented yet."
+    # prepare file
+    filename = args.file
+    repo = Repository(path=args.repo) if args.repo else Repository.default()
+    encoded_file_config = repo.file_config(file=filename)
+    # TODO: validate the file before uploading?
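+    # (e.g. decode to a repo.tmp_file_path() location and compare with the original)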
+    type0_shards, type1_shards = repo.map(filename)
+    # inverted_dict = {value: key for key, value in original_dict.items()}
+
+    # prepare servers
+    servers = args.servers
+    if not servers:
+        # pick n0+n1 servers from known providers round-robin
+        servers = []
+
+    # FileUploader( ).encode()
+
+
+def decode_file(args):
+    FileDecoder.from_encoded_dir(
+        path=args.encoded,
+        output_path=args.recovered,
+        overwrite=args.overwrite
+    ).decode()
+
+
+def generate_recovery_file(args):
+    NodeRecoverySource(
+        recover_node_type=NodeType(
+            type=args.recover_node_type,
+            k=args.k, n=args.n, encoding=args.recover_encoding),
+        recover_node_index=args.recover_node_index,
+        data_path=args.data_path,
+        output_path=args.output_path,
+        overwrite=args.overwrite
+    ).generate()
+
+
+def recover_node(args):
+    NodeRecoveryClient(
+        recovery_source_node_type=NodeType(
+            # assume we are the opposite type of the node we are recovering
+            type=0 if args.recover_node_type == 1 else 1,
+            k=args.k, n=args.n, encoding=args.encoding),
+        file_map=NodeRecoveryClient.map_files(
+            files_dir=args.files_dir,
+            recover_node_type=args.recover_node_type,
+            recover_node_index=args.recover_node_index,
+            k=args.k),
+        output_path=args.output_path,
+        overwrite=args.overwrite
+    ).recover_node()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(prog="storage", description="File encoding and decoding utility")
+    subparsers = parser.add_subparsers(metavar='COMMAND', help="Sub-commands available.")
+
+    # Repository
+    parser_repo = subparsers.add_parser('repo', help="Repository path related operations.")
+    parser_repo.add_argument('--path', required=False, help="Path to the repository.")
+    repo_subparsers = parser_repo.add_subparsers(metavar='REPO_COMMAND', help='Repository path commands available.')
+
+    # Repository - list files
+    parser_repo_list = repo_subparsers.add_parser('list', help="List files in the repository.")
+    parser_repo_list.set_defaults(func=repo_list)
+
+    # Repository - file_path command
+    parser_file_path = repo_subparsers.add_parser('file_path', help="Get the path to an encoded file.")
+    parser_file_path.add_argument('--file', required=True, help="The filename to get the path for.")
+    parser_file_path.set_defaults(func=repo_file_path)
+
+    # Repository - shard_path command
+    parser_shard_path = repo_subparsers.add_parser('shard_path', help="Get the path to a shard of the encoded file.")
+    parser_shard_path.add_argument('--file', required=True, help="The filename to get the shard path for.")
+    parser_shard_path.add_argument('--node_type', type=int, required=True, help="The node type for the shard.")
+    parser_shard_path.add_argument('--node_index', type=int, required=True, help="The node index for the shard.")
+    parser_shard_path.set_defaults(func=repo_shard_path)
+
+    # Repository - recovery_file_path command
+    parser_recovery_file_path = repo_subparsers.add_parser('recovery_file_path',
+                                                           help="Get the path for a recovery file.")
+    parser_recovery_file_path.add_argument('--file', required=True, help="The filename to get the recovery path for.")
+    parser_recovery_file_path.add_argument('--recover_node_type', type=int, required=True,
+                                           help="The node type for recovery.")
+    parser_recovery_file_path.add_argument('--recover_node_index', type=int, required=True,
+                                           help="The node index for recovery.")
+    parser_recovery_file_path.add_argument('--helper_node_index', type=int, required=True,
+                                           help="The helper node index for recovery.")
+    parser_recovery_file_path.set_defaults(func=repo_recovery_file_path)
+
+    # Repository - tmp_file_path command
+    parser_tmp_file_path = repo_subparsers.add_parser('tmp_file_path', help="Get the path for a temporary file.")
+    parser_tmp_file_path.add_argument('--file', required=True, help="The filename to get the temporary path for.")
+    parser_tmp_file_path.set_defaults(func=repo_tmp_file_path)
+
+    # Encode
+    parser_encode = subparsers.add_parser('encode', help="Encode a file.")
+    parser_encode.add_argument('--path', required=True, help="Path to the file to encode.")
+    parser_encode.add_argument('--output_path', required=True, help="Output path for the encoded file.")
+    parser_encode.add_argument('--k0', type=int, required=True, help="k value for node type 0.")
+    parser_encode.add_argument('--n0', type=int, required=True, help="n value for node type 0.")
+    parser_encode.add_argument('--k1', type=int, required=True, help="k value for node type 1.")
+    parser_encode.add_argument('--n1', type=int, required=True, help="n value for node type 1.")
+    parser_encode.add_argument('--encoding0', default='reed_solomon', help="Encoding for node type 0.")
+    parser_encode.add_argument('--encoding1', default='reed_solomon', help="Encoding for node type 1.")
+    parser_encode.add_argument('--overwrite', action='store_true', help="Overwrite existing files.")
+    parser_encode.set_defaults(func=encode_file)
+
+    # Decode
+    parser_decode = subparsers.add_parser('decode', help="Decode an encoded file.")
+    parser_decode.add_argument('--encoded', required=True, help="Path to the encoded file.")
+    parser_decode.add_argument('--recovered', required=True, help="Path to the recovered file.")
+    parser_decode.add_argument('--overwrite', action='store_true', help="Overwrite existing files.")
+    parser_decode.set_defaults(func=decode_file)
+
+    # Recovery Generation
+    parser_gen_rec = subparsers.add_parser('generate_recovery_file', help="Generate recovery files.")
+    parser_gen_rec.add_argument('--recover_node_type', type=int, required=True, help="Type of the recovering node.")
+    parser_gen_rec.add_argument('--recover_node_index', type=int, required=True, help="Index of the recovering node.")
+    parser_gen_rec.add_argument('--recover_encoding', default='reed_solomon', help="Encoding for the recovering node.")
+    parser_gen_rec.add_argument('--k', type=int, required=True, help="k value for the recovering node.")
+    parser_gen_rec.add_argument('--n', type=int, required=True, help="n value for the recovering node.")
+    parser_gen_rec.add_argument('--data_path', required=True, help="Path to the source node data.")
+    parser_gen_rec.add_argument('--output_path', required=True, help="Path to the output recovery file.")
+    parser_gen_rec.add_argument('--overwrite', action='store_true', help="Overwrite existing files.")
+    parser_gen_rec.set_defaults(func=generate_recovery_file)
+
+    # Node Recovery
+    parser_rec_node = subparsers.add_parser('recover_node', help="Recover a node from recovery files.")
+    parser_rec_node.add_argument('--recover_node_type', type=int, required=True, help="Type of the recovering node.")
+    parser_rec_node.add_argument('--recover_node_index', type=int, required=True, help="Index of the recovering node.")
+    parser_rec_node.add_argument('--k', type=int, required=True, help="k value for node type.")
+    parser_rec_node.add_argument('--n', type=int, required=True, help="n value for node type.")
+    parser_rec_node.add_argument('--encoding', default='reed_solomon', help="Encoding for node type.")
+    parser_rec_node.add_argument('--files_dir', required=True, help="Path to the recovery files.")
+    parser_rec_node.add_argument('--output_path', required=True, help="Path to the recovered file.")
+    parser_rec_node.add_argument('--overwrite', action='store_true', help="Overwrite existing files.")
+    parser_rec_node.set_defaults(func=recover_node)
+
+    # Import: Like Encode but using the default repo paths and encoding
+    parser_import = subparsers.add_parser('import', help="Import file using default repo and encoding.")
+    parser_import.add_argument('--repo', required=False, help="Path to the repository.")
+    parser_import.add_argument('--k0', type=int, default=3, help="k value for node type 0.")
+    parser_import.add_argument('--n0', type=int, default=5, help="n value for node type 0.")
+    parser_import.add_argument('--k1', type=int, default=3, help="k value for node type 1.")
+    parser_import.add_argument('--n1', type=int, default=5, help="n value for node type 1.")
+    parser_import.add_argument('--encoding0', default='reed_solomon', help="Encoding for node type 0.")
+    parser_import.add_argument('--encoding1', default='reed_solomon', help="Encoding for node type 1.")
+    parser_import.add_argument('--overwrite', action='store_true', help="Overwrite existing files.")
+    parser_import.add_argument('path', help="Path to the file to import.")  # Positional argument
+    parser_import.set_defaults(func=import_file)
+
+    # Push: Send a file in the repository to one or more providers
+    parser_push = subparsers.add_parser('push', help="Send a file in the repository to one or more providers.")
+    parser_push.add_argument('--repo', required=False, help="Path to the repository.")
+    parser_push.add_argument('--servers', nargs='*', help="List of server names or urls to push to.")
+    parser_push.add_argument('--validate', action='store_true', help="After push, download and reconstruct the file.")
+    parser_push.add_argument('file', help="Name of the file in the repository.")
+    parser_push.set_defaults(func=push_file)
+
+    # Dump all help for docs
+    parser_docs = subparsers.add_parser('docs')
+    parser_docs.set_defaults(func=lambda args: dump_docs_md(args, subparsers))
+
+    args = parser.parse_args()
+    if hasattr(args, 'func'):
+        args.func(args)
+    else:
+        parser.print_help()
diff --git a/str-twincoding/storage/util.py b/str-twincoding/storage/util.py
new file mode 100644
index 000000000..2b5c0a44a
--- /dev/null
+++ b/str-twincoding/storage/util.py
@@ -0,0 +1,53 @@
+import argparse
+from typing import List
+from storage.config import NodeType
+
+
+def assert_rs(node_type: NodeType):
+    assert node_type.encoding == 'reed_solomon', "Only reed solomon encoding is currently supported."
+
+
+def assert_node_type(node_type: int):
+    assert node_type in [0, 1], "node_type must be 0 or 1."
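+
+
+# These assert helpers fail fast with a clear message; callers such as
+# Repository.shard_path rely on assert_node_type to validate node types.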
+
+
+def summarize_ranges(numbers: List[int]) -> str:
+    """Summarize a list of integers into a compact range string, e.g. [0, 1, 2, 4] -> "0-2, 4"."""
+    if not numbers:
+        return ""
+
+    # Start with the first number, and initialize current range
+    ranges = []
+    start = numbers[0]
+    end = numbers[0]
+
+    for n in numbers[1:]:
+        if n == end + 1:
+            # Continue the range
+            end = n
+        else:
+            # Finish the current range and start a new one
+            if start == end:
+                ranges.append(str(start))
+            else:
+                ranges.append(f"{start}-{end}")
+            start = end = n
+
+    # Add the last range
+    if start == end:
+        ranges.append(str(start))
+    else:
+        ranges.append(f"{start}-{end}")
+
+    return ", ".join(ranges)
+
+
+# Dump all help for docs for the argparse subparsers as markdown
+def dump_docs_md(_, subparsers: argparse._SubParsersAction):
+    for name, subparser in subparsers._name_parser_map.items():
+        if name == 'docs':
+            continue
+        print(f"### `{name}`")
+        print('```')
+        print(subparser.format_help())
+        print('```')
diff --git a/str-twincoding/tests.sh b/str-twincoding/tests.sh
index 633b23d71..5a985d2cb 120000
--- a/str-twincoding/tests.sh
+++ b/str-twincoding/tests.sh
@@ -1 +1 @@
-examples.sh
\ No newline at end of file
+examples/examples.sh
\ No newline at end of file
diff --git a/str-twincoding/util.py b/str-twincoding/util.py
deleted file mode 100644
index 050d79e4a..000000000
--- a/str-twincoding/util.py
+++ /dev/null
@@ -1,21 +0,0 @@
-import io
-import os
-from typing import Optional
-from config import NodeType
-
-
-def open_output_file(output_path: str, overwrite: bool) -> Optional[io.BufferedWriter]:
-    if not overwrite and os.path.exists(output_path):
-        print(f"Output file already exists: {output_path}.")
-        return None
-
-    # Make intervening directories if needeed
-    directory = os.path.dirname(output_path)
-    if directory:
-        os.makedirs(directory, exist_ok=True)
-
-    return io.BufferedWriter(open(output_path, 'wb'))
-
-
-def assert_rs(node_type: NodeType):
-    assert node_type.encoding == 'reed_solomon', "Only reed solomon encoding is currently supported."