Skip to content

Commit

Permalink
- Added operation_id in payload for APIs backup/create, backup/restor…
Browse files Browse the repository at this point in the history
…e/{name}, backup/upload/{name}, backup/download/{name}

- Added operation_id to be sent in success and error callback
  • Loading branch information
Manas Mulay committed Aug 27, 2024
1 parent 4d83171 commit 3904982
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 45 deletions.
86 changes: 50 additions & 36 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/Altinity/clickhouse-backup/v2/pkg/server/metrics"
"github.com/Altinity/clickhouse-backup/v2/pkg/status"
"github.com/Altinity/clickhouse-backup/v2/pkg/utils"
"github.com/google/uuid"
)

type APIServer struct {
Expand Down Expand Up @@ -874,6 +875,8 @@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request)
checkPartsColumns := true
fullCommand := "create"
query := r.URL.Query()
operationId, _ := uuid.NewUUID()

if tp, exist := query["table"]; exist {
tablePattern = tp[0]
fullCommand = fmt.Sprintf("%s --tables=\"%s\"", fullCommand, tablePattern)
Expand Down Expand Up @@ -930,7 +933,7 @@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request)
if err != nil {
log.Error().Msgf("API /backup/create error: %v", err)
status.Current.Stop(commandId, err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
go func() {
Expand All @@ -940,16 +943,18 @@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request)
}()

status.Current.Stop(commandId, nil)
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusCreated, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "create",
BackupName: backupName,
Status: "acknowledged",
Operation: "create",
BackupName: backupName,
OperationId: operationId.String(),
})
}

Expand Down Expand Up @@ -1118,6 +1123,7 @@ func (api *APIServer) httpUploadHandler(w http.ResponseWriter, r *http.Request)
schemaOnly := false
resume := false
fullCommand := "upload"
operationId, _ := uuid.NewUUID()

if _, exist := query["delete-source"]; exist {
deleteSource = true
Expand Down Expand Up @@ -1167,7 +1173,7 @@ func (api *APIServer) httpUploadHandler(w http.ResponseWriter, r *http.Request)
if err != nil {
log.Error().Msgf("Upload error: %v", err)
status.Current.Stop(commandId, err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
go func() {
Expand All @@ -1176,20 +1182,22 @@ func (api *APIServer) httpUploadHandler(w http.ResponseWriter, r *http.Request)
}
}()
status.Current.Stop(commandId, nil)
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusOK, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
BackupFrom string `json:"backup_from,omitempty"`
Diff bool `json:"diff"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
BackupFrom string `json:"backup_from,omitempty"`
Diff bool `json:"diff"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "upload",
BackupName: name,
BackupFrom: diffFrom,
Diff: diffFrom != "",
Status: "acknowledged",
Operation: "upload",
BackupName: name,
BackupFrom: diffFrom,
Diff: diffFrom != "",
OperationId: operationId.String(),
})
}

Expand Down Expand Up @@ -1219,6 +1227,7 @@ func (api *APIServer) httpRestoreHandler(w http.ResponseWriter, r *http.Request)
restoreRBAC := false
restoreConfigs := false
fullCommand := "restore"
operationId, _ := uuid.NewUUID()

query := r.URL.Query()
if tp, exist := query["table"]; exist {
Expand Down Expand Up @@ -1310,19 +1319,21 @@ func (api *APIServer) httpRestoreHandler(w http.ResponseWriter, r *http.Request)
status.Current.Stop(commandId, err)
if err != nil {
log.Error().Msgf("API /backup/restore error: %v", err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusOK, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
OperationId string `json:"operation_id`
}{
Status: "acknowledged",
Operation: "restore",
BackupName: name,
Status: "acknowledged",
Operation: "restore",
BackupName: name,
OperationId: operationId.String(),
})
}

Expand All @@ -1346,6 +1357,7 @@ func (api *APIServer) httpDownloadHandler(w http.ResponseWriter, r *http.Request
schemaOnly := false
resume := false
fullCommand := "download"
operationId, _ := uuid.NewUUID()

if tp, exist := query["table"]; exist {
tablePattern = tp[0]
Expand Down Expand Up @@ -1381,7 +1393,7 @@ func (api *APIServer) httpDownloadHandler(w http.ResponseWriter, r *http.Request
if err != nil {
log.Error().Msgf("API /backup/download error: %v", err)
status.Current.Stop(commandId, err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
go func() {
Expand All @@ -1390,16 +1402,18 @@ func (api *APIServer) httpDownloadHandler(w http.ResponseWriter, r *http.Request
}
}()
status.Current.Stop(commandId, nil)
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusOK, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "download",
BackupName: name,
Status: "acknowledged",
Operation: "download",
BackupName: name,
OperationId: operationId.String(),
})
}

Expand Down
22 changes: 13 additions & 9 deletions pkg/server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/rs/zerolog/log"
"net/http"
"reflect"

"github.com/rs/zerolog/log"
)

func (api *APIServer) flushOutput(w http.ResponseWriter, out string) {
Expand Down Expand Up @@ -61,26 +62,29 @@ func (api *APIServer) sendJSONEachRow(w http.ResponseWriter, statusCode int, v i

// CallbackResponse is the response that is returned to callers
type CallbackResponse struct {
Status string `json:"status"`
Error string `json:"error"`
Status string `json:"status"`
Error string `json:"error"`
OperationId string `json:"operation_id"`
}

// errorCallback executes callbacks with a payload notifying callers that the operation has failed
func (api *APIServer) errorCallback(ctx context.Context, err error, callback callbackFn) {
func (api *APIServer) errorCallback(ctx context.Context, err error, operationId string, callback callbackFn) {
payload := &CallbackResponse{
Status: "error",
Error: err.Error(),
Status: "error",
Error: err.Error(),
OperationId: operationId,
}
for _, e := range callback(ctx, payload) {
log.Error().Err(e).Send()
}
}

// successCallback executes callbacks with a payload notifying callers that the operation succeeded
func (api *APIServer) successCallback(ctx context.Context, callback callbackFn) {
func (api *APIServer) successCallback(ctx context.Context, operationId string, callback callbackFn) {
payload := &CallbackResponse{
Status: "success",
Error: "",
Status: "success",
Error: "",
OperationId: operationId,
}
for _, e := range callback(ctx, payload) {
log.Error().Err(e).Send()
Expand Down

0 comments on commit 3904982

Please sign in to comment.