Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made changes to Calendar and Aggregator classes #109

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,46 @@ example configuration for nginx is provided under `contrib/nginx`.
```
python3 -m unittest discover -v
```



# Problem Resolution with LevelDB in OpenTimestamps
## Context
During the setup of the OpenTimestamps server, compatibility issues arose between the leveldb
library and modern Python versions, which caused errors when initializing the LevelDB database.
## Problem
The error identified when trying to run the server was:
```
leveldb.LevelDBError: IO error: lock /home/<user>/.otsd/backups/db/LOCK: Resource temporarily unavailable
```
Other versions of Python were also incompatible with the library.
## Applied Solution
It was determined that Python 3.7.12 is a stable version compatible with the leveldb library. Below
are the steps to correctly configure the environment.
### 1. Install the Correct Python Version
We used pyenv to install and activate the required Python version:
```
pyenv install 3.7.12
pyenv virtualenv 3.7.12 opentimestamps-env
pyenv activate opentimestamps-env
```
### 2. Reinstall Dependencies
With the virtual environment active, we installed the necessary dependencies for the
OpenTimestamps server:
```
pip install -r requirements.txt
pip install plyvel
```
### 3. Initialize the Server
With the environment set up, we started the server to confirm that the LevelDB database works
correctly:
python3 otsd-backup.py
If the server starts correctly and shows a message like the following, the issue is resolved:
db dir is /home/<user>/.otsd/backups/db
Starting at localhost:14799
## Verification
To verify that everything works properly, you can perform a test timestamping:
```
echo "OpenTimestamps Test" > test.txt
ots stamp -c http://127.0.0.1:14799 -m 1 test.txt
```
## Additional Notes
- Make sure you have the proper permissions for the LevelDB lock file:
  `rm -f ~/.otsd/backups/db/LOCK`
- If the problem persists, ensure no previous processes are occupying the required resources.
With this solution, the OpenTimestamps server should be operating correctly.


75 changes: 40 additions & 35 deletions otsserver/calendar.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import sys
import threading
import time
import requests

from opentimestamps.core.notary import TimeAttestation, PendingAttestation, BitcoinBlockHeaderAttestation
from opentimestamps.core.op import Op, OpPrepend, OpAppend, OpSHA256
Expand Down Expand Up @@ -101,6 +102,8 @@ def submit(self, commitment):
os.fsync(self.append_fd.fileno())

class LevelDbCalendar:
    """Persistent commitment -> timestamp store backed by a LevelDB database."""

    def __init__(self, path):
        # Opening the database also acquires LevelDB's on-disk LOCK file;
        # only one process may hold it at a time.
        self.db = leveldb.LevelDB(path)

    def __del__(self):
        # Drop our reference to the LevelDB handle so its on-disk lock is
        # released promptly when this object is collected.  Guarded with
        # pop() because __del__ runs even when __init__ raised before
        # self.db was ever assigned; a bare `del self.db` would then raise
        # AttributeError inside the finalizer.
        self.__dict__.pop('db', None)

Expand Down Expand Up @@ -142,7 +145,7 @@ def __put_timestamp(self, new_timestamp, batch, batch_cache):
for op in new_timestamp.ops:
op.serialize(ctx)

batch.Put(new_timestamp.msg, ctx.getbytes())
batch.Put(new_timestamp.msg.encode('utf-8'), ctx.getbytes())
batch_cache[new_timestamp.msg] = new_timestamp

def __getitem__(self, msg):
Expand Down Expand Up @@ -205,36 +208,44 @@ def add_timestamps(self, new_timestamps):
self.db.Write(batch, sync=True)
logging.debug("Done LevelDbCalendar.add_timestamps(), added %d timestamps total" % n)


