Skip to content

Commit

Permalink
Fixed index error on reading data from multiple measurements with common tags and added test case.
Browse files Browse the repository at this point in the history
Updated template code to handle templates without tags - added test case. Resolves #32
  • Loading branch information
pkittenis committed Jun 1, 2017
1 parent b027f8f commit 92aaf42
Show file tree
Hide file tree
Showing 7 changed files with 1,114 additions and 539 deletions.
3 changes: 2 additions & 1 deletion influxgraph/classes/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ def _get_all_template_values(self, paths):
# Found template match for path, append query data and
# remove matched paths so we do not try to match them again
measurements.extend(_measurements)
tags.append(_tags)
if _tags:
tags.append(_tags)
fields = fields.union(_fields)
for path in matched_paths:
del paths[paths.index(path)]
Expand Down
1,539 changes: 1,004 additions & 535 deletions influxgraph/ext/templates.c

Large diffs are not rendered by default.

25 changes: 23 additions & 2 deletions influxgraph/ext/templates.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import logging

logger = logging.getLogger('influxgraph')


cpdef list heapsort(list iterable):
    """Return the tuples of ``iterable`` as a new list in ascending order.

    Each item is pushed onto a temporary heap, which is then drained
    smallest-first. ``iterable`` itself is left untouched.
    """
    cdef list heap = []
    cdef list ordered = []
    cdef tuple item
    for item in iterable:
        heappush(heap, item)
    # Popping until the heap is empty yields the items in sorted order.
    while heap:
        ordered.append(heappop(heap))
    return ordered


