Add dataset caching for DatabaseULog #91

Merged 4 commits on Jan 22, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -5,4 +5,5 @@ dist/
 *.egg-info
 .eggs/
 *.sqlite3
+venv/

32 changes: 30 additions & 2 deletions pyulog/db.py
@@ -253,7 +253,8 @@ def load(self, lazy=True):
             dataset = self.get_dataset(dataset_name,
                                        multi_instance=multi_id,
                                        lazy=lazy,
-                                       db_cursor=cur)
+                                       db_cursor=cur,
+                                       caching=False)
             self._data_list.append(dataset)
 
         # dropouts
@@ -390,7 +391,7 @@ def load(self, lazy=True):
 
         cur.close()
 
-    def get_dataset(self, name, multi_instance=0, lazy=False, db_cursor=None):
+    def get_dataset(self, name, multi_instance=0, lazy=False, db_cursor=None, caching=True):
         '''
         Access a specific dataset and its data series from the database.
 
@@ -400,14 +401,31 @@ def get_dataset(self, name, multi_instance=0, lazy=False, db_cursor=None):
 
         The optional "db_cursor" argument can be used to avoid re-opening the
         database connection each time get_dataset is called.
+
+        Since we don't expect the data to change often, we will normally use
+        self._data_list as a cache, and check there before reading from the
+        database. However, if caching=False, then we will always read anew
+        from the database.
         '''
 
         if db_cursor is None:
             db_context = self._db()
             cur = db_context.cursor()
         else:
             db_context = contextlib.nullcontext()
             cur = db_cursor
+
+        existing_dataset = None
+        for dataset in self._data_list:
+            if dataset.name == name and dataset.multi_id == multi_instance:
+                existing_dataset = dataset
+                break
+
+        if (caching
+                and existing_dataset is not None
+                and (lazy or existing_dataset.data)):
+            return existing_dataset
+
         with db_context:
             cur.execute('''
                 SELECT Id, TimestampIndex, MessageId
@@ -432,6 +450,16 @@ def get_dataset(self, name, multi_instance=0, lazy=False, db_cursor=None, caching=True):
                 dtype = DatabaseULog._UNPACK_TYPES[data_type][2]
                 data[field_name] = np.frombuffer(value_bytes, dtype=dtype)
 
+            # If caching=True but there is no existing dataset we could append a
+            # new one to self._data_list, but that could be considered a
+            # non-obvious side effect.
+            if caching and existing_dataset is not None:
+                existing_dataset.msg_id = msg_id
+                existing_dataset.timestamp_idx = timestamp_idx
+                existing_dataset.field_data = fields
+                existing_dataset.data = data
+                dataset = existing_dataset
+            else:
                 dataset = DatabaseULog.DatabaseData(
                     name=name,
                     multi_id=multi_instance,
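
Taken together, the db.py hunks give get_dataset the following behaviour, shown here as a usage sketch rather than as part of the diff; the database path and primary key below are hypothetical placeholders:

from pyulog.db import DatabaseULog

db_handle = DatabaseULog.get_db_handle('pyulog.sqlite3')    # hypothetical path
dbulog = DatabaseULog(db_handle, primary_key=1, lazy=True)  # hypothetical key

first = dbulog.get_dataset('vehicle_attitude')   # cache miss: read from the database
second = dbulog.get_dataset('vehicle_attitude')  # cache hit: served from self._data_list
fresh = dbulog.get_dataset('vehicle_attitude', caching=False)  # always reads anew

assert first is second   # the cached call returns the very same object
assert first == fresh    # the uncached call returns an equal but distinct object
assert first is not fresh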
37 changes: 34 additions & 3 deletions test/test_db.py
@@ -72,11 +72,42 @@ def test_lazy(self):
         primary_key = dbulog_saved.primary_key
         dbulog_loaded = DatabaseULog(self.db_handle, primary_key=primary_key)
         for dataset in ulog.data_list:
-            db_dataset = next(ds for ds in dbulog_loaded.data_list if ds.name == dataset.name)
+            db_dataset = next(ds for ds in dbulog_loaded.data_list
+                              if ds.name == dataset.name and ds.multi_id == dataset.multi_id)
             self.assertEqual(len(db_dataset.data), 0)
             self.assertNotEqual(len(dataset.data), 0)
-            self.assertEqual(ulog.get_dataset(dataset.name),
-                             dbulog_loaded.get_dataset(dataset.name))
+            ulog_dataset = ulog.get_dataset(dataset.name,
+                                            multi_instance=dataset.multi_id)
+            dbulog_dataset = dbulog_loaded.get_dataset(dataset.name,
+                                                       multi_instance=dataset.multi_id)
+            self.assertEqual(ulog_dataset, dbulog_dataset)
+
+
+    def test_data_caching(self):
+        '''
+        Verify that the caching of dataset data works as expected.
+        '''
+        test_file = os.path.join(TEST_PATH, 'sample_log_small.ulg')
+
+        dbulog_saved = DatabaseULog(self.db_handle, log_file=test_file)
+        dbulog_saved.save()
+        primary_key = dbulog_saved.primary_key
+        dbulog_loaded = DatabaseULog(self.db_handle, primary_key=primary_key, lazy=True)
+        for dataset in dbulog_loaded.data_list:
+            cache_miss = dbulog_loaded.get_dataset(dataset.name,
+                                                   multi_instance=dataset.multi_id,
+                                                   caching=True)
+            cache_hit = dbulog_loaded.get_dataset(dataset.name,
+                                                  multi_instance=dataset.multi_id,
+                                                  caching=True)
+            uncached = dbulog_loaded.get_dataset(dataset.name,
+                                                 multi_instance=dataset.multi_id,
+                                                 caching=False)
+
+            self.assertEqual(cache_miss, cache_hit)
+            self.assertEqual(cache_miss, uncached)
+            self.assertIs(cache_miss, cache_hit)
+            self.assertIsNot(cache_miss, uncached)
 
     def test_save(self):
         '''
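
A subtlety this test exercises implicitly: the cache check only short-circuits when the request is lazy or the cached entry already holds data, so the first get_dataset call on a lazily loaded DatabaseULog (as in test_data_caching above) is a cache miss even though the dataset is already present in self._data_list. A hypothetical distillation of that condition, not part of the PR:

def should_return_cached(caching, existing_dataset, lazy):
    # An empty, lazily loaded cache entry is not good enough when the
    # caller wants actual data, so it falls through to a database read.
    return (caching
            and existing_dataset is not None
            and (lazy or len(existing_dataset.data) > 0))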
83 changes: 83 additions & 0 deletions test/test_px4.py
@@ -0,0 +1,83 @@
+'''
+Tests the PX4ULog class
+'''
+
+import os
+import inspect
+import unittest
+
+from ddt import ddt, data
+
+from pyulog import ULog
+from pyulog.px4 import PX4ULog
+from pyulog.db import DatabaseULog
+from pyulog.migrate_db import migrate_db
+
+TEST_PATH = os.path.dirname(os.path.abspath(
+    inspect.getfile(inspect.currentframe())))
+
+@ddt
+class TestPX4ULog(unittest.TestCase):
+    '''
+    Tests the PX4ULog class
+    '''
+
+    def setUp(self):
+        '''
+        Set up the test database.
+        '''
+        self.db_path = os.path.join(TEST_PATH, 'pyulog_test.sqlite3')
+        self.db_handle = DatabaseULog.get_db_handle(self.db_path)
+        migrate_db(self.db_path)
+
+
+    def tearDown(self):
+        '''
+        Remove the test database after use.
+        '''
+        os.remove(self.db_path)
+
+    @data('sample',
+          'sample_appended',
+          'sample_appended_multiple',
+          'sample_logging_tagged_and_default_params')
+    def test_add_roll_pitch_yaw(self, base_name):
+        '''
+        Test that add_roll_pitch_yaw correctly adds RPY values to 'vehicle_attitude'
+        '''
+        ulog_file_name = os.path.join(TEST_PATH, base_name + '.ulg')
+        ulog = ULog(ulog_file_name)
+        px4 = PX4ULog(ulog)
+        px4.add_roll_pitch_yaw()
+
+        dataset = ulog.get_dataset('vehicle_attitude')
+        assert 'roll' in dataset.data
+        assert 'pitch' in dataset.data
+        assert 'yaw' in dataset.data
+
+    @data('sample',
+          'sample_appended',
+          'sample_appended_multiple',
+          'sample_logging_tagged_and_default_params')
+    def test_add_roll_pitch_yaw_db(self, base_name):
+        '''
+        Test that add_roll_pitch_yaw correctly adds RPY values to
+        'vehicle_attitude' on a DatabaseULog object.
+        '''
+        ulog_file_name = os.path.join(TEST_PATH, base_name + '.ulg')
+        dbulog = DatabaseULog(self.db_handle, log_file=ulog_file_name)
+        dbulog.save()
+        del dbulog
+        digest = DatabaseULog.calc_sha256sum(ulog_file_name)
+        primary_key = DatabaseULog.primary_key_from_sha256sum(self.db_handle, digest)
+        dbulog = DatabaseULog(self.db_handle, primary_key=primary_key, lazy=False)
+        px4 = PX4ULog(dbulog)
+        px4.add_roll_pitch_yaw()
+
+        dataset = dbulog.get_dataset('vehicle_attitude')
+        assert 'roll' in dataset.data
+        assert 'pitch' in dataset.data
+        assert 'yaw' in dataset.data
+
+
+# vim: set et fenc=utf-8 ft=python ff=unix sts=4 sw=4 ts=4
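
For context on what the new tests assert: add_roll_pitch_yaw derives roll, pitch and yaw from the attitude quaternion in the 'vehicle_attitude' dataset. A minimal sketch of the standard ZYX (aerospace) conversion, assuming the quaternion is stored as q[0]=w, q[1]=x, q[2]=y, q[3]=z; PX4ULog's actual implementation may differ in detail:

import numpy as np

def roll_pitch_yaw(w, x, y, z):
    # Standard quaternion-to-Euler conversion; the clip guards against
    # floating-point values falling slightly outside [-1, 1].
    roll = np.arctan2(2 * (w * x + y * z), 1 - 2 * (x**2 + y**2))
    pitch = np.arcsin(np.clip(2 * (w * y - z * x), -1.0, 1.0))
    yaw = np.arctan2(2 * (w * z + x * y), 1 - 2 * (y**2 + z**2))
    return roll, pitch, yaw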