Skip to content

Commit

Permalink
fix: multiple plugin installation
Browse files Browse the repository at this point in the history
  • Loading branch information
Yeuoly committed Dec 31, 2024
1 parent 0d51a59 commit f6aa10a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ func (b *InstalledBucket) Get(

// List lists all the plugins in the installed bucket
func (b *InstalledBucket) List() ([]plugin_entities.PluginUniqueIdentifier, error) {
// check if the patch exists
exists, err := b.oss.Exists(b.installedPath)
if err != nil {
return nil, err
}
if !exists {
return []plugin_entities.PluginUniqueIdentifier{}, nil
}

paths, err := b.oss.List(b.installedPath)
if err != nil {
return nil, err
Expand Down
20 changes: 16 additions & 4 deletions internal/server/controllers/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,25 @@ func InstallPluginFromIdentifiers(app *app.Config) gin.HandlerFunc {
TenantID string `uri:"tenant_id" validate:"required"`
PluginUniqueIdentifiers []plugin_entities.PluginUniqueIdentifier `json:"plugin_unique_identifiers" validate:"required,max=64,dive,plugin_unique_identifier"`
Source string `json:"source" validate:"required"`
Meta map[string]any `json:"meta" validate:"omitempty"`
Metas []map[string]any `json:"metas" validate:"omitempty"`
}) {
if request.Meta == nil {
request.Meta = map[string]any{}
if request.Metas == nil {
request.Metas = []map[string]any{}
}

if len(request.Metas) != len(request.PluginUniqueIdentifiers) {
c.JSON(http.StatusOK, exception.BadRequestError(errors.New("the number of metas must be equal to the number of plugin unique identifiers")).ToResponse())
return
}

for i := range request.Metas {
if request.Metas[i] == nil {
request.Metas[i] = map[string]any{}
}
}

c.JSON(http.StatusOK, service.InstallPluginFromIdentifiers(
app, request.TenantID, request.PluginUniqueIdentifiers, request.Source, request.Meta,
app, request.TenantID, request.PluginUniqueIdentifiers, request.Source, request.Metas,
))
})
}
Expand Down
24 changes: 14 additions & 10 deletions internal/service/install_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ type InstallPluginResponse struct {
type InstallPluginOnDoneHandler func(
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
declaration *plugin_entities.PluginDeclaration,
meta map[string]any,
) error

func InstallPluginRuntimeToTenant(
config *app.Config,
tenant_id string,
plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
source string,
meta map[string]any,
metas []map[string]any,
onDone InstallPluginOnDoneHandler, // since installing plugin is a async task, we need to call it asynchronously
) (*InstallPluginResponse, error) {
response := &InstallPluginResponse{}
Expand Down Expand Up @@ -85,7 +86,7 @@ func InstallPluginRuntimeToTenant(
})

if err == nil {
if err := onDone(pluginUniqueIdentifier, pluginDeclaration); err != nil {
if err := onDone(pluginUniqueIdentifier, pluginDeclaration, metas[i]); err != nil {
return nil, errors.Join(err, errors.New("failed on plugin installation"))
} else {
task.CompletedPlugins++
Expand Down Expand Up @@ -118,7 +119,7 @@ func InstallPluginRuntimeToTenant(
manager := plugin_manager.Manager()

tasks := []func(){}
for _, pluginUniqueIdentifier := range pluginsWaitForInstallation {
for i, pluginUniqueIdentifier := range pluginsWaitForInstallation {
// copy the variable to avoid race condition
pluginUniqueIdentifier := pluginUniqueIdentifier

Expand All @@ -131,6 +132,7 @@ func InstallPluginRuntimeToTenant(
return nil, err
}

i := i
tasks = append(tasks, func() {
updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
if err := db.WithTransaction(func(tx *gorm.DB) error {
Expand Down Expand Up @@ -213,9 +215,9 @@ func InstallPluginRuntimeToTenant(
})
return
}
stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, meta)
stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, metas[i])
} else if config.Platform == app.PLATFORM_LOCAL {
stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, meta)
stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, metas[i])
} else {
updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
task.Status = models.InstallTaskStatusFailed
Expand Down Expand Up @@ -255,7 +257,7 @@ func InstallPluginRuntimeToTenant(
}

if message.Event == plugin_manager.PluginInstallEventDone {
if err := onDone(pluginUniqueIdentifier, declaration); err != nil {
if err := onDone(pluginUniqueIdentifier, declaration, metas[i]); err != nil {
updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
task.Status = models.InstallTaskStatusFailed
plugin.Status = models.InstallTaskStatusFailed
Expand All @@ -280,7 +282,7 @@ func InstallPluginRuntimeToTenant(
}

// submit async tasks
routine.WithMaxRoutine(3, tasks)
routine.WithMaxRoutine(5, tasks)

return response, nil
}
Expand All @@ -290,17 +292,18 @@ func InstallPluginFromIdentifiers(
tenant_id string,
plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
source string,
meta map[string]any,
metas []map[string]any,
) *entities.Response {
response, err := InstallPluginRuntimeToTenant(
config,
tenant_id,
plugin_unique_identifiers,
source,
meta,
metas,
func(
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
declaration *plugin_entities.PluginDeclaration,
meta map[string]any,
) error {
runtimeType := plugin_entities.PluginRuntimeType("")

Expand Down Expand Up @@ -371,10 +374,11 @@ func UpgradePlugin(
tenant_id,
[]plugin_entities.PluginUniqueIdentifier{new_plugin_unique_identifier},
source,
meta,
[]map[string]any{meta},
func(
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
declaration *plugin_entities.PluginDeclaration,
meta map[string]any,
) error {
// uninstall the original plugin
upgradeResponse, err := curd.UpgradePlugin(
Expand Down

0 comments on commit f6aa10a

Please sign in to comment.