Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate values #33

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
98 changes: 96 additions & 2 deletions bin/user/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@
# To specify multiple cyphers, delimit with commas and enclose
# in quotes.
#ciphers =

To send aggregated values, define them in section

[StdRestful]
[[MQTT]]
...
[[[augmentations]]]
aggobs = period.obstype.aggtype

aggobs: the name you give the aggregated value
period: one of 'day', 'yesterday', 'week', 'month', 'year'
obstype: an observation type in the packet
aggtype: an aggregation to be perfomed on the observation type
There can be multiple lines like that.

"""

try:
Expand Down Expand Up @@ -126,8 +141,10 @@
import weewx.restx
import weewx.units
from weeutil.weeutil import to_int, to_bool, accumulateLeaves
import weeutil.weeutil
import weewx.xtypes

VERSION = "0.24"
VERSION = "0.25"

if weewx.__version__ < "3":
raise weewx.UnsupportedFeature("weewx 3 is required, found %s" %
Expand Down Expand Up @@ -280,6 +297,9 @@ def __init__(self, engine, config_dict):
for obs_type in site_dict['inputs']:
_compat(site_dict['inputs'][obs_type], 'units', 'unit')

if 'augmentations' in config_dict['StdRESTful']['MQTT']:
site_dict['augmentations'] = config_dict['StdRESTful']['MQTT']['augmentations']

site_dict['append_units_label'] = to_bool(site_dict.get('append_units_label'))
site_dict['augment_record'] = to_bool(site_dict.get('augment_record'))
site_dict['retain'] = to_bool(site_dict.get('retain'))
Expand Down Expand Up @@ -390,7 +410,8 @@ def __init__(self, queue, server_url,
post_interval=None, stale=None,
log_success=True, log_failure=True,
timeout=60, max_tries=3, retry_wait=5,
max_backlog=sys.maxsize):
max_backlog=sys.maxsize,
augmentations={'dayRain':'day.rain.sum'}):
super(MQTTThread, self).__init__(queue,
protocol_name='MQTT',
manager_dict=manager_dict,
Expand Down Expand Up @@ -431,6 +452,25 @@ def __init__(self, queue, server_url,
self.skip_upload = skip_upload
self.mc = None
self.mc_try_time = 0

self.augmentations = dict()
for agg_obs in augmentations:
# split definition into its parts timespan, observation type, and
# aggregation type
tag = augmentations[agg_obs].split('.')
# see whether all 3 parts are present
if len(tag)==3:
# example: day.rain.sum
# '$' at the beginning is possible but not necessary
if tag[0][0]=='$': tag[0] = tag[0][1:]
# timespan (day, yesterday, week, month, year)
if tag[0] not in MQTTThread.PERIODS:
logerr("unknown time period '%s' for '%s'" % (tag[0],agg_obs))
else:
self.augmentations[agg_obs] = tag
else:
logerr("syntax error in %s: timespan.obstype.aggregation required" % augmentations[agg_obs])


def get_mqtt_client(self):
if self.mc:
Expand Down Expand Up @@ -537,3 +577,57 @@ def process_record(self, record, dbm):
if res != mqtt.MQTT_ERR_SUCCESS:
logerr("publish failed for %s: %s" %
(tpc, mqtt.error_string(res)))

PERIODS = {
'day':lambda _time_ts:weeutil.weeutil.archiveDaySpan(_time_ts),
'yesterday':lambda _time_ts:weeutil.weeutil.archiveDaySpan(_time_ts,1,1),
'week':lambda _time_ts:weeutil.weeutil.archiveWeekSpan(_time_ts),
'month':lambda _time_ts:weeutil.weeutil.archiveMonthSpan(_time_ts),
'year':lambda _time_ts:weeutil.weeutil.archiveYearSpan(_time_ts)}

def get_record(self, record, dbmanager):
"""Augment record data with additional data from the archive.
Should return results in the same units as the record and the database.

returns: A dictionary of weather values"""

# run parent class
_datadict = super(MQTTThread,self).get_record(record,dbmanager)

# actual time stamp
_time_ts = _datadict['dateTime']

# go through all augmentations
for agg_obs in self.augmentations:
tag = self.augmentations[agg_obs]
# If the observation type is in _datadict, calculate
# the aggregation.
# Note: It is no error, if the observation type is not
# in _datadict, as _datadict can be a LOOP packet
# that does not contain all the observation
# types.
if tag[2]!='series' and self.aug_has_data(tag[1],tag[2],_datadict):
# get timespan
# (There is no need for error handling regarding the
# presence of tag[0] in PERIODS here, as this is already
# done in initialization.)
ts = MQTTThread.PERIODS[tag[0]](_time_ts)
try:
# get aggregate value
__result = weewx.xtypes.get_aggregate(tag[1],ts,tag[2],dbmanager)
# convert to unit system of _datadict
_datadict[agg_obs] = weewx.units.convertStd(__result,_datadict['usUnits'])[0]
# register name with unit group if necessary
weewx.units.obs_group_dict.setdefault(agg_obs,__result[2])
except (LookupError,ValueError,TypeError,weewx.UnknownType,weewx.UnknownAggregation,weewx.CannotCalculate) as e:
logerr('%s = %s: error %s' % (agg_obs,tag,e))

return _datadict

def aug_has_data(self, obs_type, agg_type, datadict):
if obs_type=='wind':
if agg_type=='max':
return 'windGust' in datadict and 'windGustDir' in datadict
else:
return 'windSpeed' in datadict and 'windDir' in datadict
return obs_type in datadict
3 changes: 3 additions & 0 deletions changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
0.25 19apr2022
* added calculation and output of aggregated values

0.24 16apr2022
* renamed option 'units' to 'unit', although either will be accepted.

Expand Down