-
Notifications
You must be signed in to change notification settings - Fork 0
/
sdk.go
176 lines (145 loc) · 4.53 KB
/
sdk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package cpsdk
import (
"context"
"fmt"
"net/http"
"net/url"
"time"
"github.com/rudderlabs/rudder-cp-sdk/identity"
"github.com/rudderlabs/rudder-cp-sdk/internal/cache"
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/admin"
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/base"
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/namespace"
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/workspace"
"github.com/rudderlabs/rudder-cp-sdk/internal/poller"
"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/notifications"
"github.com/rudderlabs/rudder-go-kit/logger"
)
const (
defaultBaseUrl = "https://api.rudderstack.com"
defaultBaseUrlV2 = "https://dp.api.rudderstack.com"
)
type ControlPlane struct {
baseUrl *url.URL
baseUrlV2 *url.URL
workspaceIdentity *identity.Workspace
namespaceIdentity *identity.Namespace
adminCredentials *identity.AdminCredentials
httpClient RequestDoer
Client Client
AdminClient *admin.Client
log logger.Logger
configsCache *cache.WorkspaceConfigCache
pollingInterval time.Duration
poller *poller.Poller
pollerStop context.CancelFunc
}
type Client interface {
GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error)
GetCustomWorkspaceConfigs(ctx context.Context, object any) error
GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error)
}
type RequestDoer interface {
Do(req *http.Request) (*http.Response, error)
}
func New(options ...Option) (*ControlPlane, error) {
url, _ := url.Parse(defaultBaseUrl)
urlV2, _ := url.Parse(defaultBaseUrlV2)
cp := &ControlPlane{
baseUrl: url,
baseUrlV2: urlV2,
log: logger.NOP,
pollingInterval: 1 * time.Second,
configsCache: &cache.WorkspaceConfigCache{},
}
for _, option := range options {
if err := option(cp); err != nil {
return nil, err
}
}
if err := cp.setupClients(); err != nil {
return nil, err
}
if err := cp.setupPoller(); err != nil {
return nil, err
}
return cp, nil
}
func (cp *ControlPlane) setupClients() error {
if cp.httpClient == nil {
cp.httpClient = http.DefaultClient
}
baseClient := &base.Client{HTTPClient: cp.httpClient, BaseURL: cp.baseUrl}
baseClientV2 := &base.Client{HTTPClient: cp.httpClient, BaseURL: cp.baseUrlV2}
// set admin client
if cp.adminCredentials != nil {
cp.AdminClient = &admin.Client{
Client: baseClient,
Username: cp.adminCredentials.AdminUsername,
Password: cp.adminCredentials.AdminPassword,
}
}
// set client based on identity
if cp.workspaceIdentity != nil {
cp.Client = &workspace.Client{
Client: baseClient,
Identity: cp.workspaceIdentity,
}
} else if cp.namespaceIdentity != nil {
cp.Client = &namespace.Client{
Client: baseClientV2,
Identity: cp.namespaceIdentity,
}
} else {
return fmt.Errorf("workspace or namespace identity must be set")
}
return nil
}
func (cp *ControlPlane) setupPoller() error {
if cp.pollingInterval == 0 {
return nil
}
handle := func(wc *modelv2.WorkspaceConfigs) error {
cp.configsCache.Set(wc)
return nil
}
p, err := poller.New(handle,
poller.WithClient(cp.Client),
poller.WithPollingInterval(cp.pollingInterval),
poller.WithLogger(cp.log))
if err != nil {
return err
}
cp.poller = p
ctx, cancel := context.WithCancel(context.Background())
cp.pollerStop = cancel
cp.poller.Start(ctx)
return nil
}
// Close stops any background processes such as polling for workspace configs.
func (cp *ControlPlane) Close() {
if cp.poller != nil {
cp.pollerStop()
}
}
// GetWorkspaceConfigs returns the latest workspace configs.
// If polling is enabled, this will return the cached configs, if they have been retrieved at least once.
// Otherwise, it will make a request to the control plane to get the latest configs.
func (cp *ControlPlane) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) {
if cp.poller != nil {
return cp.configsCache.Get(), nil
} else {
return cp.Client.GetWorkspaceConfigs(ctx)
}
}
// GetCustomWorkspaceConfigs streams the JSON payload directly in the target object. It does not support for incremental updates.
func (cp *ControlPlane) GetCustomWorkspaceConfigs(ctx context.Context, object any) error {
return cp.Client.GetCustomWorkspaceConfigs(ctx, object)
}
type Subscriber interface {
Notifications() chan notifications.WorkspaceConfigNotification
}
func (cp *ControlPlane) Subscribe() chan notifications.WorkspaceConfigNotification {
return cp.configsCache.Subscribe()
}