-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcontroller_wunderground.go
171 lines (140 loc) · 4.93 KB
/
controller_wunderground.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package main
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"sync"
"time"
"go.uber.org/zap"
)
// WeatherUndergroundController uploads readings pulled from TimescaleDB to
// Weather Underground on a fixed interval. It carries the context, WaitGroup,
// configuration and database client that the reporting goroutine relies on.
type WeatherUndergroundController struct {
// ctx stops the reporting loop when it is cancelled and is attached to the
// outgoing HTTP requests.
ctx context.Context
// wg is the caller-supplied WaitGroup on which the reporting goroutine is
// registered.
wg *sync.WaitGroup
// config is the application-wide configuration; the constructor reads the
// TimescaleDB storage settings from it.
config *Config
// wuconfig holds the Weather Underground specific settings.
wuconfig WeatherUndergroundConfig
// logger is stored by the constructor.
// NOTE(review): the methods visible in this file log through the identifier
// `log` rather than this field — confirm that is intended.
logger *zap.SugaredLogger
// DB is the TimescaleDB client used to validate the source station and to
// fetch the readings that get uploaded.
DB *TimescaleDBClient
}
// WeatherUndergroundConfig holds the YAML-configurable settings for the
// Weather Underground controller.
type WeatherUndergroundConfig struct {
// StationID is sent as the "ID" request parameter. Required.
StationID string `yaml:"station-id,omitempty"`
// APIKey is sent as the "PASSWORD" request parameter. Required.
APIKey string `yaml:"api-key,omitempty"`
// UploadInterval is the time between uploads, written as a number of seconds
// in string form (an "s" suffix is appended before it is parsed as a
// duration). The constructor defaults it to "60" when empty.
UploadInterval string `yaml:"upload-interval,omitempty"`
// PullFromDevice names the station whose readings are fetched from
// TimescaleDB. Required.
PullFromDevice string `yaml:"pull-from-device,omitempty"`
// APIEndpoint is the upload URL. The constructor defaults it to the
// rtupdate.wunderground.com update endpoint when empty.
APIEndpoint string `yaml:"api-endpoint,omitempty"`
}
// NewWeatherUndergroundController validates the supplied configuration, fills
// in defaults for the API endpoint and upload interval, and connects to
// TimescaleDB.
//
// It returns an error when TimescaleDB storage is not configured, when the
// station ID, API key or pull-from-device setting is empty, when the upload
// interval is not a positive number of seconds, when pull-from-device is
// rejected by validatePullFromStation, or when the database connection fails.
// On error the returned controller is an empty value that must not be used.
func NewWeatherUndergroundController(ctx context.Context, wg *sync.WaitGroup, c *Config, wuconfig WeatherUndergroundConfig, logger *zap.SugaredLogger) (*WeatherUndergroundController, error) {
	wuc := WeatherUndergroundController{
		ctx:      ctx,
		wg:       wg,
		config:   c,
		wuconfig: wuconfig,
		logger:   logger,
	}

	if wuc.config.Storage.TimescaleDB.ConnectionString == "" {
		return &WeatherUndergroundController{}, fmt.Errorf("TimescaleDB storage must be configured for the Weather Underground controller to function")
	}

	if wuc.wuconfig.StationID == "" {
		return &WeatherUndergroundController{}, fmt.Errorf("station ID must be set")
	}

	if wuc.wuconfig.APIKey == "" {
		return &WeatherUndergroundController{}, fmt.Errorf("API key must be set")
	}

	if wuc.wuconfig.PullFromDevice == "" {
		return &WeatherUndergroundController{}, fmt.Errorf("pull-from-device must be set")
	}

	if wuc.wuconfig.APIEndpoint == "" {
		wuc.wuconfig.APIEndpoint = "https://rtupdate.wunderground.com/weatherstation/updateweatherstation.php"
	}

	if wuc.wuconfig.UploadInterval == "" {
		// Use a default interval of 60 seconds
		wuc.wuconfig.UploadInterval = "60"
	}

	// Fail fast on an unusable interval: the reporting loop parses this value
	// the same way (seconds with an "s" suffix) and hands it to
	// time.NewTicker, which panics on a non-positive duration.
	interval, err := time.ParseDuration(wuc.wuconfig.UploadInterval + "s")
	if err != nil {
		return &WeatherUndergroundController{}, fmt.Errorf("invalid upload-interval %q: %w", wuc.wuconfig.UploadInterval, err)
	}
	if interval <= 0 {
		return &WeatherUndergroundController{}, fmt.Errorf("upload-interval %q must be greater than zero", wuc.wuconfig.UploadInterval)
	}

	wuc.DB = NewTimescaleDBClient(c, logger)

	if !wuc.DB.validatePullFromStation(wuc.wuconfig.PullFromDevice) {
		return &WeatherUndergroundController{}, fmt.Errorf("pull-from-device %v is not a valid station name", wuc.wuconfig.PullFromDevice)
	}

	if err := wuc.DB.connectToTimescaleDB(c.Storage); err != nil {
		return &WeatherUndergroundController{}, fmt.Errorf("could not connect to TimescaleDB: %w", err)
	}

	return &wuc, nil
}
// StartController launches the background goroutine that periodically uploads
// readings to Weather Underground. It always returns nil.
func (p *WeatherUndergroundController) StartController() error {
	// Register with the WaitGroup before the goroutine is started so that a
	// concurrent wg.Wait() cannot return before the goroutine has been counted.
	p.wg.Add(1)
	go p.sendPeriodicReports()
	return nil
}

