Skip to content

Commit

Permalink
Merge branch 'master' of github.com:Altinity/clickhouse-backup into v2.6
Browse files Browse the repository at this point in the history
# Conflicts:
#	ReadMe.md
  • Loading branch information
Slach committed Aug 29, 2024
2 parents e19019f + 3109f69 commit ed34fe3
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 50 deletions.
11 changes: 6 additions & 5 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,9 @@ Create new backup: `curl -s localhost:7171/backup/create -X POST | jq .`
- Optional boolean query argument `rbac-only` works the same as the `--rbac-only` CLI argument (backup only RBAC).
- Optional boolean query argument `configs` works the same as the `--configs` CLI argument (backup configs).
- Optional boolean query argument `configs-only` works the same as the `--configs-only` CLI argument (backup only configs).
- Optional string query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens"}`.
- Additional example: `curl -s 'localhost:7171/backup/create?table=default.billing&name=billing_test' -X POST`
- Optional string query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens", "operation_id" : "<random_uuid>"}`.

Additional example: `curl -s 'localhost:7171/backup/create?table=default.billing&name=billing_test' -X POST`

Note: this operation is asynchronous, so the API will return once the operation has started.

Expand Down Expand Up @@ -450,7 +451,7 @@ Upload backup to remote storage: `curl -s localhost:7171/backup/upload/<BACKUP_N
- Optional string query argument `partitions` works the same as the `--partitions value` CLI argument.
- Optional boolean query argument `schema` works the same as the `--schema` CLI argument (upload schema only).
- Optional boolean query argument `resumable` works the same as the `--resumable` CLI argument (save intermediate upload state and resume upload if data already exists on remote storage).
- Optional string query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens"}`.
- Optional string query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens", "operation_id" : "<random_uuid>"}`.

Note: this operation is asynchronous, so the API will return once the operation has started.

Expand All @@ -471,7 +472,7 @@ Download backup from remote storage: `curl -s localhost:7171/backup/download/<BA
- Optional string query argument `partitions` works the same as the `--partitions value` CLI argument.
- Optional boolean query argument `schema` works the same as the `--schema` CLI argument (download schema only).
- Optional boolean query argument `resumable` works the same as the `--resumable` CLI argument (save intermediate download state and resume download if it already exists on local storage).
- Optional string query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens"}`.
- Optional string query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens", "operation_id" : "<random_uuid>"}`.

Note: this operation is asynchronous, so the API will return once the operation has started.

Expand All @@ -491,7 +492,7 @@ Create schema and restore data from backup: `curl -s localhost:7171/backup/resto
- Optional boolean query argument `configs-only` works the same as the `--configs-only` CLI argument (restore configs).
- Optional string query argument `restore_database_mapping` works the same as the `--restore-database-mapping=old_db:new_db` CLI argument.
- Optional string query argument `restore_table_mapping` works the same as the `--restore-table-mapping=old_table:new_table` CLI argument.
- Optional string query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens"}`.
- Optional string query argument `callback` allow pass callback URL which will call with POST with `application/json` with payload `{"status":"error|success","error":"not empty when error happens", "operation_id" : "<random_uuid>"}`.

### POST /backup/delete

Expand Down
86 changes: 50 additions & 36 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/Altinity/clickhouse-backup/v2/pkg/server/metrics"
"github.com/Altinity/clickhouse-backup/v2/pkg/status"
"github.com/Altinity/clickhouse-backup/v2/pkg/utils"
"github.com/google/uuid"
)

type APIServer struct {
Expand Down Expand Up @@ -876,6 +877,8 @@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request)
checkPartsColumns := true
fullCommand := "create"
query := r.URL.Query()
operationId, _ := uuid.NewUUID()

if tp, exist := query["table"]; exist {
tablePattern = tp[0]
fullCommand = fmt.Sprintf("%s --tables=\"%s\"", fullCommand, tablePattern)
Expand Down Expand Up @@ -934,7 +937,7 @@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request)
if err != nil {
log.Error().Msgf("API /backup/create error: %v", err)
status.Current.Stop(commandId, err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
go func() {
Expand All @@ -944,16 +947,18 @@ func (api *APIServer) httpCreateHandler(w http.ResponseWriter, r *http.Request)
}()

status.Current.Stop(commandId, nil)
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusCreated, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "create",
BackupName: backupName,
Status: "acknowledged",
Operation: "create",
BackupName: backupName,
OperationId: operationId.String(),
})
}

Expand Down Expand Up @@ -1122,6 +1127,7 @@ func (api *APIServer) httpUploadHandler(w http.ResponseWriter, r *http.Request)
schemaOnly := false
resume := false
fullCommand := "upload"
operationId, _ := uuid.NewUUID()