cpdef list get_series_with_tags(list paths, dict all_fields,
list graphite_templates,
str separator = '.'):
Expand All @@ -53,6 +55,7 @@ cpdef list get_series_with_tags(list paths, dict all_fields,
series.append(split_path)
return series


cdef tuple _split_series_with_tags(list paths, list graphite_templates,
str separator):
cdef list split_path = []
Expand Down Expand Up @@ -84,16 +87,33 @@ cdef tuple _split_series_with_tags(list paths, list graphite_templates,
split_path = []
return [], template


cdef _get_first_not_none_tmpl_val(dict template):
    """Return the first truthy value stored in ``template``.

    Values are visited in the dict's own iteration order. Despite the
    name, the check is truthiness, so empty strings are skipped along
    with ``None``. Returns ``None`` when no value qualifies.
    """
    cdef object candidate
    for candidate in template.values():
        if not candidate:
            continue
        return candidate
    return None


cdef _get_measurement_idx(dict template):
    """Return the key of ``template`` whose value is exactly 'measurement'.

    The comparison is an exact match, so a greedy 'measurement*' entry
    does not count. Returns ``None`` when no entry matches.
    """
    cdef object idx
    cdef object name
    for idx, name in template.items():
        if name == 'measurement':
            return idx
    return None


cpdef void _make_path_from_template(list split_path, unicode measurement,
dict template, list tags_values,
str separator):
cdef Py_ssize_t measurement_found = 0
cdef bint measurement_found = 0
cdef Py_ssize_t i
if not tags_values and separator in measurement and \
'measurement*' == [t for t in template.values() if t][0]:
_get_first_not_none_tmpl_val(template) == 'measurement*':
for i, measurement in enumerate(measurement.split(separator)):
split_path.append((i, measurement))
return
# Measurement without tags
if not tags_values:
split_path.append((_get_measurement_idx(template), measurement))
return
cdef unicode tag_key
cdef unicode tag_val
for (tag_key, tag_val) in tags_values:
Expand All @@ -106,6 +126,7 @@ cpdef void _make_path_from_template(list split_path, unicode measurement,
measurement_found = 1
split_path.append((i, measurement))


cdef void _add_fields_to_paths(list fields, list split_path, list series,
str separator):
cdef unicode field_key
Expand Down
10 changes: 10 additions & 0 deletions influxgraph/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ def _get_first_not_none_tmpl_val(template):
return t


def _get_measurement_idx(template):
for key in template:
if template[key] == 'measurement':
return key


def _make_path_from_template(split_path, measurement, template, tags_values,
separator='.'):
if not tags_values and separator in measurement and \
Expand All @@ -258,6 +264,10 @@ def _make_path_from_template(split_path, measurement, template, tags_values,
split_path.append((i, measurement))
return
measurement_found = False
# Measurement without tags
if not tags_values:
split_path.append((_get_measurement_idx(template), measurement))
return
for (tag_key, tag_val) in tags_values:
for i, tmpl_tag_key in template.items():
if not tmpl_tag_key:
Expand Down
5 changes: 4 additions & 1 deletion influxgraph/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def read_influxdb_values(influxdb_data, paths, measurement_data):
for infl_data in influxdb_data:
for infl_keys in infl_data.keys():
measurement = infl_keys[0]
tags = infl_keys[1]
tags = infl_keys[1] if infl_keys[1] is not None else {}
if not measurement_data:
_read_measurement_metric_values(infl_data, measurement,
paths, _data)
Expand All @@ -216,6 +216,9 @@ def read_influxdb_values(influxdb_data, paths, measurement_data):
seen_measurements = set(
tuple(seen_measurements) + (measurement,))
m_path_ind = 0
elif len(measurement_data[measurement]['paths']) == 0:
# No paths left for measurement
continue
elif m_path_ind >= len(measurement_data[measurement]['paths']):
m_path_ind = 0
metric = measurement_data[measurement]['paths'][m_path_ind]
Expand Down
11 changes: 11 additions & 0 deletions tests/test_influxdb_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def setUp(self):
self.steps = int(round((int(self.end_time.strftime("%s")) - \
int(self.start_time.strftime("%s"))) * 1.0 / self.step)) + 1
self.config = { 'influxdb': { 'db' : self.db_name,
'log_level': 0,
},
'statsd': { 'host': 'localhost' },
'search_index': 'index',
Expand Down Expand Up @@ -94,6 +95,7 @@ def test_configured_deltas(self):
# Set data interval to 1 second for queries
# of one hour or less
'deltas' : {3600: 1},
'log_level': 0,
}}
finder = influxgraph.InfluxDBFinder(config)
self.assertTrue(finder.deltas)
Expand Down Expand Up @@ -440,6 +442,7 @@ def test_series_loader(self):
'ttl' : 60,
'max_value': 20,
},
'log_level': 0,
},}
try:
_memcache = memcache.Client([config['influxdb']['memcache']['host']])
Expand Down Expand Up @@ -476,6 +479,7 @@ def test_reindex(self):
'ttl' : 60,
'max_value': 20,
},
'log_level': 0,
},}
finder = influxgraph.InfluxDBFinder(config)
time.sleep(_reindex_interval)
Expand Down Expand Up @@ -507,6 +511,7 @@ def test_memcache_integration(self):
'memcache' : { 'host': 'localhost',
'ttl' : 60,
'max_value' : 20},
'log_level': 0,
},}
try:
_memcache = memcache.Client([config['influxdb']['memcache']['host']])
Expand Down Expand Up @@ -602,6 +607,7 @@ def test_memcache_default_config_values(self):
'pass' : 'root',
'db' : self.db_name,
'memcache' : { 'host': 'localhost'},
'log_level': 0,
},}
finder = influxgraph.InfluxDBFinder(config)
self.assertTrue(finder.memcache)
Expand Down Expand Up @@ -654,6 +660,7 @@ def test_parent_branch_series(self):
'memcache' : { 'host': 'localhost',
'ttl' : 60,
'max_value' : 20},
'log_level': 0,
},}
try:
_memcache = memcache.Client([config['influxdb']['memcache']['host']])
Expand Down Expand Up @@ -724,6 +731,7 @@ def test_retention_policies(self):
3600: 1800,
},
'retention_policies' : retention_policies,
'log_level': 0,
}}
self.client.create_retention_policy('10m', '1d', 1, database=self.db_name, default=False)
self.client.create_retention_policy('30m', '1d', 1, database=self.db_name, default=False)
Expand Down Expand Up @@ -796,6 +804,7 @@ def test_index_save_load_failure(self):
'user' : 'root',
'pass' : 'root',
'db' : self.db_name,
'log_level': 0,
},
'statsd': {'host': 'localhost' },
'search_index': bad_index_path,
Expand Down Expand Up @@ -829,6 +838,7 @@ def test_index_save_load(self):
'pass' : 'root',
'db' : self.db_name,
'reindex_interval': 1,
'log_level': 0,
},
}
finder = influxgraph.InfluxDBFinder(config)
Expand Down Expand Up @@ -885,6 +895,7 @@ def test_loader_limit(self):
'db' : self.db_name,
'loader_limit': 1,
'series_loader_interval': 1,
'log_level': 0,
'memcache' : { 'host': 'localhost',
'ttl' : 60,
},
Expand Down
60 changes: 60 additions & 0 deletions tests/test_influxdb_templates_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def setUp(self):
'templates': [
self.template,
],
'log_level': 0,
},
}
self.client = InfluxDBClient(database=self.db_name)
Expand Down Expand Up @@ -731,6 +732,65 @@ def test_data_with_fields(self):
else:
self.assertTrue(data[metric][-1] == fields[metric.split('.')[-1]])