// sendPeriodicReports fetches the latest reading for the configured station
// from TimescaleDB on every tick of the upload interval and sends it to
// Weather Underground. It runs until the controller's context is cancelled
// and marks the WaitGroup done on exit; the matching Add is performed by
// StartController.
func (p *WeatherUndergroundController) sendPeriodicReports() {
	defer p.wg.Done()

	// Same value as the "60" second default applied by the constructor.
	const fallbackInterval = 60 * time.Second

	submitInterval, err := time.ParseDuration(fmt.Sprintf("%vs", p.wuconfig.UploadInterval))
	// time.NewTicker panics on a non-positive duration, so never pass it an
	// unparsable (zero) or non-positive interval.
	switch {
	case err != nil:
		log.Errorf("error parsing duration: %v; falling back to %v", err, fallbackInterval)
		submitInterval = fallbackInterval
	case submitInterval <= 0:
		log.Errorf("upload interval %q is not positive; falling back to %v", p.wuconfig.UploadInterval, fallbackInterval)
		submitInterval = fallbackInterval
	}

	ticker := time.NewTicker(submitInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			log.Debug("Sending reading to Weather Underground...")

			br, err := p.DB.getReadingsFromTimescaleDB(p.wuconfig.PullFromDevice)
			if err != nil {
				// Skip this cycle rather than uploading a reading that was
				// never populated.
				log.Errorf("error getting readings from TimescaleDB: %v", err)
				continue
			}
			log.Debugf("readings fetched from TimescaleDB for Weather Underground: %+v", br)

			if err := p.sendReadingsToWeatherUnderground(&br); err != nil {
				log.Errorf("error sending readings to Weather Underground: %v", err)
			}
		case <-p.ctx.Done():
			return
		}
	}
}
// sendReadingsToWeatherUnderground uploads a single reading to the configured
// Weather Underground endpoint as an HTTP GET request whose query string
// carries the station credentials, a UTC timestamp and the weather metrics.
//
// It returns an error if the request cannot be built or sent, if the response
// body cannot be read, or if the body does not contain "success". The API key
// is kept out of the debug log and out of transport errors.
func (p *WeatherUndergroundController) sendReadingsToWeatherUnderground(r *FetchedBucketReading) error {
	v := url.Values{}

	// Add our authentication parameters to our URL
	v.Set("ID", p.wuconfig.StationID)
	v.Set("PASSWORD", p.wuconfig.APIKey)

	now := time.Now().In(time.UTC)
	v.Set("dateutc", now.Format("2006-01-02 15:04:05"))

	// This is a real-time weather update request (approx 2.5s interval)
	// NOTE(review): rtfreq is fixed at 2.5 regardless of the configured
	// upload-interval — confirm this is what the service should be told.
	v.Set("action", "updateraw")
	v.Set("realtime", "1")
	v.Set("rtfreq", "2.5")

	// Set some values for our weather metrics
	v.Set("winddir", strconv.FormatInt(int64(r.WindDir), 10))
	v.Set("windspeedmph", strconv.FormatInt(int64(r.WindSpeed), 10))
	// NOTE(review): humidity is taken from InHumidity while the temperature
	// comes from OutTemp — confirm whether an outdoor humidity value was meant.
	v.Set("humidity", strconv.FormatInt(int64(r.InHumidity), 10))
	v.Set("tempf", fmt.Sprintf("%.1f", r.OutTemp))
	v.Set("dailyrainin", fmt.Sprintf("%.2f", r.DayRain))
	v.Set("baromin", fmt.Sprintf("%.2f", r.Barometer))
	v.Set("softwaretype", fmt.Sprintf("RemoteWeather %v", version))

	client := http.Client{
		Timeout: 5 * time.Second,
	}

	req, err := http.NewRequestWithContext(p.ctx, http.MethodGet, p.wuconfig.APIEndpoint+"?"+v.Encode(), nil)
	if err != nil {
		return fmt.Errorf("error creating Weather Underground HTTP request: %w", err)
	}

	// Log the request with the API key masked. Set replaces the slice stored
	// in the copy, so the real values in v are left untouched.
	logged := url.Values{}
	for key, vals := range v {
		logged[key] = vals
	}
	logged.Set("PASSWORD", "REDACTED")
	log.Debugf("Making request to Weather Underground: %v?%v", p.wuconfig.APIEndpoint, logged.Encode())

	resp, err := client.Do(req)
	if err != nil {
		// Client.Do reports failures as *url.Error, whose message embeds the
		// full request URL (including PASSWORD). Keep only the underlying
		// cause so the API key does not end up in logs.
		if uerr, ok := err.(*url.Error); ok {
			err = uerr.Err
		}
		return fmt.Errorf("error sending report to Weather Underground: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("error reading Weather Underground response body: %w", err)
	}

	if !bytes.Contains(body, []byte("success")) {
		return fmt.Errorf("bad response from Weather Underground server (HTTP status %d): %v", resp.StatusCode, string(body))
	}

	return nil
}