class Calendar:
    def __init__(self, path, upstream=None, upstream_timeout=15):
        """Open (or create) a calendar rooted at directory *path*.

        path             -- directory holding the journal, db, 'uri' and
                            'hmac-key' files
        upstream         -- optional URL of an upstream calendar that
                            commitments are forwarded to (None disables
                            forwarding)
        upstream_timeout -- per-request timeout in seconds for upstream POSTs

        Exits the process if the 'uri' or 'hmac-key' files are missing or
        unreadable: the calendar cannot operate without them.
        """
        path = os.path.normpath(path)
        os.makedirs(path, exist_ok=True)

        self.path = path
        self.journal = JournalWriter(os.path.join(path, 'journal'))
        self.db = LevelDbCalendar(os.path.join(path, 'db'))

        self.upstream = upstream
        self.upstream_timeout = upstream_timeout

        try:
            uri_path = os.path.join(self.path, 'uri')
            with open(uri_path, 'r') as fd:
                self.uri = fd.read().strip()
        except FileNotFoundError:
            logging.error('Calendar URI not yet set; %r does not exist', uri_path)
            sys.exit(1)
        except Exception as e:
            logging.error('Error reading Calendar URI: %s', e)
            sys.exit(1)

        try:
            hmac_key_path = os.path.join(self.path, 'hmac-key')
            with open(hmac_key_path, 'rb') as fd:
                self.hmac_key = fd.read()
        except FileNotFoundError:
            logging.error('HMAC secret key not set; %r does not exist', hmac_key_path)
            sys.exit(1)
        except Exception as e:
            logging.error('Error reading HMAC secret key: %s', e)
            sys.exit(1)

def submit(self, submitted_commitment):
idx = int(time.time())

serialized_idx = struct.pack('>L', idx)

commitment = submitted_commitment.ops.add(OpPrepend(serialized_idx))

per_idx_key = derive_key_for_idx(self.hmac_key, idx, bits=32)
Expand All @@ -244,42 +255,39 @@ def submit(self, submitted_commitment):
macced_commitment.attestations.add(PendingAttestation(self.uri))
self.journal.submit(macced_commitment.msg)

def __contains__(self, commitment):
return commitment in self.db

def __getitem__(self, commitment):
"""Get commitment timestamps(s)"""
return self.db[commitment]

def add_commitment_timestamps(self, new_timestamps):
"""Add timestamps"""
self.db.add_timestamps(new_timestamps)

# send server commitment to upstream
if self.upstream:
try:
response = requests.post(
self.upstream,
data=macced_commitment.msg,
timeout=self.upstream_timeout
)
response.raise_for_status()
logging.info(f"Commitment sent to upstream: {self.upstream}")
except requests.RequestException as e:
logging.error(f"Failed to send commitment to upstream: {e}")

class Aggregator:
    def __loop(self):
        """Aggregator thread body.

        Every commitment_interval seconds, drain the digest queue, merkle-ize
        the collected digests, submit the root commitment to the calendar,
        and wake up every waiting requestor.
        """
        logging.info("Starting aggregator loop")
        # exit_event doubles as shutdown signal and interval timer: wait()
        # returns False every commitment_interval seconds until it is set.
        while not self.exit_event.wait(self.commitment_interval):
            digests = []
            done_events = []
            while not self.digest_queue.empty():
                # This should never raise the Empty exception, as we should be
                # the only thread taking items off the queue
                (digest, done_event) = self.digest_queue.get_nowait()
                digests.append(digest)
                done_events.append(done_event)

            if not digests:
                continue

            digests_commitment = make_merkle_tree(digests)

            # Lazy %-args so the b2x() formatting only happens when the
            # message is actually emitted.
            logging.info("Aggregated %d digests under commitment %s",
                         len(digests), b2x(digests_commitment.msg))

            self.calendar.submit(digests_commitment)

            # Notify all requestors that the commitment is done
            for done_event in done_events:
                done_event.set()

Expand All @@ -291,21 +299,18 @@ def __init__(self, calendar, exit_event, commitment_interval=1):
self.thread = threading.Thread(target=self.__loop)
self.thread.start()

def submit(self, msg):
    """Submit message for aggregation.

    The aggregator thread will aggregate the message along with all other
    pending messages; this call blocks until that commitment has been made,
    then returns the Timestamp for msg.
    """
    timestamp = Timestamp(msg)

    # Add nonce to ensure requestor doesn't learn anything about other
    # messages being committed at the same time, as well as to ensure that
    # anything we store related to this commitment can't be controlled by
    # them.
    done_event = threading.Event()
    self.digest_queue.put((nonce_timestamp(timestamp), done_event))

    # Block until the aggregator loop has processed this batch.
    done_event.wait()

    return timestamp