forked from mainflux/mainflux
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadapter.go
147 lines (116 loc) · 4.42 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package opcua
import (
"context"
"fmt"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/opcua/db"
)
// Service specifies an API that must be fulfilled by the domain service
// implementation, and all of its decorators (e.g. logging & metrics).
type Service interface {
// CreateThing creates the thingID:OPC-UA-nodeID route-map.
CreateThing(ctx context.Context, thingID, nodeID string) error
// UpdateThing updates the thingID:OPC-UA-nodeID route-map.
UpdateThing(ctx context.Context, thingID, nodeID string) error
// RemoveThing removes the thingID:OPC-UA-nodeID route-map.
RemoveThing(ctx context.Context, thingID string) error
// CreateChannel creates the channelID:OPC-UA-serverURI route-map.
CreateChannel(ctx context.Context, chanID, serverURI string) error
// UpdateChannel updates the channelID:OPC-UA-serverURI route-map.
UpdateChannel(ctx context.Context, chanID, serverURI string) error
// RemoveChannel removes the channelID:OPC-UA-serverURI route-map.
RemoveChannel(ctx context.Context, chanID string) error
// ConnectThing creates the thingID:channelID route-map.
ConnectThing(ctx context.Context, chanID, thingID string) error
// DisconnectThing removes the thingID:channelID route-map.
DisconnectThing(ctx context.Context, chanID, thingID string) error
// Browse browses available nodes for a given OPC-UA Server URI and NodeID,
// where the NodeID is supplied as a namespace and an identifier.
Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error)
}
// Config holds the OPC-UA server settings handed to the subscriber.
// Fields with an `env` tag name an environment variable and its default;
// how those tags are loaded is not visible in this file.
type Config struct {
// ServerURI and NodeID carry no env tag; ConnectThing sets them from the
// channel and thing route-maps before subscribing.
ServerURI string
NodeID string
// Interval is kept as a string; the variable name suggests milliseconds
// (default "1000") - TODO confirm how the subscriber parses it.
Interval string `env:"MF_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"`
// Policy and Mode default to the empty string.
// NOTE(review): presumably the OPC-UA security policy and mode - confirm
// against the subscriber implementation.
Policy string `env:"MF_OPCUA_ADAPTER_POLICY" envDefault:""`
Mode string `env:"MF_OPCUA_ADAPTER_MODE" envDefault:""`
// CertFile and KeyFile default to the empty string.
// NOTE(review): presumably paths to a certificate/key pair - confirm.
CertFile string `env:"MF_OPCUA_ADAPTER_CERT_FILE" envDefault:""`
KeyFile string `env:"MF_OPCUA_ADAPTER_KEY_FILE" envDefault:""`
}
// Compile-time assertion that *adapterService implements Service.
var _ Service = (*adapterService)(nil)
// adapterService is the Service implementation built by New. It keeps three
// route-map repositories plus the subscriber and browser it delegates to.
type adapterService struct {
// subscriber is used by ConnectThing to start subscriptions.
subscriber Subscriber
// browser is used by Browse to list nodes.
browser Browser
// thingsRM maps thingID to OPC-UA nodeID.
thingsRM RouteMapRepository
// channelsRM maps channelID to OPC-UA serverURI.
channelsRM RouteMapRepository
// connectRM records "chanID:thingID" connections (key and value are equal).
connectRM RouteMapRepository
// cfg is the configuration passed to the subscriber.
cfg Config
logger logger.Logger
}
// New builds the OPC-UA adapter service from its collaborators: the
// subscriber and browser it delegates to, the three route-map repositories
// (things, channels and connections), the adapter configuration and a logger.
// The result is returned behind the Service interface.
func New(sub Subscriber, brow Browser, thingsRM, channelsRM, connectRM RouteMapRepository, cfg Config, log logger.Logger) Service {
	svc := adapterService{
		cfg:        cfg,
		logger:     log,
		subscriber: sub,
		browser:    brow,
		thingsRM:   thingsRM,
		channelsRM: channelsRM,
		connectRM:  connectRM,
	}

	return &svc
}
// CreateThing saves the thingID -> nodeID entry in the things route-map and
// returns the repository's error, if any.
func (as *adapterService) CreateThing(ctx context.Context, thingID, nodeID string) error {
return as.thingsRM.Save(ctx, thingID, nodeID)
}
// UpdateThing saves the thingID -> nodeID entry in the things route-map. It
// issues the same Save call as CreateThing; whether an existing entry is
// overwritten depends on the repository implementation.
func (as *adapterService) UpdateThing(ctx context.Context, thingID, nodeID string) error {
return as.thingsRM.Save(ctx, thingID, nodeID)
}
// RemoveThing removes the entry keyed by thingID from the things route-map
// and returns the repository's error, if any.
func (as *adapterService) RemoveThing(ctx context.Context, thingID string) error {
return as.thingsRM.Remove(ctx, thingID)
}
// CreateChannel saves the chanID -> serverURI entry in the channels route-map
// and returns the repository's error, if any.
func (as *adapterService) CreateChannel(ctx context.Context, chanID, serverURI string) error {
return as.channelsRM.Save(ctx, chanID, serverURI)
}
// UpdateChannel saves the chanID -> serverURI entry in the channels
// route-map. It issues the same Save call as CreateChannel; whether an
// existing entry is overwritten depends on the repository implementation.
func (as *adapterService) UpdateChannel(ctx context.Context, chanID, serverURI string) error {
return as.channelsRM.Save(ctx, chanID, serverURI)
}
// RemoveChannel removes the entry keyed by chanID from the channels route-map
// and returns the repository's error, if any.
func (as *adapterService) RemoveChannel(ctx context.Context, chanID string) error {
return as.channelsRM.Remove(ctx, chanID)
}
// ConnectThing links a thing to a channel and starts an OPC-UA subscription
// for the pair.
//
// It looks up the server URI mapped to chanID and the node ID mapped to
// thingID, records the "chanID:thingID" connection in the connect route-map,
// launches the subscription in a background goroutine and finally persists
// the serverURI/nodeID pair via db.Save. Lookup and save errors are returned
// unchanged. A subscription failure is only logged, because Subscribe runs
// asynchronously and cannot influence the return value.
func (as *adapterService) ConnectThing(ctx context.Context, chanID, thingID string) error {
	serverURI, err := as.channelsRM.Get(ctx, chanID)
	if err != nil {
		return err
	}

	nodeID, err := as.thingsRM.Get(ctx, thingID)
	if err != nil {
		return err
	}

	// Work on a private copy of the base configuration. Writing the
	// per-connection values into as.cfg would be a data race between
	// concurrent calls, and the goroutine below could end up subscribing
	// with another call's ServerURI/NodeID.
	cfg := as.cfg
	cfg.NodeID = nodeID
	cfg.ServerURI = serverURI

	c := fmt.Sprintf("%s:%s", chanID, thingID)
	if err := as.connectRM.Save(ctx, c, c); err != nil {
		return err
	}

	go func() {
		// NOTE(review): the goroutine reuses the caller's ctx and may outlive
		// this call; confirm callers pass a context that is not cancelled
		// when ConnectThing returns.
		if err := as.subscriber.Subscribe(ctx, cfg); err != nil {
			as.logger.Warn(fmt.Sprintf("subscription failed: %s", err))
		}
	}()

	// Store subscription details
	return db.Save(serverURI, nodeID)
}
// Browse asks the browser for the nodes available under the node ID
// "<namespace>;<identifier>" on the server at serverURI. On failure it
// returns a nil slice together with the browser's error. ctx is accepted for
// interface compatibility but is not forwarded to the browser.
func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error) {
	found, err := as.browser.Browse(serverURI, namespace+";"+identifier)
	if err != nil {
		return nil, err
	}

	return found, nil
}
// DisconnectThing deletes the "chanID:thingID" entry from the connect
// route-map and returns the repository's error, if any. It does not call the
// subscriber.
func (as *adapterService) DisconnectThing(ctx context.Context, chanID, thingID string) error {
	return as.connectRM.Remove(ctx, chanID+":"+thingID)
}