Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added operation_id for async APIs #997

Merged
merged 4 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ Create new backup: `curl -s localhost:7171/backup/create -X POST | jq .`
- Optional query argument `schema` works the same as the `--schema` CLI argument (backup schema only).
- Optional query argument `rbac` works the same as the `--rbac` CLI argument (backup RBAC).
- Optional query argument `configs` works the same as the `--configs` CLI argument (backup configs).
- Optional query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens"}`.
- Optional query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens", "operation_id" : "<random_uuid>"}`.
- Additional example: `curl -s 'localhost:7171/backup/create?table=default.billing&name=billing_test' -X POST`

Note: this operation is asynchronous, so the API will return once the operation has started.
Expand Down Expand Up @@ -443,7 +443,7 @@ Upload backup to remote storage: `curl -s localhost:7171/backup/upload/<BACKUP_N
- Optional query argument `partitions` works the same as the `--partitions value` CLI argument.
- Optional query argument `schema` works the same as the `--schema` CLI argument (upload schema only).
- Optional query argument `resumable` works the same as the `--resumable` CLI argument (save intermediate upload state and resume upload if data already exists on remote storage).
- Optional query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens"}`.
- Optional query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens", "operation_id" : "<random_uuid>"}`.

Note: this operation is asynchronous, so the API will return once the operation has started.

Expand All @@ -464,7 +464,7 @@ Download backup from remote storage: `curl -s localhost:7171/backup/download/<BA
- Optional query argument `partitions` works the same as the `--partitions value` CLI argument.
- Optional query argument `schema` works the same as the `--schema` CLI argument (download schema only).
- Optional query argument `resumable` works the same as the `--resumable` CLI argument (save intermediate download state and resume download if it already exists on local storage).
- Optional query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens"}`.
- Optional query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens", "operation_id" : "<random_uuid>"}`.

Note: this operation is asynchronous, so the API will return once the operation has started.

Expand All @@ -482,7 +482,7 @@ Create schema and restore data from backup: `curl -s localhost:7171/backup/resto
- Optional query argument `configs` works the same as the `--configs` CLI argument (restore configs).
- Optional query argument `restore_database_mapping` works the same as the `--restore-database-mapping` CLI argument.
- Optional query argument `restore_table_mapping` works the same as the `--restore-table-mapping` CLI argument.
- Optional query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens"}`.
- Optional query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens", "operation_id" : "<random_uuid>"}`.

### POST /backup/delete

Expand Down
86 changes: 50 additions & 36 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"github.com/Altinity/clickhouse-backup/v2/pkg/server/metrics"
"github.com/Altinity/clickhouse-backup/v2/pkg/status"
"github.com/Altinity/clickhouse-backup/v2/pkg/utils"
"github.com/google/uuid"
)

type APIServer struct {
Expand Down Expand Up @@ -874,6 +875,8 @@
checkPartsColumns := true
fullCommand := "create"
query := r.URL.Query()
operationId, _ := uuid.NewUUID()

if tp, exist := query["table"]; exist {
tablePattern = tp[0]
fullCommand = fmt.Sprintf("%s --tables=\"%s\"", fullCommand, tablePattern)
Expand Down Expand Up @@ -930,7 +933,7 @@
if err != nil {
log.Error().Msgf("API /backup/create error: %v", err)
status.Current.Stop(commandId, err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
go func() {
Expand All @@ -940,16 +943,18 @@
}()

status.Current.Stop(commandId, nil)
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusCreated, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "create",
BackupName: backupName,
Status: "acknowledged",
Operation: "create",
BackupName: backupName,
OperationId: operationId.String(),
})
}

Expand Down Expand Up @@ -1118,6 +1123,7 @@
schemaOnly := false
resume := false
fullCommand := "upload"
operationId, _ := uuid.NewUUID()

if _, exist := query["delete-source"]; exist {
deleteSource = true
Expand Down Expand Up @@ -1167,7 +1173,7 @@
if err != nil {
log.Error().Msgf("Upload error: %v", err)
status.Current.Stop(commandId, err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
go func() {
Expand All @@ -1176,20 +1182,22 @@
}
}()
status.Current.Stop(commandId, nil)
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusOK, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
BackupFrom string `json:"backup_from,omitempty"`
Diff bool `json:"diff"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
BackupFrom string `json:"backup_from,omitempty"`
Diff bool `json:"diff"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "upload",
BackupName: name,
BackupFrom: diffFrom,
Diff: diffFrom != "",
Status: "acknowledged",
Operation: "upload",
BackupName: name,
BackupFrom: diffFrom,
Diff: diffFrom != "",
OperationId: operationId.String(),
})
}

