-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.go
80 lines (69 loc) · 1.76 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package main
import (
"context"
"log"
"time"
"github.com/hamba/avro"
"github.com/twmb/franz-go/pkg/kgo"
)
// InsertOrderConsumer consumes Avro-encoded order records from the
// topicOrdersInsertAVRO topic and hands each polled batch to copyOrders.
//
// It panics if the kgo client cannot be created. Otherwise it polls in a loop
// until a poll reports at least one fetch error or copyOrders returns an
// error; in both cases the error is logged and the function returns. Records
// that fail Avro decoding are logged and skipped, so they are not part of the
// batch passed to copyOrders.
func InsertOrderConsumer() {
	// Upper bound on the number of records returned by a single poll.
	const maxPollRecords = 1000000

	opts := []kgo.Opt{
		kgo.SeedBrokers(getRedPandaHosts()...),
		kgo.ConsumeTopics(
			topicOrdersInsertAVRO,
		),
	}

	redPandaClient, err := kgo.NewClient(opts...)
	if err != nil {
		log.Println(err)
		panic(err)
	}
	// Close the client on every exit path so its connections and goroutines
	// are not leaked once the consumer stops.
	defer redPandaClient.Close()

	startTime := time.Now()
	schema := avro.MustParse(schemaOrder)

	for {
		fetches := redPandaClient.PollRecords(context.Background(), maxPollRecords)

		// Report every partition error from this poll, then stop consuming.
		if fetchErrs := fetches.Errors(); len(fetchErrs) > 0 {
			for _, fetchErr := range fetchErrs {
				log.Printf("error consuming from topic: topic=%s, partition=%d, err=%v\n",
					fetchErr.Topic, fetchErr.Partition, fetchErr.Err)
			}
			return
		}

		// Materialize the records once and reuse the slice below.
		records := fetches.Records()
		log.Println("Num. of Records:", len(records))

		// At most one row per record, so the capacity is known up front.
		dataRows := make([][]interface{}, 0, len(records))
		for _, record := range records {
			// parse avro to struct
			var order orderAVRO
			if err := avro.Unmarshal(schema, record.Value, &order); err != nil {
				log.Println(err)
				continue
			}

			// prepare data to be ingested
			dataRows = append(dataRows, []interface{}{
				order.ID,        // id
				order.UserID,    // user_id
				order.StockCode, // stock_code
				"B",             // type
				order.Lot,       // lot
				order.Price,     // price
				order.Status,    // status
				time.Now(),      // created_at
			})
		}

		// ingest / copy
		if err := copyOrders(context.Background(), dataRows); err != nil {
			log.Println(err)
			return
		}

		// Elapsed milliseconds since the consumer started, not per batch.
		log.Println("Insert Order Speed:", time.Since(startTime).Milliseconds())
	}
}