Skip to content

Commit

Permalink
runtime: add stat and resize APIs to containerd-shim-v2
Browse files Browse the repository at this point in the history
To query fs stats and resize fs, the requests need to be passed to
kata agent through containerd-shim-v2. So we're adding to rest APIs
on the shim management endpoint.
Also refactor shim management client to its own go file.

Fixes: kata-containers#3454

Signed-off-by: Feng Wang <[email protected]>
  • Loading branch information
fengwang-db committed Mar 4, 2022
1 parent 6e0090a commit e9b5a25
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 63 deletions.
4 changes: 2 additions & 2 deletions src/runtime/cmd/kata-runtime/kata-exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"time"

"github.com/containerd/console"
kataMonitor "github.com/kata-containers/kata-containers/src/runtime/pkg/kata-monitor"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils"
"github.com/kata-containers/kata-containers/src/runtime/pkg/utils/shimclient"
clientUtils "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/client"
"github.com/pkg/errors"
"github.com/urfave/cli"
Expand Down Expand Up @@ -154,7 +154,7 @@ func (s *iostream) Read(data []byte) (n int, err error) {
}

func getConn(sandboxID string, port uint64) (net.Conn, error) {
client, err := kataMonitor.BuildShimClient(sandboxID, defaultTimeout)
client, err := shimclient.BuildShimClient(sandboxID, defaultTimeout)
if err != nil {
return nil, err
}
Expand Down
41 changes: 39 additions & 2 deletions src/runtime/cmd/kata-runtime/kata-volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
package main

import (
"encoding/json"
"net/url"

containerdshim "github.com/kata-containers/kata-containers/src/runtime/pkg/containerd-shim-v2"
"github.com/kata-containers/kata-containers/src/runtime/pkg/direct-volume"
"github.com/kata-containers/kata-containers/src/runtime/pkg/utils/shimclient"

"github.com/urfave/cli"
)

Expand Down Expand Up @@ -78,7 +84,7 @@ var statsCommand = cli.Command{
},
},
Action: func(c *cli.Context) (string, error) {
stats, err := volume.Stats(volumePath)
stats, err := Stats(volumePath)
if err != nil {
return "", err
}
Expand All @@ -103,6 +109,37 @@ var resizeCommand = cli.Command{
},
},
Action: func(c *cli.Context) error {
return volume.Resize(volumePath, size)
return Resize(volumePath, size)
},
}

// Stats retrieves the filesystem stats of the direct volume inside the guest.
func Stats(volumePath string) ([]byte, error) {
sandboxId, err := volume.GetSandboxIdForVolume(volumePath)
if err != nil {
return nil, err
}
urlSafeDevicePath := url.PathEscape(volumePath)
body, err := shimclient.DoGet(sandboxId, defaultTimeout, containerdshim.DirectVolumeStatUrl+"/"+urlSafeDevicePath)
if err != nil {
return nil, err
}
return body, nil
}

// Resize resizes a direct volume inside the guest.
func Resize(volumePath string, size uint64) error {
sandboxId, err := volume.GetSandboxIdForVolume(volumePath)
if err != nil {
return err
}
resizeReq := containerdshim.ResizeRequest{
VolumePath: volumePath,
Size: size,
}
encoded, err := json.Marshal(resizeReq)
if err != nil {
return err
}
return shimclient.DoPost(sandboxId, defaultTimeout, containerdshim.DirectVolumeResizeUrl, encoded)
}
66 changes: 63 additions & 3 deletions src/runtime/pkg/containerd-shim-v2/shim_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,45 @@ package containerdshim

import (
"context"
"encoding/json"
"expvar"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/pprof"
"net/url"
"path/filepath"
"strconv"
"strings"

"google.golang.org/grpc/codes"

cdshim "github.com/containerd/containerd/runtime/v2/shim"
mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)

"google.golang.org/grpc/codes"

mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
const (
DirectVolumeStatUrl = "/direct-volume/stats"
DirectVolumeResizeUrl = "/direct-volume/resize"
)

var (
ifSupportAgentMetricsAPI = true
shimMgtLog = shimLog.WithField("subsystem", "shim-management")
)

type ResizeRequest struct {
VolumePath string
Size uint64
}