def test_multi_measurement_no_tag_field(self):
    """Read fields of two tag-less measurements via "measurement.field".

    Writes measurements m1 and m2 without tags, each with fields f1 and
    f2, then checks that the last data point returned for every metric
    equals the value written for that measurement and field.
    """
    def random_fields():
        return {'f1': self.randval(),
                'f2': self.randval(),}

    template = "measurement.field"
    measurements = ['m1', 'm2']
    m1_fields = random_fields()
    m2_fields = random_fields()
    # Start from an empty database so only the points below exist.
    self.client.drop_database(self.db_name)
    self.client.create_database(self.db_name)
    self.write_data([measurements[0]], {}, m1_fields)
    self.write_data([measurements[1]], {}, m2_fields)
    self.config['influxdb']['templates'] = [template]
    self.finder = influxgraph.InfluxDBFinder(self.config)
    metrics = []
    for measurement in measurements:
        for field_key in list(m1_fields.keys()):
            metrics.append('.'.join([measurement, field_key]))
    nodes = []
    for path in metrics:
        nodes.append(influxgraph.classes.leaf.InfluxDBLeafNode(
            path, self.finder.reader))
    data = self._test_data_in_nodes(nodes)
    for metric in data:
        if metric.startswith('m1'):
            expected_fields = m1_fields
        else:
            expected_fields = m2_fields
        if metric.endswith('f1'):
            field_key = 'f1'
        else:
            field_key = 'f2'
        self.assertEqual(data[metric][-1], expected_fields[field_key])

def test_multi_measurement_multi_tag_non_greedy_field(self):
    """Read selected series from measurements sharing the same tag values.

    Writes measurements m1 and m2, each with tag values t1 and t2 and
    fields f1 and f2, under the non-greedy template
    "measurement.tag.field". Only 'm1.t1.f1' and 'm2.t2.f2' are then
    read, and the last data point of each is compared with the value
    written for that exact series.
    """
    def random_fields():
        return {'f1': self.randval(),
                'f2': self.randval(),}

    template = "measurement.tag.field"
    measurements = ['m1', 'm2']
    m1_t1_fields = random_fields()
    m1_t2_fields = random_fields()
    m2_t1_fields = random_fields()
    m2_t2_fields = random_fields()
    tags1 = {'tag': 't1',}
    tags2 = {'tag': 't2',}
    # Start from an empty database so only the points below exist.
    self.client.drop_database(self.db_name)
    self.client.create_database(self.db_name)
    self.write_data([measurements[0]], tags1, m1_t1_fields)
    self.write_data([measurements[1]], tags1, m2_t1_fields)
    self.write_data([measurements[0]], tags2, m1_t2_fields)
    self.write_data([measurements[1]], tags2, m2_t2_fields)
    self.config['influxdb']['templates'] = [template]
    self.finder = influxgraph.InfluxDBFinder(self.config)
    # Query one series per measurement; the other written series must
    # not leak into these results.
    paths = ['m1.t1.f1', 'm2.t2.f2']
    nodes = [influxgraph.classes.leaf.InfluxDBLeafNode(
        path, self.finder.reader)
             for path in paths]
    data = self._test_data_in_nodes(nodes)
    for metric in data:
        if metric.endswith('f1'):
            self.assertEqual(data[metric][-1], m1_t1_fields['f1'])
        elif metric.endswith('f2'):
            self.assertEqual(data[metric][-1], m2_t2_fields['f2'])

def test_multi_tag_values_multi_measurement_single_field(self):
template = "env.host.measurement.field*"
measurements = ['cpu', 'io']
Expand Down

0 comments on commit 92aaf42

Please sign in to comment.