if _, exist := query["delete-source"]; exist {
deleteSource = true
Expand Down Expand Up @@ -1171,7 +1177,7 @@ func (api *APIServer) httpUploadHandler(w http.ResponseWriter, r *http.Request)
if err != nil {
log.Error().Msgf("Upload error: %v", err)
status.Current.Stop(commandId, err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
go func() {
Expand All @@ -1180,20 +1186,22 @@ func (api *APIServer) httpUploadHandler(w http.ResponseWriter, r *http.Request)
}
}()
status.Current.Stop(commandId, nil)
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusOK, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
BackupFrom string `json:"backup_from,omitempty"`
Diff bool `json:"diff"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
BackupFrom string `json:"backup_from,omitempty"`
Diff bool `json:"diff"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "upload",
BackupName: name,
BackupFrom: diffFrom,
Diff: diffFrom != "",
Status: "acknowledged",
Operation: "upload",
BackupName: name,
BackupFrom: diffFrom,
Diff: diffFrom != "",
OperationId: operationId.String(),
})
}

Expand Down Expand Up @@ -1225,6 +1233,7 @@ func (api *APIServer) httpRestoreHandler(w http.ResponseWriter, r *http.Request)
restoreConfigs := false
configsOnly := false
fullCommand := "restore"
operationId, _ := uuid.NewUUID()

query := r.URL.Query()
if tp, exist := query["table"]; exist {
Expand Down Expand Up @@ -1324,19 +1333,21 @@ func (api *APIServer) httpRestoreHandler(w http.ResponseWriter, r *http.Request)
status.Current.Stop(commandId, err)
if err != nil {
log.Error().Msgf("API /backup/restore error: %v", err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusOK, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "restore",
BackupName: name,
Status: "acknowledged",
Operation: "restore",
BackupName: name,
OperationId: operationId.String(),
})
}

Expand All @@ -1360,6 +1371,7 @@ func (api *APIServer) httpDownloadHandler(w http.ResponseWriter, r *http.Request
schemaOnly := false
resume := false
fullCommand := "download"
operationId, _ := uuid.NewUUID()

if tp, exist := query["table"]; exist {
tablePattern = tp[0]
Expand Down Expand Up @@ -1395,7 +1407,7 @@ func (api *APIServer) httpDownloadHandler(w http.ResponseWriter, r *http.Request
if err != nil {
log.Error().Msgf("API /backup/download error: %v", err)
status.Current.Stop(commandId, err)
api.errorCallback(context.Background(), err, callback)
api.errorCallback(context.Background(), err, operationId.String(), callback)
return
}
go func() {
Expand All @@ -1404,16 +1416,18 @@ func (api *APIServer) httpDownloadHandler(w http.ResponseWriter, r *http.Request
}
}()
status.Current.Stop(commandId, nil)
api.successCallback(context.Background(), callback)
api.successCallback(context.Background(), operationId.String(), callback)
}()
api.sendJSONEachRow(w, http.StatusOK, struct {
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
Status string `json:"status"`
Operation string `json:"operation"`
BackupName string `json:"backup_name"`
OperationId string `json:"operation_id"`
}{
Status: "acknowledged",
Operation: "download",
BackupName: name,
Status: "acknowledged",
Operation: "download",
BackupName: name,
OperationId: operationId.String(),
})
}

Expand Down
22 changes: 13 additions & 9 deletions pkg/server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/rs/zerolog/log"
"net/http"
"reflect"

"github.com/rs/zerolog/log"
)

func (api *APIServer) flushOutput(w http.ResponseWriter, out string) {
Expand Down Expand Up @@ -61,26 +62,29 @@ func (api *APIServer) sendJSONEachRow(w http.ResponseWriter, statusCode int, v i

// CallbackResponse is the response that is returned to callers
type CallbackResponse struct {
Status string `json:"status"`
Error string `json:"error"`
Status string `json:"status"`
Error string `json:"error"`
OperationId string `json:"operation_id"`
}

// errorCallback executes callbacks with a payload notifying callers that the operation has failed
func (api *APIServer) errorCallback(ctx context.Context, err error, callback callbackFn) {
func (api *APIServer) errorCallback(ctx context.Context, err error, operationId string, callback callbackFn) {
payload := &CallbackResponse{
Status: "error",
Error: err.Error(),
Status: "error",
Error: err.Error(),
OperationId: operationId,
}
for _, e := range callback(ctx, payload) {
log.Error().Err(e).Send()
}
}

// successCallback executes callbacks with a payload notifying callers that the operation succeeded
func (api *APIServer) successCallback(ctx context.Context, callback callbackFn) {
func (api *APIServer) successCallback(ctx context.Context, operationId string, callback callbackFn) {
payload := &CallbackResponse{
Status: "success",
Error: "",
Status: "success",
Error: "",
OperationId: operationId,
}
for _, e := range callback(ctx, payload) {
log.Error().Err(e).Send()
Expand Down

0 comments on commit ed34fe3

Please sign in to comment.