Skip to content

Commit

Permalink
WIP: use stable client indexes to pick target vtgates
Browse files Browse the repository at this point in the history
Signed-off-by: Henry Robinson <[email protected]>
  • Loading branch information
henryr committed Jan 21, 2025
1 parent 0b70c76 commit 35e4285
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 2 deletions.
70 changes: 68 additions & 2 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"math/rand"
"net/http"
"os"
"sort"
"sync"
Expand Down Expand Up @@ -319,11 +320,16 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
// target lock and then comparing the previous targets with the current
// targets and only resetting pools which disappear.
targetCount.ResetAll()

idx := getClientIndex()
for poolType := range targets {
b.sorter.shuffleSort(targets[poolType], b.affinityField, b.affinityValue)
if len(targets[poolType]) > *numConnections {
targets[poolType] = targets[poolType][:b.numConnections]
targetPool := []targetHost{}
for i := 0; i < 4; i++ {
targetPool = append(targetPool, targets[poolType][(idx+i)%len(targets[poolType])])
}
log.Infof("My index is: %d, picking %+v", idx, targetPool)
targets[poolType] = targetPool
}
targetCount.Set(poolType, int64(len(targets[poolType])))
}
Expand Down Expand Up @@ -393,6 +399,66 @@ func (s *shuffleSorter) shuffleSort(targets []targetHost, affinityField, affinit
}
}

func getWebappDiscovery() *RotorJson {
payload := `{"node":{"cluster":"webapp"}, "resource_names":["hhvm-metrics@dev-us-east-1"]}`
res, err := http.Post("http://rotor-http-dev-us-east-1.internal.ec2.tinyspeck.com:50001/v3/discovery:endpoints", "application/json", bytes.NewBuffer([]byte(payload)))

Check failure on line 404 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

response body must be closed (bodyclose)

Check failure on line 404 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

response body must be closed (bodyclose)
if err != nil {
return nil
}
// fmt.Println("Non-nil response")
if res.StatusCode != 200 {
return nil
}
// fmt.Println("200 response")
body, err := io.ReadAll(res.Body)
if err != nil {
return nil
}
// fmt.Printf("Unmarshalling %+v\n", string(body))
result := RotorJson{}
json.Unmarshal(body, &result)

Check failure on line 419 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

Error return value of `json.Unmarshal` is not checked (errcheck)

Check failure on line 419 in go/vt/vtgateproxy/discovery.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

Error return value of `json.Unmarshal` is not checked (errcheck)
// fmt.Printf("%+v\n", result)
return &result
}

func getHosts() []string {
discovery := getWebappDiscovery()
nodenames := []string{}
for _, r := range discovery.Resources {
for _, e := range r.Endpoints {
for _, lb := range e.LBEndpoints {
node := lb.Metadata.FilterMetadata.EnvoyLB
nodenames = append(nodenames, node.NodeName)
fmt.Printf("Nodename: %+v\n", node.NodeName)
}
}
}

return nodenames
}

func getClientIndex() int {
nodenames := getHosts()
sort.Strings(nodenames)
hostname, err := os.Hostname()
fmt.Printf("Hostname: %s\n", hostname)
if err != nil {
return -1
}

for i, n := range nodenames {
if n == hostname {
return i
}
}

return -1
}

func DoTest() int {
return getClientIndex()
}

// Update the current list of hosts for the given resolver
func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error {
log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)
Expand Down
185 changes: 185 additions & 0 deletions go/vt/vtgateproxy/rotor_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package vtgateproxy

type RotorJson struct {
VersionInfo string `json:"version_info"`
Resources []Resource `json:"resources"`
TypeURL string `json:"type_url"`
ControlPlane ControlPlane `json:"control_plane"`
}

type ControlPlane struct {
Identifier string `json:"identifier"`
}

type Resource struct {
Type string `json:"@type"`
ClusterName string `json:"cluster_name"`
Endpoints []EndpointElement `json:"endpoints"`
}

type EndpointElement struct {
LBEndpoints []LBEndpoint `json:"lb_endpoints"`
}

type LBEndpoint struct {
Endpoint LBEndpointEndpoint `json:"endpoint"`
HealthStatus HealthStatus `json:"health_status"`
Metadata Metadata `json:"metadata"`
}

type LBEndpointEndpoint struct {
Address Address `json:"address"`
}

type Address struct {
SocketAddress SocketAddress `json:"socket_address"`
}

