Skip to content

Commit

Permalink
introduce nats client library (ncl) (#4257)
Browse files Browse the repository at this point in the history
# Bacalhau NCL (NATS Client Library)

## Overview

The NCL (NATS Client Library) is an internal library for Bacalhau,
designed to provide reliable, scalable, and efficient communication
between orchestrator and compute nodes. It leverages NATS for messaging
and implements an event-driven architecture with support for
asynchronous communication, granular event logging, and robust state
management.

## Key Components

1. **EnvelopedRawMessageSerDe**: Handles serialization and
deserialization of RawMessages with versioning and CRC checks.
2. **MessageSerDeRegistry**: Manages serialization and deserialization
of different message types.
3. **Publisher**: Handles asynchronous message publishing.
4. **Subscriber**: Manages message consumption and processing.
5. **MessageHandler**: Interface for processing received messages.
6. **MessageFilter**: Interface for filtering incoming messages.
7. **Checkpointer**: Interface for managing checkpoints in message
processing.

## Technical Details

### Message Flow

1. **Publishing**:
- The publisher accepts a `Message` struct through its `Publish` method.
- The `MessageSerDeRegistry` serializes the `Message` into a
`RawMessage` using the appropriate `MessageSerDe` for the message type.
- The `EnvelopedRawMessageSerDe` serializes the `RawMessage` into a byte
slice with an envelope containing a version byte and a CRC checksum.
- The serialized message is published to NATS using the configured
subject.

2. **Subscribing**:
   - The subscriber sets up a NATS subscription for specified subjects.
- When a message is received, it's passed to the `processMessage`
method.
- The `EnvelopedRawMessageSerDe` deserializes the raw bytes into a
`RawMessage`. The envelope version helps determine the deserialization
method, and the CRC checksum is used to verify the message integrity.
- The message filter is applied to determine if the message should be
processed.
- The `MessageSerDeRegistry` deserializes the `RawMessage` into a
`Message` using the appropriate `MessageSerDe` for the message type.
- The deserialized `Message` is passed to each configured
`MessageHandler`.

### Serialization/Deserialization (SerDe) Flow

1. **Message to bytes (for sending)**:
`Message` -> `MessageSerDe.Serialize()` -> `RawMessage` ->
`EnvelopedRawMessageSerDe.Serialize()` -> `[]byte`

2. **Bytes to Message (when receiving)**:
`[]byte` -> `EnvelopedRawMessageSerDe.Deserialize()` -> `RawMessage` ->
`MessageSerDe.Deserialize()` -> `Message`

### EnvelopedRawMessageSerDe

The `EnvelopedRawMessageSerDe` adds a version byte and a CRC checksum to
each serialized `RawMessage`. The envelope structure is as follows:

```
+----------------+----------------+--------------------+
| Version (1 byte)| CRC (4 bytes) | Serialized Message |
+----------------+----------------+--------------------+
```

This allows for future extensibility, backward compatibility, and data
integrity verification.

### MessageSerDeRegistry

The `MessageSerDeRegistry` manages the serialization and deserialization
of different message types. It allows registering custom message types
with unique names and provides methods for serializing and deserializing
messages.

Key methods:
- `Register(name string, messageType any, serde MessageSerDe) error`
- `Serialize(message *Message) (*RawMessage, error)`
- `Deserialize(rawMessage *RawMessage) (*Message, error)`

### Publisher

The `publisher` struct handles message publishing. It supports
asynchronous publishing and can be configured with options like message
serializer, MessageSerDeRegistry, and destination subject or prefix.

Key method:
- `Publish(ctx context.Context, message *Message) error`

### Subscriber

The `subscriber` struct manages message consumption. It sets up NATS
subscriptions, processes incoming messages, and routes them to the
appropriate message handlers.

Key methods:
- `Subscribe(subjects ...string) error`
- `Close(ctx context.Context) error`

## Usage Within Bacalhau

This library is designed to be used internally within the Bacalhau
project. It should be integrated into the orchestrator and compute node
components to handle all inter-node communication.

Example integration points:
1. Job assignment from orchestrator to compute nodes
2. Status updates from compute nodes to orchestrator
3. Heartbeat messages for health monitoring

**This PR only migrated heartbeats to NCL**

## Remaining Work

A lot of work remains to support progress checkpointing, event
sequencing for optimistic concurrency, demuxing of subscribers, and
migration of existing Bacalhau internal APIs. Those will be handled in
future PRs

## Reference

https://www.notion.so/expanso/Reliable-Orchestration-Design-397083a957794668adb15322553e6652?pvs=4
  • Loading branch information
wdbaruni authored Jul 29, 2024
1 parent 50628bd commit 16538b3
Show file tree
Hide file tree
Showing 56 changed files with 3,359 additions and 506 deletions.
3 changes: 1 addition & 2 deletions pkg/bidstrategy/semantic/provider_installed.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/bacalhau-project/bacalhau/pkg/bidstrategy"
"github.com/bacalhau-project/bacalhau/pkg/lib/provider"
"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
"github.com/bacalhau-project/bacalhau/pkg/models"
)

