Skip to content

Commit

Permalink
instance heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall committed Dec 30, 2022
1 parent ff9065b commit ecef346
Show file tree
Hide file tree
Showing 27 changed files with 5,979 additions and 2,454 deletions.
25 changes: 25 additions & 0 deletions api/client/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/gravitational/trace/trail"

"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/api/types"
)

// DownstreamInventoryControlStream is the client/agent side of a bidirectional stream established
Expand Down Expand Up @@ -221,6 +223,29 @@ func (c *Client) PingInventory(ctx context.Context, req proto.InventoryPingReque
return *rsp, nil
}

func (c *Client) GetInstances(ctx context.Context, filter types.InstanceFilter) stream.Stream[types.Instance] {
// set up cancelable context so that Stream.Done can close the stream if the caller
// halts early.
ctx, cancel := context.WithCancel(ctx)

instances, err := c.grpc.GetInstances(ctx, &filter, c.callOpts...)
if err != nil {
cancel()
return stream.Fail[types.Instance](trail.FromGRPC(err))
}
return stream.Func[types.Instance](func() (types.Instance, error) {
instance, err := instances.Recv()
if err != nil {
if trace.IsEOF(err) {
// io.EOF signals that stream has completed successfully
return nil, io.EOF
}
return nil, trail.FromGRPC(err)
}
return instance, nil
}, cancel)
}

func newDownstreamInventoryControlStream(stream proto.AuthService_InventoryControlStreamClient, cancel context.CancelFunc) DownstreamInventoryControlStream {
ics := &downstreamICS{
sendC: make(chan upstreamSend),
Expand Down
1,644 changes: 906 additions & 738 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

18 changes: 14 additions & 4 deletions api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1899,10 +1899,11 @@ message UpstreamInventoryHello {
// its auth token. i.e. Services is the subset of SystemRoles that are currently
// active.
repeated string Services = 3 [(gogoproto.casttype) = "github.com/gravitational/teleport/api/types.SystemRole"];

// TODO(fspmarshall): look into what other info can safely be stated here once, instead of
// being repeatedly announced (e.g. addrs, static labels, etc). may be able to achieve a
// non-trivial reduction in network usage by doing this.
// Hostname is the hostname associated with the instance. This value is not required or guaranteed
// to be unique and its validity is not enforceable (i.e. join tokens do not constrian what an
// instance can claim its hostname to be). This value exists only to assist users in correlating
// instance resources with hosts.
string Hostname = 4;
}

// DownstreamInventoryHello is the hello message sent down the inventory control stream.
Expand Down Expand Up @@ -1939,7 +1940,13 @@ message InventoryStatusSummary {
// InventoryPingRequest is used to request that the specified server be sent an inventory ping
// if it has a control stream registered.
message InventoryPingRequest {
// ServerID is the ID of the instance to ping.
string ServerID = 1;

// ControlLog forces the ping to use the standard "commit then act" model of control log synchronization
// for the ping. This significantly increases the amount of time it takes for the ping request to
// complete, but is useful for testing/debugging control log issues.
bool ControlLog = 2;
}

// InventoryPingResponse returns the result of an inventory ping initiated via an
Expand Down Expand Up @@ -1991,6 +1998,9 @@ service AuthService {
// PingInventory attempts to trigger a downstream inventory ping (used in testing/debug).
rpc PingInventory(InventoryPingRequest) returns (InventoryPingResponse);

// GetInstances streams all instances matching the specified filter.
rpc GetInstances(types.InstanceFilter) returns (stream types.InstanceV1);

// GetClusterAlerts loads cluster-level alert messages.
rpc GetClusterAlerts(types.GetClusterAlertsRequest) returns (GetClusterAlertsResponse);

Expand Down
105 changes: 105 additions & 0 deletions api/proto/teleport/legacy/types/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,111 @@ message MySQLOptions {
string ServerVersion = 1 [(gogoproto.jsontag) = "server_version,omitempty"];
}

// InstanceV1 represents the state of a running teleport instance independent
// of the specific services that instance exposes.
message InstanceV1 {
ResourceHeader Header = 1 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "",
(gogoproto.embed) = true
];
InstanceSpecV1 Spec = 2 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "spec"
];
}

message InstanceSpecV1 {
// Version is the version of teleport this instance most recently advertised.
string Version = 1 [(gogoproto.jsontag) = "version,omitempty"];

// Services is the list of active services this instance most recently advertised.
repeated string Services = 2 [
(gogoproto.casttype) = "SystemRole",
(gogoproto.jsontag) = "services,omitemtpy"
];

// Hostname is the hostname this instance most recently advertised.
string Hostname = 3 [(gogoproto.jsontag) = "hostname,omitempty"];

// AuthID is the ID of the auth server that most recently observed this instance.
string AuthID = 4 [(gogoproto.jsontag) = "auth_id,omitempty"];

// LastSeen is the last time an auth server reported observing this instance.
google.protobuf.Timestamp LastSeen = 5 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "last_seen,omitempty"
];

// ControlLog is the log of recent important instance control events related to this instance. See comments
// on the InstanceControlLogEntry type for details.
repeated InstanceControlLogEntry ControlLog = 6 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "control_log,omitempty"
];
}