Expand Down Expand Up @@ -1219,6 +1227,7 @@
restoreRBAC := false
restoreConfigs := false
fullCommand := "restore"
operationId, _ := uuid.NewUUID()

query := r.URL.Query()
if tp, exist := query["table"]; exist {
Expand Down Expand Up @@ -1310,19 +1319,21 @@
status.Current.Stop(commandId, err)
if err != nil {
log.Error().Msgf("API /backup/restore error: %v", err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusOK, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
OperationId string `json:"operation_id`

Check failure on line 1331 in pkg/server/server.go

View workflow job for this annotation

GitHub Actions / Build (1.22)

struct field tag `json:"operation_id` not compatible with reflect.StructTag.Get: bad syntax for struct tag value
manasmulay marked this conversation as resolved.
Show resolved Hide resolved
}{
Status: "acknowledged",
Operation: "restore",
BackupName: name,
Status: "acknowledged",
Operation: "restore",
BackupName: name,
OperationId: operationId.String(),
})
}

Expand All @@ -1346,6 +1357,7 @@
schemaOnly := false
resume := false
fullCommand := "download"
operationId, _ := uuid.NewUUID()

if tp, exist := query["table"]; exist {
tablePattern = tp[0]
Expand Down Expand Up @@ -1381,7 +1393,7 @@
if err != nil {
log.Error().Msgf("API /backup/download error: %v", err)
status.Current.Stop(commandId, err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
go func() {
Expand All @@ -1390,16 +1402,18 @@
}
}()
status.Current.Stop(commandId, nil)
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusOK, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "download",
BackupName: name,
Status: "acknowledged",
Operation: "download",
BackupName: name,
OperationId: operationId.String(),
})
}

Expand Down
22 changes: 13 additions & 9 deletions pkg/server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/rs/zerolog/log"
"net/http"
"reflect"

"github.com/rs/zerolog/log"
)

func (api *APIServer) flushOutput(w http.ResponseWriter, out string) {
Expand Down Expand Up @@ -61,26 +62,29 @@ func (api *APIServer) sendJSONEachRow(w http.ResponseWriter, statusCode int, v i

// CallbackResponse is the response that is returned to callers
type CallbackResponse struct {
Status string `json:"status"`
Error string `json:"error"`
Status string `json:"status"`
Error string `json:"error"`
OperationId string `json:"operation_id"`
}

// errorCallback executes callbacks with a payload notifying callers that the operation has failed
func (api *APIServer) errorCallback(ctx context.Context, err error, callback callbackFn) {
func (api *APIServer) errorCallback(ctx context.Context, err error, operationId string, callback callbackFn) {
payload := &CallbackResponse{
Status: "error",
Error: err.Error(),
Status: "error",
Error: err.Error(),
OperationId: operationId,
}
for _, e := range callback(ctx, payload) {
log.Error().Err(e).Send()
}
}

// successCallback executes callbacks with a payload notifying callers that the operation succeeded
func (api *APIServer) successCallback(ctx context.Context, callback callbackFn) {
func (api *APIServer) successCallback(ctx context.Context, operationId string, callback callbackFn) {
payload := &CallbackResponse{
Status: "success",
Error: "",
Status: "success",
Error: "",
OperationId: operationId,
}
for _, e := range callback(ctx, payload) {
log.Error().Err(e).Send()
Expand Down
Loading