Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add vega datanode migrate-ipfs subcommand #10597

Merged
merged 5 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [9945](https://github.com/vegaprotocol/vega/issues/9945) - Add liquidation strategy.
- [10215](https://github.com/vegaprotocol/vega/issues/10215) - Listing transactions on block explorer does not support the field `limit` any more.
- [8056](https://github.com/vegaprotocol/vega/issues/8056) - Getting a transfer by ID now returns a `TransferNode`.
- [10597](https://github.com/vegaprotocol/vega/pull/10597) - Migrate the `IPFS` store for the network history to version 15.

### 🗑️ Deprecation

Expand Down
52 changes: 52 additions & 0 deletions cmd/data-node/commands/migrate-ipfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package commands

import (
"context"
"path/filepath"

"code.vegaprotocol.io/vega/datanode/config"
"code.vegaprotocol.io/vega/datanode/networkhistory/ipfs"
"code.vegaprotocol.io/vega/logging"
"code.vegaprotocol.io/vega/paths"

"github.com/jessevdk/go-flags"
)

type MigrateIpfsCmd struct {
config.VegaHomeFlag
}

func MigrateIpfs(ctx context.Context, parser *flags.Parser) error {
migrateIpfsCmd = MigrateIpfsCmd{}

_, err := parser.AddCommand("migrate-ipfs", "Update IPFS store version", "Migrate IPFS store to the latest version supported by Vega", &migrateIpfsCmd)

return err
}

var migrateIpfsCmd MigrateIpfsCmd

func (cmd *MigrateIpfsCmd) Execute(_ []string) error {
log := logging.NewLoggerFromConfig(logging.NewDefaultConfig())
defer log.AtExit()

vegaPaths := paths.New(cmd.VegaHome)
ipfsDir := filepath.Join(vegaPaths.StatePathFor(paths.DataNodeNetworkHistoryHome), "store", "ipfs")

return ipfs.MigrateIpfsStorageVersion(log, ipfsDir)
}
1 change: 1 addition & 0 deletions cmd/data-node/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func Execute(ctx context.Context) error {
LastBlock,
UnsafeResetAll,
networkhistory.NetworkHistory,
MigrateIpfs,
); err != nil {
fmt.Printf("%+v\n", err)
return err
Expand Down
19 changes: 19 additions & 0 deletions cmd/data-node/commands/start/node_pre.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"code.vegaprotocol.io/vega/datanode/broker"
"code.vegaprotocol.io/vega/datanode/config"
"code.vegaprotocol.io/vega/datanode/networkhistory"
"code.vegaprotocol.io/vega/datanode/networkhistory/ipfs"
"code.vegaprotocol.io/vega/datanode/networkhistory/snapshot"
"code.vegaprotocol.io/vega/datanode/networkhistory/store"
"code.vegaprotocol.io/vega/datanode/sqlstore"
"code.vegaprotocol.io/vega/libs/fs"
"code.vegaprotocol.io/vega/libs/pprof"
"code.vegaprotocol.io/vega/libs/subscribers"
"code.vegaprotocol.io/vega/logging"
Expand Down Expand Up @@ -97,6 +99,23 @@ func (l *NodeCommand) persistentPre([]string) (err error) {
if ResetDatabaseAndNetworkHistory(l.ctx, l.Log, l.vegaPaths, l.conf.SQLStore.ConnectionConfig); err != nil {
return fmt.Errorf("failed to reset database and network history: %w", err)
}
} else if !l.conf.SQLStore.WipeOnStartup && l.conf.NetworkHistory.Enabled {
ipfsDir := filepath.Join(l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistoryHome), "store", "ipfs")
ipfsExists, err := fs.PathExists(ipfsDir)
if err != nil {
return fmt.Errorf("failed to check if ipfs store is already initialized")
}

// We do not care for migration when the ipfs store does not exist on the local file system
if ipfsExists {
preLog.Info("Migrating the IPFS storage to the latest version")
if err := ipfs.MigrateIpfsStorageVersion(preLog, ipfsDir); err != nil {
return fmt.Errorf("failed to migrate the ipfs version")
}
preLog.Info("Migrating the IPFS storage finished")
} else {
preLog.Info("IPFS store not initialized. Migration not needed")
}
}

initialisedFromNetworkHistory := false
Expand Down
102 changes: 102 additions & 0 deletions datanode/networkhistory/ipfs/ipfsfetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package ipfs

import (
"bytes"
"context"
"fmt"
"io"
"path"
"strings"
"time"

kuboClient "github.com/ipfs/kubo/client/rpc"
"github.com/ipfs/kubo/repo/fsrepo/migrations"
)

const (
shellUpTimeout = 2 * time.Second
defaultFetchLimit = 1024 * 1024 * 512
)

type ipfsFetcher struct {
distPath string
ipfsDir string
limit int64
}

// newIpfsFetcher creates a new IpfsFetcher
//
// Specifying "" for distPath sets the default IPNS path.
// Specifying 0 for fetchLimit sets the default, -1 means no limit.
func newIpfsFetcher(distPath string, ipfsDir string, fetchLimit int64) *ipfsFetcher {
f := &ipfsFetcher{
limit: defaultFetchLimit,
distPath: migrations.LatestIpfsDist,
ipfsDir: ipfsDir,
}

if distPath != "" {
if !strings.HasPrefix(distPath, "/") {
distPath = "/" + distPath
}
f.distPath = distPath
}

if fetchLimit != 0 {
if fetchLimit == -1 {
fetchLimit = 0
}
f.limit = fetchLimit
}

return f
}

func (f *ipfsFetcher) Close() error {
return nil
}

// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *ipfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
sh, err := kuboClient.NewPathApi(f.ipfsDir)
if err != nil {
return nil, fmt.Errorf("failed to create a ipfs shell migration: %w", err)
}
resp, err := sh.Request("cat", path.Join(f.distPath, filePath)).Send(ctx)
if err != nil {
return nil, fmt.Errorf("failed to read file from the ipfs node: %w", err)
}
if resp.Error != nil {
return nil, resp.Error
}
defer resp.Close()

var output io.Reader
if f.limit != 0 {
output = migrations.NewLimitReadCloser(resp.Output, f.limit)
} else {
output = resp.Output
}

buf := new(bytes.Buffer)
buf.ReadFrom(output)

return buf.Bytes(), nil
}
87 changes: 87 additions & 0 deletions datanode/networkhistory/ipfs/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package ipfs

import (
"context"
"fmt"

"code.vegaprotocol.io/vega/logging"

"github.com/ipfs/kubo/repo/fsrepo"
"github.com/ipfs/kubo/repo/fsrepo/migrations"
)

func createFetcher(distPath string, ipfsDir string) migrations.Fetcher {
const userAgent = "fs-repo-migrations"

if distPath == "" {
distPath = migrations.GetDistPathEnv(migrations.LatestIpfsDist)
}

return migrations.NewMultiFetcher(
newIpfsFetcher(distPath, ipfsDir, 0),
migrations.NewHttpFetcher(distPath, "", userAgent, 0))
}

// LatestSupportedVersion returns the latest version supported by the kubo library.
func latestSupportedVersion() int {
// TODO: Maybe We should hardcode it to be safe and control when the migration happens?
return fsrepo.RepoVersion
}

// IsMigrationNeeded check if migration of the IPFS repository is needed.
func isMigrationNeeded(ipfsDir string) (bool, error) {
repoVersion, err := migrations.RepoVersion(ipfsDir)
if err != nil {
return false, fmt.Errorf("failed to check version for the %s IPFS repository: %w", ipfsDir, err)
}

return repoVersion < latestSupportedVersion(), nil
}

// MigrateIpfsStorageVersion migrates the IPFS store to the latest supported by the
// library version.
// High level overview:
// 1. Check version of the local store,
// 2. Check max supported version for the kubo library,
// 3. Connect to local or remote IPFS node and download required migration binaries,
// 4. Run downloaded binaries to migrate the file system.
func MigrateIpfsStorageVersion(log *logging.Logger, ipfsDir string) error {
isMigrationNeeded, err := isMigrationNeeded(ipfsDir)
if err != nil {
return fmt.Errorf("failed to check if the ipfs migration is needed: %w", err)
}
if !isMigrationNeeded {
if log != nil {
log.Info("The IPFS for the network-history is up to date. Migration not needed")
}
return nil
}

localIpfsDir, err := migrations.IpfsDir(ipfsDir)
if err != nil {
return fmt.Errorf("failed to find local ipfs directory: %w", err)
}

fetcher := createFetcher("", localIpfsDir)
err = migrations.RunMigration(context.Background(), fetcher, latestSupportedVersion(), localIpfsDir, false)
if err != nil {
return fmt.Errorf("failed to execute the ipfs migration: %w", err)
}

return nil
}
Loading