Skip to content

Commit 2980ed0

Browse files
authored
Use query param to intake API to signal that function completed (#82)
* Signal that function is complete if query param flushed=true is received * Update comment * Check length of query param slice before accessing index * Use three channels for coordination * Minor updates to comments
1 parent 1903769 commit 2980ed0

File tree

3 files changed

+134
-29
lines changed

3 files changed

+134
-29
lines changed

apm-lambda-extension/extension/http_server_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package extension
1919

2020
import (
21+
"bytes"
2122
"io/ioutil"
2223
"net"
2324
"net/http"
2425
"net/http/httptest"
2526
"strings"
2627
"testing"
28+
"time"
2729

2830
"gotest.tools/assert"
2931
)
@@ -165,3 +167,94 @@ func Test_handleInfoRequest(t *testing.T) {
165167
assert.Equal(t, 202, resp.StatusCode)
166168
}
167169
}
170+
171+
func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
172+
body := []byte(`{"metadata": {}`)
173+
174+
AgentDoneSignal = make(chan struct{})
175+
176+
// Create apm server and handler
177+
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
178+
}))
179+
defer apmServer.Close()
180+
181+
// Create extension config and start the server
182+
dataChannel := make(chan AgentData, 100)
183+
config := extensionConfig{
184+
apmServerUrl: apmServer.URL,
185+
dataReceiverServerPort: ":1234",
186+
dataReceiverTimeoutSeconds: 15,
187+
}
188+
189+
StartHttpServer(dataChannel, &config)
190+
defer agentDataServer.Close()
191+
192+
hosts, _ := net.LookupHost("localhost")
193+
url := "http://" + hosts[0] + ":1234/intake/v2/events?flushed=true"
194+
195+
// Create a request to send to the extension
196+
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
197+
if err != nil {
198+
t.Logf("Could not create request")
199+
}
200+
201+
// Send the request to the extension
202+
client := &http.Client{}
203+
go func() {
204+
_, err := client.Do(req)
205+
if err != nil {
206+
t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err)
207+
t.Fail()
208+
}
209+
}()
210+
211+
timer := time.NewTimer(1 * time.Second)
212+
defer timer.Stop()
213+
214+
select {
215+
case <-AgentDoneSignal:
216+
<-dataChannel
217+
case <-timer.C:
218+
t.Log("Timed out waiting for server to send FuncDone signal")
219+
t.Fail()
220+
}
221+
}
222+
223+
func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) {
224+
body := []byte(`{"metadata": {}`)
225+
226+
// Create apm server and handler
227+
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
228+
}))
229+
defer apmServer.Close()
230+
231+
// Create extension config and start the server
232+
dataChannel := make(chan AgentData, 100)
233+
config := extensionConfig{
234+
apmServerUrl: apmServer.URL,
235+
dataReceiverServerPort: ":1234",
236+
dataReceiverTimeoutSeconds: 15,
237+
}
238+
239+
StartHttpServer(dataChannel, &config)
240+
defer agentDataServer.Close()
241+
242+
hosts, _ := net.LookupHost("localhost")
243+
url := "http://" + hosts[0] + ":1234/intake/v2/events"
244+
245+
// Create a request to send to the extension
246+
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
247+
if err != nil {
248+
t.Logf("Could not create request")
249+
}
250+
251+
// Send the request to the extension
252+
client := &http.Client{}
253+
resp, err := client.Do(req)
254+
if err != nil {
255+
t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err)
256+
t.Fail()
257+
}
258+
<-dataChannel
259+
assert.Equal(t, 202, resp.StatusCode)
260+
}

apm-lambda-extension/extension/route_handlers.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type AgentData struct {
2929
ContentEncoding string
3030
}
3131

32+
var AgentDoneSignal chan struct{}
33+
3234
// URL: http://server/
3335
func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) {
3436
return func(w http.ResponseWriter, r *http.Request) {
@@ -97,5 +99,9 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit
9799
}
98100
log.Println("Adding agent data to buffer to be sent to apm server")
99101
agentDataChan <- agentData
102+
103+
if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" {
104+
AgentDoneSignal <- struct{}{}
105+
}
100106
}
101107
}