// InstanceControlLogEntry represents an entry in a given instance's control log. The control log of
// an instance is protected by CompareAndSwap semantics, allowing entries to function as a means of
// synchronization as well as recordkeeping. For example, an auth server intending to trigger an upgrade
// for a given instance can check its control log for 'upgrade-attempt' entries. If no such entry exists,
// it can attempt to write an 'upgrade-attempt' entry of its own. If that entry successfully writes without
// hiting a CompareFailed, the auth server knows that no other auth servers will make concurrent upgrade
// attempts while that entry persists.
//
// NOTE: Due to resource size and backend throughput limitations, care should be taken to minimize the
// use and size of instance control log entries.
//
message InstanceControlLogEntry {
// Type represents the type of control log entry this is (e.g. 'upgrade-attempt').
string Type = 1 [(gogoproto.jsontag) = "type,omitempty"];

// ID is a random identifier used to assist in uniquely identifying entries. This value may
// be unique, or it may be used to associate a collection of related entries (e.g. an upgrade
// attempt entry may use the same ID as an associated upgrade failure entry if appropriate).
uint64 ID = 2 [(gogoproto.jsontag) = "id,omitempty"];

// Time is the time at which the event represented by this entry occurred (used in determining
// ordering and expiry).
google.protobuf.Timestamp Time = 3 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "time,omitempty"
];

// TTL is an optional custom time to live for this control log entry. Some control log entries
// (e.g. an upgrade failure) may require longer than normal TTLs in order to ensure visibility.
// If a log entry's TTL results in it having an intended expiry further in the future than the
// expiry of the enclosing Instance resource, the instance resource's expiry will be bumped
// to accommodate preservation of the log. Because of this fact, custom entry TTLs should be
// used sparingly, as excess usage could result in unexpected backend growth for high churn
// clusters.
int64 TTL = 4 [
(gogoproto.jsontag) = "ttl,omitempty",
(gogoproto.casttype) = "time.Duration"
];

// Labels is an arbitrary collection of key-value pairs. The expected labels are determined by the
// type of the entry. Use of labels is preferable to adding new fields in some cases in order to
// preserve fields across auth downgrades (this is mostly relevant for the version-control system).
map<string, string> Labels = 5 [(gogoproto.jsontag) = "labels,omitempty"];
}

// InstanceFilter matches instance resources.
message InstanceFilter {
// ServerID matches exactly one instance by server ID if specified.
string ServerID = 1;

// Version matches instance version if specified.
string Version = 2;

// Services matches the instance services if specified. Note that this field matches all instances which
// expose *at least* one of the listed services. This is in contrast to service matching in version
// directives which match instances that expose a *at most* the listed services.
repeated string Services = 3 [(gogoproto.casttype) = "SystemRole"];
}

// ServerV2 represents a Node, App, Database, Proxy or Auth server in a Teleport cluster.
message ServerV2 {
option (gogoproto.goproto_stringer) = false;
Expand Down
3 changes: 3 additions & 0 deletions api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ const (
// KindHostCert, this kind is not backed by a real resource.
KindUsageEvent = "usage_event"

// KindInstance represents a teleport instance independent of any specific service.
KindInstance = "instance"

// V5 is the fifth version of resources.
V5 = "v5"

Expand Down
Loading

0 comments on commit ecef346

Please sign in to comment.