Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Queue/Trigger the replay using sloctl [PC-13394] #237

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
121 changes: 87 additions & 34 deletions internal/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,11 @@ func (r *RootCmd) NewReplayCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "replay",
Short: "Retrieve historical SLI data and recalculate their SLO error budgets.",
Long: "Replay pulls in the historical data while your SLO collects new data in real-time. " +
"The historical and current data are merged, producing an error budget calculated for the entire period. " +
"Refer to https://docs.nobl9.com/replay for more details on Replay.\n\n" +
"The 'replay' command allows you to import data for multiple SLOs in bulk. " +
"Before running the Replays it will verify if the SLOs you've provided are eligible for Replay. " +
"It will only run a single Replay simultaneously (current limit for concurrent Replays). " +
"When any Replay fails, it will attempt the import for the next SLO. " +
"Importing data takes time: Replay for a single SLO may take several minutes up to an hour. " +
"During that time, the command keeps on running, periodically checking the status of Replay. " +
"If you cancel the program execution at any time, the current Replay in progress will not be revoked.",
Long: "`sloctl replay` creates Replays to retrieve historical data for SLOs. " +
"Use it to replay SLOs one-by-one or in bulk. Historical data retrieval is time-consuming: " +
"replaying a single SLO can take up to an hour. Considering the number of ongoing Replays is limited, " +
"`sloctl` queues Replays if the limit is exceeded. Replay queues is an experimental feature, currently " +
"unavailable to all organizations. We're working on improving and expanding its availability.",
Example: replayExample,
Args: replay.arguments,
PersistentPreRun: func(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -95,31 +90,73 @@ func (r *ReplayCmd) RunReplays(cmd *cobra.Command, replays []ReplayConfig) (fail
return 0, err
}

arePlaylistEnabled := r.arePlaylistEnabled(cmd.Context())

if arePlaylistEnabled {
cmd.Println(colorstring.Color("[yellow]- Your organization has access to Replay queues!"))
cmd.Println(colorstring.Color("[yellow]- To learn more about Replay queues, follow this link: " +
"https://docs.nobl9.com/replay-canary/ [reset]"))
}

failedIndexes := make([]int, 0)
for i, replay := range replays {
cmd.Println(colorstring.Color(fmt.Sprintf(
"[cyan][%d/%d][reset] SLO: %s, Project: %s, From: %s, To: %s",
i+1, len(replays), replay.SLO, replay.Project,
replay.From.Format(timeLayout), time.Now().In(replay.From.Location()).Format(timeLayout))))

spinner := NewSpinner("Importing data...")
spinner.Go()
err = r.runReplay(cmd.Context(), replay)
spinner.Stop()
if arePlaylistEnabled {
cmd.Println("Replay is added to the queue...")
err = r.runReplay(cmd.Context(), replay)

if err != nil {
cmd.Println(colorstring.Color("[red]Import failed:[reset] " + err.Error()))
failedIndexes = append(failedIndexes, i)
continue
if err != nil {
cmd.Println(colorstring.Color("[red]Failed to add Replay to the queue:[reset] " + err.Error()))
failedIndexes = append(failedIndexes, i)
continue
}
cmd.Println(colorstring.Color("[green]Replay has been successfully added to the queue![reset]"))
} else {
spinner := NewSpinner("Importing data...")
spinner.Go()
err = r.runReplayWithStatusCheck(cmd.Context(), replay)
spinner.Stop()

if err != nil {
cmd.Println(colorstring.Color("[red]Import failed:[reset] " + err.Error()))
failedIndexes = append(failedIndexes, i)
continue
}
cmd.Println(colorstring.Color("[green]Import succeeded![reset]"))
}
cmd.Println(colorstring.Color("[green]Import succeeded![reset]"))
}
if len(replays) > 0 {
r.printSummary(cmd, replays, failedIndexes)
}
return len(failedIndexes), nil
}

func (r *ReplayCmd) arePlaylistEnabled(ctx context.Context) bool {
data, _, err := r.doRequest(
ctx,
http.MethodGet,
endpointPlanInfo,
"*",
nil,
nil)
if err != nil {
return true
}
var pc PlaylistConfiguration
if err = json.Unmarshal(data, &pc); err != nil {
return true
}
return pc.EnabledPlaylists
}

type PlaylistConfiguration struct {
EnabledPlaylists bool `json:"enabledPlaylists"`
}

type ReplayConfig struct {
Project string `json:"project" validate:"required"`
SLO string `json:"slo" validate:"required"`
Expand Down Expand Up @@ -264,7 +301,7 @@ func (r *ReplayCmd) verifySLOs(ctx context.Context, replays []ReplayConfig) erro

// Find non-existent or RBAC protected SLOs.
// We're also filling the Data Source spec here for ReplayConfig.
data, err := r.doRequest(
data, _, err := r.doRequest(
ctx,
http.MethodGet,
endpointGetSLO,
Expand Down Expand Up @@ -352,10 +389,10 @@ outer:

const replayStatusCheckInterval = 30 * time.Second

func (r *ReplayCmd) runReplay(ctx context.Context, config ReplayConfig) error {
_, err := r.doRequest(ctx, http.MethodPost, endpointReplayPost, config.Project, nil, config.ToReplay(time.Now()))
func (r *ReplayCmd) runReplayWithStatusCheck(ctx context.Context, config ReplayConfig) error {
err := r.runReplay(ctx, config)
if err != nil {
return errors.Wrap(err, "failed to start new Replay")
return err
}
ticker := time.NewTicker(replayStatusCheckInterval)
for {
Expand All @@ -379,6 +416,21 @@ func (r *ReplayCmd) runReplay(ctx context.Context, config ReplayConfig) error {
}
}

func (r *ReplayCmd) runReplay(ctx context.Context, config ReplayConfig) error {
_, httpCode, err := r.doRequest(ctx, http.MethodPost, endpointReplayPost, config.Project,
nil, config.ToReplay(time.Now()),
)
if err != nil {
switch httpCode {
case 409:
return errors.Errorf("Replay for SLO: '%s' in project: '%s' already exist", config.SLO, config.Project)
default:
return errors.Wrap(err, "failed to start new Replay")
}
}
return nil
}

func (r *ReplayCmd) getReplayAvailability(
ctx context.Context,
config ReplayConfig,
Expand All @@ -392,7 +444,7 @@ func (r *ReplayCmd) getReplayAvailability(
"durationUnit": {durationUnit},
"durationValue": {strconv.Itoa(durationValue)},
}
data, err := r.doRequest(ctx, http.MethodGet, endpointReplayGetAvailability, config.Project, values, nil)
data, _, err := r.doRequest(ctx, http.MethodGet, endpointReplayGetAvailability, config.Project, values, nil)
if err != nil {
return
}
Expand All @@ -406,7 +458,7 @@ func (r *ReplayCmd) getReplayStatus(
ctx context.Context,
config ReplayConfig,
) (string, error) {
data, err := r.doRequest(
data, _, err := r.doRequest(
ctx,
http.MethodGet,
fmt.Sprintf(endpointReplayGetStatus, config.SLO),
Expand All @@ -429,6 +481,7 @@ const (
endpointReplayList = "/timetravel/list"
endpointReplayGetStatus = "/timetravel/%s"
endpointReplayGetAvailability = "/internal/timemachine/availability"
endpointPlanInfo = "/internal/plan-info"
endpointGetSLO = "/get/slo"
)

Expand All @@ -437,30 +490,30 @@ func (r *ReplayCmd) doRequest(
method, endpoint, project string,
values url.Values,
payload interface{},
) ([]byte, error) {
) (data []byte, httpCode int, err error) {
var body io.Reader
if payload != nil {
buf := new(bytes.Buffer)
if err := json.NewEncoder(buf).Encode(payload); err != nil {
return nil, err
return nil, 0, err
}
body = buf
}
header := http.Header{sdk.HeaderProject: []string{project}}
req, err := r.client.CreateRequest(ctx, method, endpoint, header, values, body)
if err != nil {
return nil, err
return nil, 0, err
}
resp, err := r.client.HTTP.Do(req)
if err != nil {
return nil, err
return nil, 0, err
}
defer func() { _ = resp.Body.Close() }()
data, err = io.ReadAll(resp.Body)
if resp.StatusCode >= 300 {
data, _ := io.ReadAll(resp.Body)
return nil, errors.Errorf("bad response (status: %d): %s", resp.StatusCode, string(data))
return nil, resp.StatusCode, errors.Errorf("bad response (status: %d): %s", resp.StatusCode, string(data))
}
return io.ReadAll(resp.Body)
return data, resp.StatusCode, err
}

func (r *ReplayCmd) replayUnavailabilityReasonExplanation(
Expand Down Expand Up @@ -502,14 +555,14 @@ func (r *ReplayCmd) replayUnavailabilityReasonExplanation(

func (r *ReplayCmd) printSummary(cmd *cobra.Command, replays []ReplayConfig, failedIndexes []int) {
if len(failedIndexes) == 0 {
cmd.Printf("\nSuccessfully imported data for all %d SLOs.\n", len(replays))
cmd.Printf("\nSuccessfully finished operations for all %d SLOs.\n", len(replays))
} else {
failedDetails := make([]string, 0, len(failedIndexes))
for _, i := range failedIndexes {
fr, _ := json.Marshal(replays[i])
failedDetails = append(failedDetails, string(fr))
}
cmd.Printf("\nSuccessfully imported data for %d and failed for %d SLOs:\n - %s\n",
cmd.Printf("\nSuccessfully finished operations for %d and failed for %d SLOs:\n - %s\n",
len(replays)-len(failedIndexes), len(failedIndexes), strings.Join(failedDetails, "\n - "))
}
}
4 changes: 2 additions & 2 deletions internal/replay_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type deleteReplayRequest struct {
func (r *ReplayCmd) deleteAllReplays(cmd *cobra.Command) error {
cmd.Println(colorstring.Color("[yellow]Deleting all queued Replays[reset]"))

_, err := r.doRequest(
_, _, err := r.doRequest(
cmd.Context(),
http.MethodDelete,
endpointReplayDelete,
Expand Down Expand Up @@ -87,7 +87,7 @@ func (r *ReplayCmd) deleteReplaysForSLO(cmd *cobra.Command, sloName string) erro
),
)

_, err := r.doRequest(
_, _, err := r.doRequest(
cmd.Context(),
http.MethodDelete,
endpointReplayDelete,
Expand Down
2 changes: 1 addition & 1 deletion internal/replay_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type ReplayQueueItem struct {
func (r *ReplayCmd) listAllReplays(cmd *cobra.Command) error {
cmd.Println(colorstring.Color("[yellow]Listing all Replays[reset]"))

response, err := r.doRequest(
response, _, err := r.doRequest(
cmd.Context(),
http.MethodGet,
endpointReplayList,
Expand Down
Loading