apm-lambda-extension/main.go

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,17 @@ func main() {
6060
// pulls ELASTIC_ env variable into globals for easy access
6161
config := extension.ProcessEnv()
6262

63-
// setup http server to receive data from agent
64-
// and get a channel to listen for that data
63+
// Create a channel to buffer apm agent data
6564
agentDataChannel := make(chan extension.AgentData, 100)
6665

66+
// Start http server to receive data from agent
6767
extension.StartHttpServer(agentDataChannel, config)
6868

69+
// Create a client to use for sending data to the apm server
70+
client := &http.Client{
71+
Transport: http.DefaultTransport.(*http.Transport).Clone(),
72+
}
73+
6974
// Make channel for collecting logs and create a HTTP server to listen for them
7075
logsChannel := make(chan logsapi.LogEvent)
7176

@@ -74,7 +79,7 @@ func main() {
7479
extensionClient.ExtensionID,
7580
[]logsapi.EventType{logsapi.Platform})
7681
if err != nil {
77-
log.Printf("Could not subscribe to the logs API. Will instead flush APM data 100ms before the function deadline.")
82+
log.Printf("Could not subscribe to the logs API.")
7883
} else {
7984
logsAPIListener, err := logsapi.NewLogsAPIHttpListener(logsChannel)
8085
if err != nil {
@@ -88,9 +93,6 @@ func main() {
8893
}
8994
}
9095

91-
client := &http.Client{
92-
Transport: http.DefaultTransport.(*http.Transport).Clone(),
93-
}
9496
for {
9597
select {
9698
case <-ctx.Done():
@@ -107,31 +109,32 @@ func main() {
107109
}
108110
log.Printf("Received event: %v\n", extension.PrettyPrint(event))
109111

112+
// Make a channel for signaling that we received the agent flushed signal
113+
extension.AgentDoneSignal = make(chan struct{})
114+
// Make a channel for signaling that we received the runtimeDone logs API event
115+
runtimeDoneSignal := make(chan struct{})
116+
// Make a channel for signaling that the function invocation is complete
117+
funcDone := make(chan struct{})
118+
119+
// Flush any APM data, in case waiting for the agentDone or runtimeDone signals
120+
// timed out, the agent data wasn't available yet, and we got to the next event
121+
extension.FlushAPMData(client, agentDataChannel, config)
122+
110123
// A shutdown event indicates the execution environment is shutting down.
111124
// This is usually due to inactivity.
112125
if event.EventType == extension.Shutdown {
113126
extension.ProcessShutdown()
114127
return
115128
}
116129

117-
// Flush any APM data, in case waiting for the runtimeDone event timed out,
118-
// the agent data wasn't available yet, and we got to the next event
119-
extension.FlushAPMData(client, agentDataChannel, config)
120-
121-
// Make a channel for signaling that a runtimeDone event has been received
122-
runtimeDone := make(chan struct{})
123-
124-
// Make a channel for signaling that that function invocation has completed
125-
funcInvocDone := make(chan struct{})
126-
127130
// Receive agent data as it comes in and post it to the APM server.
128131
// Stop checking for, and sending agent data when the function invocation
129132
// has completed, signaled via a channel.
130133
go func() {
131134
for {
132135
select {
133-
case <-funcInvocDone:
134-
log.Println("Function invocation is complete, not receiving any more agent data")
136+
case <-funcDone:
137+
log.Println("funcDone signal received, not processing any more agent data")
135138
return
136139
case agentData := <-agentDataChannel:
137140
err := extension.PostToApmServer(client, agentData, config)
@@ -143,21 +146,21 @@ func main() {
143146
}()
144147

145148
// Receive Logs API events
146-
// Send to the runtimeDone channel to signal when a runtimeDone event is received
149+
// Send to the runtimeDoneSignal channel to signal when a runtimeDone event is received
147150
go func() {
148151
for {
149152
select {
150-
case <-funcInvocDone:
151-
log.Println("Function invocation is complete, not receiving any more log events")
153+
case <-funcDone:
154+
log.Println("funcDone signal received, not processing any more log events")
152155
return
153156
case logEvent := <-logsChannel:
154157
log.Printf("Received log event %v\n", logEvent.Type)
155158
// Check the logEvent for runtimeDone and compare the RequestID
156159
// to the id that came in via the Next API
157160
if logsapi.SubEventType(logEvent.Type) == logsapi.RuntimeDone {
158161
if logEvent.Record.RequestId == event.RequestID {
159-
log.Printf("Received runtimeDone event %v", logEvent)
160-
runtimeDone <- struct{}{}
162+
log.Println("Received runtimeDone event for this function invocation")
163+
runtimeDoneSignal <- struct{}{}
161164
return
162165
} else {
163166
log.Println("Log API runtimeDone event request id didn't match")
@@ -167,7 +170,7 @@ func main() {
167170
}
168171
}()
169172

170-
// Calculate how long to wait for a runtimeDone event
173+
// Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal
171174
flushDeadlineMs := event.DeadlineMs - 100
172175
durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0))
173176

@@ -176,17 +179,20 @@ func main() {
176179
defer timer.Stop()
177180

178181
select {
179-
case <-runtimeDone:
180-
log.Println("Received runtimeDone event signal")
182+
case <-extension.AgentDoneSignal:
183+
log.Println("Received agent done signal")
184+
case <-runtimeDoneSignal:
185+
log.Println("Received runtimeDone signal")
181186
case <-timer.C:
182-
log.Println("Time expired waiting for runtimeDone event")
187+
log.Println("Time expired waiting for agent signal or runtimeDone event")
183188
}
184189

185190
// Flush APM data now that the function invocation has completed
186191
extension.FlushAPMData(client, agentDataChannel, config)
187192

188-
// Signal that the function invocation has completed
189-
close(funcInvocDone)
193+
close(funcDone)
194+
close(runtimeDoneSignal)
195+
close(extension.AgentDoneSignal)
190196
}
191197
}
192198
}

0 commit comments

Comments
 (0)