// agentURL returns URL for agent
func (s *service) agentURL(w http.ResponseWriter, r *http.Request) {
url, err := s.sandbox.GetAgentURL()
Expand Down Expand Up @@ -126,6 +138,52 @@ func decodeAgentMetrics(body string) []*dto.MetricFamily {
return list
}

func (s *service) serveVolumeStats(w http.ResponseWriter, r *http.Request) {
volumePath, err := url.PathUnescape(strings.TrimPrefix(r.URL.Path, DirectVolumeStatUrl))
if err != nil {
shimMgtLog.WithError(err).Error("failed to unescape the volume stat url path")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}

buf, err := s.sandbox.GuestVolumeStats(context.Background(), volumePath)
if err != nil {
shimMgtLog.WithError(err).WithField("volume-path", volumePath).Error("failed to get volume stats")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Write(buf)
}

func (s *service) serveVolumeResize(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
shimMgtLog.WithError(err).Error("failed to read request body")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
var resizeReq ResizeRequest
err = json.Unmarshal(body, &resizeReq)
if err != nil {
shimMgtLog.WithError(err).Error("failed to unmarshal the http request body")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}

err = s.sandbox.ResizeGuestVolume(context.Background(), resizeReq.VolumePath, resizeReq.Size)
if err != nil {
shimMgtLog.WithError(err).WithField("volume-path", resizeReq.VolumePath).Error("failed to resize the volume")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.Write([]byte(""))
}

func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec) {
// metrics socket will under sandbox's bundle path
metricsAddress := SocketAddress(s.id)
Expand All @@ -148,6 +206,8 @@ func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec
m := http.NewServeMux()
m.Handle("/metrics", http.HandlerFunc(s.serveMetrics))
m.Handle("/agent-url", http.HandlerFunc(s.agentURL))
m.Handle(DirectVolumeStatUrl, http.HandlerFunc(s.serveVolumeStats))
m.Handle(DirectVolumeResizeUrl, http.HandlerFunc(s.serveVolumeResize))
s.mountPprofHandle(m, ociSpec)

// register shim metrics
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/pkg/kata-monitor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"time"

mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"

"github.com/kata-containers/kata-containers/src/runtime/pkg/utils/shimclient"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"

Expand Down Expand Up @@ -224,7 +224,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
}

func getParsedMetrics(sandboxID string, sandboxMetadata sandboxCRIMetadata) ([]*dto.MetricFamily, error) {
body, err := doGet(sandboxID, defaultTimeout, "metrics")
body, err := shimclient.DoGet(sandboxID, defaultTimeout, "metrics")
if err != nil {
return nil, err
}
Expand All @@ -234,7 +234,7 @@ func getParsedMetrics(sandboxID string, sandboxMetadata sandboxCRIMetadata) ([]*

// GetSandboxMetrics will get sandbox's metrics from shim
func GetSandboxMetrics(sandboxID string) (string, error) {
body, err := doGet(sandboxID, defaultTimeout, "metrics")
body, err := shimclient.DoGet(sandboxID, defaultTimeout, "metrics")
if err != nil {
return "", err
}
Expand Down
4 changes: 3 additions & 1 deletion src/runtime/pkg/kata-monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"sync"
"time"

"github.com/kata-containers/kata-containers/src/runtime/pkg/utils/shimclient"

"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -180,7 +182,7 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
return
}

data, err := doGet(sandboxID, defaultTimeout, "agent-url")
data, err := shimclient.DoGet(sandboxID, defaultTimeout, "agent-url")
if err != nil {
commonServeError(w, http.StatusBadRequest, err)
return
Expand Down
52 changes: 0 additions & 52 deletions src/runtime/pkg/kata-monitor/shim_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@ package katamonitor

import (
"fmt"
"io"
"net"
"net/http"
"time"

cdshim "github.com/containerd/containerd/runtime/v2/shim"

shim "github.com/kata-containers/kata-containers/src/runtime/pkg/containerd-shim-v2"
)

Expand All @@ -40,51 +36,3 @@ func getSandboxIDFromReq(r *http.Request) (string, error) {
func getSandboxFS() string {
return shim.GetSandboxesStoragePath()
}

// BuildShimClient builds and returns an http client for communicating with the provided sandbox
func BuildShimClient(sandboxID string, timeout time.Duration) (*http.Client, error) {
return buildUnixSocketClient(shim.SocketAddress(sandboxID), timeout)
}

// buildUnixSocketClient build http client for Unix socket
func buildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) {
transport := &http.Transport{
DisableKeepAlives: true,
Dial: func(proto, addr string) (conn net.Conn, err error) {
return cdshim.AnonDialer(socketAddr, timeout)
},
}

client := &http.Client{
Transport: transport,
}

if timeout > 0 {
client.Timeout = timeout
}

return client, nil
}

func doGet(sandboxID string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) {
client, err := BuildShimClient(sandboxID, timeoutInSeconds)
if err != nil {
return nil, err
}

resp, err := client.Get(fmt.Sprintf("http://shim/%s", urlPath))
if err != nil {
return nil, err
}

defer func() {
resp.Body.Close()
}()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

return body, nil
}
79 changes: 79 additions & 0 deletions src/runtime/pkg/utils/shimclient/shim_management_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2022 Databricks Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package shimclient

import (
"bytes"
"fmt"
"io"
"net"
"net/http"
"time"

cdshim "github.com/containerd/containerd/runtime/v2/shim"
shim "github.com/kata-containers/kata-containers/src/runtime/pkg/containerd-shim-v2"
)

// BuildShimClient builds and returns an http client for communicating with the provided sandbox
func BuildShimClient(sandboxID string, timeout time.Duration) (*http.Client, error) {
return buildUnixSocketClient(shim.SocketAddress(sandboxID), timeout)
}

// buildUnixSocketClient build http client for Unix socket
func buildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) {
transport := &http.Transport{
DisableKeepAlives: true,
Dial: func(proto, addr string) (conn net.Conn, err error) {
return cdshim.AnonDialer(socketAddr, timeout)
},
}

client := &http.Client{
Transport: transport,
}

if timeout > 0 {
client.Timeout = timeout
}

return client, nil
}

func DoGet(sandboxID string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) {
client, err := BuildShimClient(sandboxID, timeoutInSeconds)
if err != nil {
return nil, err
}

resp, err := client.Get(fmt.Sprintf("http://shim/%s", urlPath))
if err != nil {
return nil, err
}

defer func() {
resp.Body.Close()
}()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

return body, nil
}

func DoPost(sandboxID string, timeoutInSeconds time.Duration, urlPath string, payload []byte) error {
client, err := BuildShimClient(sandboxID, timeoutInSeconds)
if err != nil {
return err
}

resp, err := client.Post(fmt.Sprintf("http://shim/%s", urlPath), "application/json", bytes.NewBuffer(payload))
defer func() {
resp.Body.Close()
}()
return err
}

0 comments on commit e9b5a25

Please sign in to comment.