type SocketAddress struct {
Address string `json:"address"`
PortValue int64 `json:"port_value"`
}

type Metadata struct {
FilterMetadata FilterMetadata `json:"filter_metadata"`
}

type FilterMetadata struct {
EnvoyLB EnvoyLB `json:"envoy.lb"`
}

type EnvoyLB struct {
Consul Consul `json:"consul"`
NodeAddress string `json:"node-address"`
NodeHealth NodeHealth `json:"node-health"`
NodeID string `json:"node-id"`
NodeName string `json:"node-name"`
NodeASG ASG `json:"node:asg"`
NodeAz NodeAz `json:"node:az"`
NodeAzID AzID `json:"node:az_id"`
NodeConsulNetworkSegment string `json:"node:consul-network-segment"`
NodeConsulVersion ConsulVersion `json:"node:consul-version"`
NodeEnv Env `json:"node:env"`
NodeLSBRelease string `json:"node:lsb_release"`
NodeNebulaAddress string `json:"node:nebula_address"`
NodeOmniServiceID string `json:"node:omni_service_id"`
NodePlatform Platform `json:"node:platform"`
NodeProvider Provider `json:"node:provider"`
NodeRegion Region `json:"node:region"`
NodeRole Role `json:"node:role"`
TagDatacenter TagDatacenterEnum `json:"tag:datacenter"`
TagLegacyDc TagDatacenterEnum `json:"tag:legacy_dc"`
}

type Consul struct {
NodeMeta NodeMeta `json:"nodeMeta"`
Tag Tag `json:"tag"`
}

type NodeMeta struct {
ASG ASG `json:"asg"`
Az NodeAz `json:"az"`
AzID AzID `json:"az_id"`
ConsulNetworkSegment string `json:"consul-network-segment"`
ConsulVersion ConsulVersion `json:"consul-version"`
Env Env `json:"env"`
LSBRelease string `json:"lsb_release"`
NebulaAddress string `json:"nebula_address"`
OmniServiceID string `json:"omni_service_id"`
Platform Platform `json:"platform"`
Provider Provider `json:"provider"`
Region Region `json:"region"`
Role Role `json:"role"`
}

type Tag struct {
DatacenterDevUsEast1 string `json:"datacenter:dev-us-east-1"`
LegacyDcDevUsEast1 string `json:"legacy_dc:dev-us-east-1"`
}

type HealthStatus string

const (
Healthy HealthStatus = "HEALTHY"
)

type ASG string

const (
ASGRdevCanvasWebapp ASG = "rdev-canvas-webapp"
DevContainersPoolLargeWhitecastle ASG = "dev-containers-pool-large-whitecastle"
DevContainersPoolTestWhitecastle ASG = "dev-containers-pool-test-whitecastle"
DevContainersPoolWhitecastle ASG = "dev-containers-pool-whitecastle"
Empty ASG = ""
QAEnvContainerPoolWc ASG = "qa-env-container-pool-wc"
QAEnvSandboxContainerPoolWc ASG = "qa-env-sandbox-container-pool-wc"
)

type NodeAz string

const (
UsEast1A NodeAz = "us-east-1a"
UsEast1B NodeAz = "us-east-1b"
UsEast1C NodeAz = "us-east-1c"
UsEast1D NodeAz = "us-east-1d"
)

type AzID string

const (
Use1Az1 AzID = "use1-az1"
Use1Az2 AzID = "use1-az2"
Use1Az4 AzID = "use1-az4"
Use1Az6 AzID = "use1-az6"
)

type ConsulVersion string

const (
The1201Slack16 ConsulVersion = "1.20.1+slack.16"
)

type Env string

const (
Dev Env = "dev"
)

type Platform string

const (
Chef Platform = "chef"
)

type Provider string

const (
Ec2 Provider = "ec2"
)

type Region string

const (
UsEast1 Region = "us-east-1"
)

type Role string

const (
RoleRdevCanvasWebapp Role = "rdev-canvas-webapp"
SlackDevContainer Role = "slack-dev-container"
SlackQAEnvContainer Role = "slack-qa-env-container"
SlackQAEnvSandboxContainer Role = "slack-qa-env-sandbox-container"
)

type NodeHealth string

const (
Passing NodeHealth = "passing"
)

type TagDatacenterEnum string

const (
DevUsEast1 TagDatacenterEnum = "dev-us-east-1"
)

0 comments on commit 35e4285

Please sign in to comment.