From 77eca93c9dc74e7aa89739cbc129fd041ec962ae Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Wed, 13 Nov 2024 16:15:10 -0500 Subject: [PATCH] first iteration of work on the subordinate process framework (#789) --- cmd/zrok/subordinate/tail.go | 85 ++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 cmd/zrok/subordinate/tail.go diff --git a/cmd/zrok/subordinate/tail.go b/cmd/zrok/subordinate/tail.go new file mode 100644 index 00000000..a9f2a56a --- /dev/null +++ b/cmd/zrok/subordinate/tail.go @@ -0,0 +1,85 @@ +package subordinate + +import ( + "bytes" + "encoding/json" + "github.com/sirupsen/logrus" + "strings" +) + +const ( + MessageKey = "msg" + RawMessage = "raw" + BootMessage = "boot" +) + +type Message map[string]interface{} + +type MessageHandler struct { + BootHandler func(Message) + MessageHandler func(Message) + MalformedHandler func(Message) + readBuffer bytes.Buffer + booted bool + bootComplete chan struct{} + bootErr error +} + +func NewMessageHandler() *MessageHandler { + return &MessageHandler{ + bootComplete: make(chan struct{}), + } +} + +func (h *MessageHandler) Tail(data []byte) { + defer func() { + if r := recover(); r != nil { + logrus.Errorf("recovered: %v", r) + } + }() + + h.readBuffer.Write(data) + if line, err := h.readBuffer.ReadString('\n'); err == nil { + line = strings.Trim(line, "\n \t") + msg := make(map[string]interface{}) + if !h.booted { + if line[0] == '{' { + if err := json.Unmarshal([]byte(line), &msg); err == nil { + if v, found := msg[MessageKey]; found { + if vStr, ok := v.(string); ok { + if vStr == BootMessage { + h.BootHandler(msg) + h.booted = true + close(h.bootComplete) + } else { + h.MessageHandler(msg) + } + } else { + h.MalformedHandler(msg) + } + } else { + h.MalformedHandler(msg) + } + } else { + msg[MessageKey] = RawMessage + msg[RawMessage] = line + h.MessageHandler(msg) + } + } else { + msg[MessageKey] = RawMessage + msg[RawMessage] = line + h.MessageHandler(msg) + } + } else { + if line[0] == '{' { + if err := json.Unmarshal([]byte(line), &msg); err != nil { + logrus.Error(line) + } + } else { + msg[MessageKey] = RawMessage + msg[RawMessage] = line + } + h.MessageHandler(msg) + } + } +}