Expand All @@ -24,7 +23,7 @@ func NewProviderInstalledStrategy[P provider.Providable](
getter: func(j *models.Job) []string {
// handle optional specs, such as publisher
key := strings.TrimSpace(getter(j))
if validate.IsBlank(key) {
if len(key) == 0 {
return []string{}
}
return []string{key}
Expand Down
3 changes: 0 additions & 3 deletions pkg/compute/management_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,6 @@ func (m *ManagementClient) Start(ctx context.Context) {
heartbeatTicker.Stop()
resourceTicker.Stop()
infoTicker.Stop()

// Close the heartbeat client and it's resources
m.heartbeatClient.Close(ctx)
}()

// Send an initial heartbeat when we start up
Expand Down
3 changes: 1 addition & 2 deletions pkg/executor/docker/models/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/fatih/structs"

"github.com/bacalhau-project/bacalhau/pkg/lib/validate"
"github.com/bacalhau-project/bacalhau/pkg/models"
)

Expand All @@ -27,7 +26,7 @@ type EngineSpec struct {
}

func (c EngineSpec) Validate() error {
if validate.IsBlank(c.Image) {
if len(c.Image) == 0 {
return fmt.Errorf("invalid docker engine param: 'Image' cannot be empty")
}
if c.WorkingDirectory != "" {
Expand Down
86 changes: 86 additions & 0 deletions pkg/lib/ncl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Bacalhau NCL (NATS Client Library)

## Overview

The NCL (NATS Client Library) is an internal library for Bacalhau, designed to provide reliable, scalable, and efficient communication between orchestrator and compute nodes. It leverages NATS for messaging and implements an event-driven architecture with support for asynchronous communication, granular event logging, and robust state management.

## Key Components

1. **EnvelopedRawMessageSerDe**: Handles serialization and deserialization of RawMessages with versioning and CRC checks.
2. **MessageSerDeRegistry**: Manages serialization and deserialization of different message types.
3. **Publisher**: Handles asynchronous message publishing.
4. **Subscriber**: Manages message consumption and processing.
5. **MessageHandler**: Interface for processing received messages.
6. **MessageFilter**: Interface for filtering incoming messages.
7. **Checkpointer**: Interface for managing checkpoints in message processing.

## Technical Details

### Message Flow

1. **Publishing**:
- The publisher accepts a `Message` struct through its `Publish` method.
- The `MessageSerDeRegistry` serializes the `Message` into a `RawMessage` using the appropriate `MessageSerDe` for the message type.
- The `EnvelopedRawMessageSerDe` serializes the `RawMessage` into a byte slice with an envelope containing a version byte and a CRC checksum.
- The serialized message is published to NATS using the configured subject.

2. **Subscribing**:
- The subscriber sets up a NATS subscription for specified subjects.
- When a message is received, it's passed to the `processMessage` method.
- The `EnvelopedRawMessageSerDe` deserializes the raw bytes into a `RawMessage`. The envelope version helps determine the deserialization method, and the CRC checksum is used to verify the message integrity.
- The message filter is applied to determine if the message should be processed.
- The `MessageSerDeRegistry` deserializes the `RawMessage` into a `Message` using the appropriate `MessageSerDe` for the message type.
- The deserialized `Message` is passed to each configured `MessageHandler`.

### Serialization/Deserialization (SerDe) Flow

1. **Message to bytes (for sending)**:
`Message` -> `MessageSerDe.Serialize()` -> `RawMessage` -> `EnvelopedRawMessageSerDe.Serialize()` -> `[]byte`

2. **Bytes to Message (when receiving)**:
`[]byte` -> `EnvelopedRawMessageSerDe.Deserialize()` -> `RawMessage` -> `MessageSerDe.Deserialize()` -> `Message`

### EnvelopedRawMessageSerDe

The `EnvelopedRawMessageSerDe` adds a version byte and a CRC checksum to each serialized `RawMessage`. The envelope structure is as follows:

```
+----------------+----------------+--------------------+
| Version (1 byte)| CRC (4 bytes) | Serialized Message |
+----------------+----------------+--------------------+
```

This allows for future extensibility, backward compatibility, and data integrity verification.

### MessageSerDeRegistry

The `MessageSerDeRegistry` manages the serialization and deserialization of different message types. It allows registering custom message types with unique names and provides methods for serializing and deserializing messages.

Key methods:
- `Register(name string, messageType any, serde MessageSerDe) error`
- `Serialize(message *Message) (*RawMessage, error)`
- `Deserialize(rawMessage *RawMessage) (*Message, error)`

### Publisher

The `publisher` struct handles message publishing. It supports asynchronous publishing and can be configured with options like message serializer, MessageSerDeRegistry, and destination subject or prefix.

Key method:
- `Publish(ctx context.Context, message *Message) error`

### Subscriber

The `subscriber` struct manages message consumption. It sets up NATS subscriptions, processes incoming messages, and routes them to the appropriate message handlers.

Key methods:
- `Subscribe(subjects ...string) error`
- `Close(ctx context.Context) error`

## Usage Within Bacalhau

This library is designed to be used internally within the Bacalhau project. It should be integrated into the orchestrator and compute node components to handle all inter-node communication.

Example integration points:
1. Job assignment from orchestrator to compute nodes
2. Status updates from compute nodes to orchestrator
3. Heartbeat messages for health monitoring
13 changes: 13 additions & 0 deletions pkg/lib/ncl/checkpointer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ncl

// NoopCheckpointer is a Checkpointer that does nothing
type NoopCheckpointer struct{}

// Checkpoint does nothing
func (n *NoopCheckpointer) Checkpoint(_ *Message) error { return nil }

// GetLastCheckpoint returns 0
func (n *NoopCheckpointer) GetLastCheckpoint() (int64, error) { return 0, nil }

// compile time check for interface conformance
var _ Checkpointer = &NoopCheckpointer{}
140 changes: 140 additions & 0 deletions pkg/lib/ncl/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package ncl

import (
"fmt"
)

// ErrUnsupportedEncoding is returned when an unsupported encoding is encountered.
type ErrUnsupportedEncoding struct {
Encoding string
}

// NewErrUnsupportedEncoding creates a new ErrUnsupportedEncoding error.
func NewErrUnsupportedEncoding(encoding string) *ErrUnsupportedEncoding {
return &ErrUnsupportedEncoding{Encoding: encoding}
}

// Error implements the error interface for ErrUnsupportedEncoding.
func (e *ErrUnsupportedEncoding) Error() string {
return fmt.Sprintf("unsupported encoding: %s", e.Encoding)
}

// ErrUnsupportedMessageType is returned when an unsupported message type is encountered.
type ErrUnsupportedMessageType struct {
Type string
}

// NewErrUnsupportedMessageType creates a new ErrUnsupportedMessageType error.
func NewErrUnsupportedMessageType(messageType string) *ErrUnsupportedMessageType {
return &ErrUnsupportedMessageType{Type: messageType}
}

// Error implements the error interface for ErrUnsupportedMessageType.
func (e *ErrUnsupportedMessageType) Error() string {
return fmt.Sprintf("unsupported message type: %s", e.Type)
}

// ErrBadMessage is returned when a message is malformed or invalid.
type ErrBadMessage struct {
Reason string
}

// NewErrBadMessage creates a new ErrBadMessage error.
func NewErrBadMessage(reason string) *ErrBadMessage {
return &ErrBadMessage{Reason: reason}
}

// Error implements the error interface for ErrBadMessage.
func (e *ErrBadMessage) Error() string {
return fmt.Sprintf("bad message: %s", e.Reason)
}

// ErrBadPayload is returned when a payload is malformed or invalid.
type ErrBadPayload struct {
Reason string
}

// NewErrBadPayload creates a new ErrBadPayload error.
func NewErrBadPayload(reason string) *ErrBadPayload {
return &ErrBadPayload{Reason: reason}
}

// Error implements the error interface for ErrBadPayload.
func (e *ErrBadPayload) Error() string {
return fmt.Sprintf("bad payload: %s", e.Reason)
}

// ErrSerializationFailed is returned when serialization fails.
type ErrSerializationFailed struct {
Encoding string
Err error
}

// NewErrSerializationFailed creates a new ErrSerializationFailed error.
func NewErrSerializationFailed(encoding string, err error) error {
return &ErrSerializationFailed{Encoding: encoding, Err: err}
}

// Error implements the error interface for ErrSerializationFailed.
func (e *ErrSerializationFailed) Error() string {
return fmt.Sprintf("failed to serialize with encoding %s: %v", e.Encoding, e.Err)
}

// Unwrap returns the underlying error for ErrSerializationFailed.
func (e *ErrSerializationFailed) Unwrap() error {
return e.Err
}

// ErrDeserializationFailed is returned when payload deserialization fails.
type ErrDeserializationFailed struct {
Encoding string
Err error
}

// NewErrDeserializationFailed creates a new ErrDeserializationFailed error.
func NewErrDeserializationFailed(encoding string, err error) error {
return &ErrDeserializationFailed{Encoding: encoding, Err: err}
}

// Error implements the error interface for ErrDeserializationFailed.
func (e *ErrDeserializationFailed) Error() string {
return fmt.Sprintf("failed to deserialize with encoding %s: %v", e.Encoding, e.Err)
}

// Unwrap returns the underlying error for ErrDeserializationFailed.
func (e *ErrDeserializationFailed) Unwrap() error {
return e.Err
}

// ErrUnexpectedPayloadType is returned when the payload type is unexpected.
type ErrUnexpectedPayloadType struct {
Expected string
Actual string
}

// NewErrUnexpectedPayloadType creates a new ErrUnexpectedPayloadType error.
func NewErrUnexpectedPayloadType(expected, actual string) error {
return &ErrUnexpectedPayloadType{Expected: expected, Actual: actual}
}

// Error implements the error interface for ErrUnexpectedPayloadType.
func (e *ErrUnexpectedPayloadType) Error() string {
return fmt.Sprintf("unexpected payload type: expected %s, got %s", e.Expected, e.Actual)
}

// Ensure all custom error types implement the error interface.
var (
_ error = (*ErrUnsupportedEncoding)(nil)
_ error = (*ErrUnsupportedMessageType)(nil)
_ error = (*ErrBadMessage)(nil)
_ error = (*ErrBadPayload)(nil)
_ error = (*ErrSerializationFailed)(nil)
_ error = (*ErrDeserializationFailed)(nil)
_ error = (*ErrUnexpectedPayloadType)(nil)
)

// Ensure error types that wrap other errors implement the unwrap interface.
var (
_ interface{ Unwrap() error } = (*ErrSerializationFailed)(nil)
_ interface{ Unwrap() error } = (*ErrDeserializationFailed)(nil)
)
12 changes: 12 additions & 0 deletions pkg/lib/ncl/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package ncl

// NoopMessageFilter is a no-op message filter
type NoopMessageFilter struct{}

// ShouldFilter always returns false
func (n NoopMessageFilter) ShouldFilter(_ *Metadata) bool {
return false
}

// compile time check for the NoopMessageFilter interface
var _ MessageFilter = NoopMessageFilter{}
Loading

0 comments on commit 16538b3

Please sign in to comment.