From cb4be142bfcb314571832d1a5126dd777e49192f Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Wed, 10 Nov 2021 19:55:35 +0800 Subject: [PATCH] feat: calculate piece metadata digest (#787) * feat: calculate piece metadata digest * fix: back source not work in midway * chore: add partial back source test * chore: optimize digest log Signed-off-by: Jim Ma --- client/daemon/daemon.go | 2 +- client/daemon/peer/peertask_base.go | 37 ++- client/daemon/peer/peertask_file.go | 12 +- client/daemon/peer/peertask_file_callback.go | 19 ++ client/daemon/peer/peertask_manager.go | 10 +- .../daemon/peer/peertask_manager_mock_test.go | 28 +- client/daemon/peer/peertask_manager_test.go | 5 +- client/daemon/peer/peertask_stream.go | 17 +- ...peertask_stream_backsource_partial_test.go | 268 ++++++++++++++++++ .../daemon/peer/peertask_stream_callback.go | 19 ++ client/daemon/peer/peertask_stream_test.go | 8 +- client/daemon/peer/piece_downloader.go | 4 +- client/daemon/peer/piece_downloader_test.go | 2 + client/daemon/peer/piece_manager.go | 30 +- client/daemon/peer/piece_manager_test.go | 3 +- client/daemon/storage/local_storage.go | 127 +++++++-- client/daemon/storage/metadata.go | 4 + client/daemon/storage/storage_manager.go | 39 ++- client/daemon/test/mock/storage/manager.go | 58 ++++ .../hdfsprotocol/hdfs_source_client_test.go | 2 + pkg/util/digestutils/digest_reader.go | 12 +- pkg/util/digestutils/digest_reader_test.go | 4 +- 22 files changed, 646 insertions(+), 64 deletions(-) create mode 100644 client/daemon/peer/peertask_stream_backsource_partial_test.go diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index c8c2e6b21f3..22a6eab1736 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -134,7 +134,7 @@ func New(opt *config.DaemonOption) (Daemon, error) { return nil, err } peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler, - opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex) + opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest) if err != nil { return nil, err } diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index eb8f6d58785..7d88cdbb245 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -87,6 +87,7 @@ type peerTask struct { peerID string taskID string totalPiece int32 + md5 string contentLength *atomic.Int64 completedLength *atomic.Int64 usedTraffic *atomic.Int64 @@ -134,6 +135,7 @@ type peerTask struct { type pieceTaskResult struct { piece *base.PieceInfo pieceResult *scheduler.PieceResult + notRetry bool err error } @@ -177,6 +179,14 @@ func (pt *peerTask) SetTotalPieces(i int32) { pt.setTotalPiecesFunc(i) } +func (pt *peerTask) SetPieceMd5Sign(md5 string) { + pt.md5 = md5 +} + +func (pt *peerTask) GetPieceMd5Sign() string { + return pt.md5 +} + func (pt *peerTask) Context() context.Context { return pt.ctx } @@ -337,6 +347,8 @@ func (pt *peerTask) pullSinglePiece(cleanUnfinishedFunc func()) { span.SetAttributes(config.AttributePiece.Int(int(pt.singlePiece.PieceInfo.PieceNum))) pt.contentLength.Store(int64(pt.singlePiece.PieceInfo.RangeSize)) + pt.SetTotalPieces(1) + pt.SetPieceMd5Sign(pt.singlePiece.PieceInfo.PieceMd5) if err := pt.callback.Init(pt); err != nil { pt.failedReason = err.Error() pt.failedCode = dfcodes.ClientError @@ -352,6 +364,7 @@ func (pt *peerTask) pullSinglePiece(cleanUnfinishedFunc func()) { DstPid: pt.singlePiece.DstPid, DstAddr: pt.singlePiece.DstAddr, piece: 
pt.singlePiece.PieceInfo, + log: pt.Log(), } if pt.pieceManager.DownloadPiece(ctx, pt, request) { pt.Infof("single piece download success") @@ -448,6 +461,13 @@ loop: pt.Debugf("update total piece count: %d", pt.totalPiece) } + // update md5 digest + if len(piecePacket.PieceMd5Sign) > 0 && len(pt.md5) == 0 { + pt.md5 = piecePacket.PieceMd5Sign + _ = pt.callback.Update(pt) + pt.Debugf("update digest: %s", pt.md5) + } + // 3. dispatch piece request to all workers pt.dispatchPieceRequest(pieceRequestCh, piecePacket) @@ -554,6 +574,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) { pt.Infof("start download from source due to dfcodes.SchedNeedBackSource") pt.span.AddEvent("back source due to scheduler says need back source ") pt.needBackSource = true + // TODO optimize back source when already downloaded some pieces pt.backSource() case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration): if pt.schedulerOption.DisableAutoBackSource { @@ -574,7 +595,8 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) { func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *base.PiecePacket) { for _, piece := range piecePacket.PieceInfos { - pt.Infof("get piece %d from %s/%s", piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid) + pt.Infof("get piece %d from %s/%s, md5: %s, start: %d, size: %d", + piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize) if !pt.requestedPieces.IsSet(piece.PieceNum) { pt.requestedPieces.Set(piece.PieceNum) } @@ -583,6 +605,7 @@ func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceReque DstPid: piecePacket.DstPid, DstAddr: piecePacket.DstAddr, piece: piece, + log: pt.Log(), } select { case pieceRequestCh <- req: @@ -721,10 +744,10 @@ func (pt *peerTask) preparePieceTasksByPeer(curPeerPacket *scheduler.PeerPacket, // when cdn returns dfcodes.CdnTaskNotFound, report it to scheduler and wait cdn download it. 
retry: - pt.Debugf("get piece task from peer %s, piece num: %d, limit: %d\"", peer.PeerId, request.StartNum, request.Limit) + pt.Debugf("try get piece task from peer %s, piece num: %d, limit: %d\"", peer.PeerId, request.StartNum, request.Limit) p, err := pt.getPieceTasks(span, curPeerPacket, peer, request) if err == nil { - pt.Infof("get piece task from peer %s ok, pieces length: %d", peer.PeerId, len(p.PieceInfos)) + pt.Infof("got piece task from peer %s ok, pieces length: %d", peer.PeerId, len(p.PieceInfos)) span.SetAttributes(config.AttributeGetPieceCount.Int(len(p.PieceInfos))) return p, nil } @@ -740,7 +763,7 @@ retry: // context canceled, just exit if se.GRPCStatus().Code() == codes.Canceled { span.AddEvent("context canceled") - pt.Warnf("get piece task from peer(%s) canceled: %s", peer.PeerId, err) + pt.Warnf("get piece task from peer %s canceled: %s", peer.PeerId, err) return nil, err } } @@ -750,7 +773,7 @@ retry: pt.Debugf("get piece task from peer %s with df error, code: %d", peer.PeerId, de.Code) code = de.Code } - pt.Errorf("get piece task from peer(%s) error: %s, code: %d", peer.PeerId, err, code) + pt.Errorf("get piece task from peer %s error: %s, code: %d", peer.PeerId, err, code) perr := pt.peerPacketStream.Send(&scheduler.PieceResult{ TaskId: pt.taskID, SrcPid: pt.peerID, @@ -763,7 +786,7 @@ retry: }) if perr != nil { span.RecordError(perr) - pt.Errorf("send piece result error: %s, code: %d", err, code) + pt.Errorf("send piece result error: %s, code to send: %d", err, code) } if code == dfcodes.CdnTaskNotFound && curPeerPacket == pt.peerPacket.Load().(*scheduler.PeerPacket) { @@ -808,7 +831,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer }) if er != nil { span.RecordError(er) - pt.Errorf("send piece result error: %s, code: %d", peer.PeerId, er) + pt.Errorf("send piece result with dfcodes.ClientWaitPieceReady error: %s", er) } // fast way to exit retry lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket) diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index 7ff3acd6dcb..4d3a46f9019 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -238,13 +238,17 @@ func (pt *filePeerTask) ReportPieceResult(result *pieceTaskResult) error { if !result.pieceResult.Success { result.pieceResult.FinishedCount = pt.readyPieces.Settled() _ = pt.peerPacketStream.Send(result.pieceResult) + if result.notRetry { + pt.Warnf("piece %d download failed, no retry", result.piece.PieceNum) + return nil + } select { case <-pt.done: pt.Infof("peer task done, stop to send failed piece") case <-pt.ctx.Done(): pt.Debugf("context done due to %s, stop to send failed piece", pt.ctx.Err()) case pt.failedPieceCh <- result.pieceResult.PieceInfo.PieceNum: - pt.Warnf("%d download failed, retry later", result.piece.PieceNum) + pt.Warnf("piece %d download failed, retry later", result.piece.PieceNum) } return nil @@ -295,6 +299,12 @@ func (pt *filePeerTask) ReportPieceResult(result *pieceTaskResult) error { func (pt *filePeerTask) finish() error { var err error + if err = pt.callback.ValidateDigest(pt); err != nil { + pt.Errorf("validate digest error: %s", err) + pt.span.RecordError(err) + pt.cleanUnfinished() + return err + } // send last progress pt.once.Do(func() { defer pt.recoverFromPanic() diff --git a/client/daemon/peer/peertask_file_callback.go b/client/daemon/peer/peertask_file_callback.go index 826b29c4b91..984a7f1fbad 100644 --- a/client/daemon/peer/peertask_file_callback.go +++ 
b/client/daemon/peer/peertask_file_callback.go @@ -53,6 +53,7 @@ func (p *filePeerTaskCallback) Init(pt Task) error { }, ContentLength: pt.GetContentLength(), TotalPieces: pt.GetTotalPieces(), + PieceMd5Sign: pt.GetPieceMd5Sign(), }) if err != nil { pt.Log().Errorf("register task to storage manager failed: %s", err) @@ -70,6 +71,7 @@ func (p *filePeerTaskCallback) Update(pt Task) error { }, ContentLength: pt.GetContentLength(), TotalPieces: pt.GetTotalPieces(), + PieceMd5Sign: pt.GetPieceMd5Sign(), }) if err != nil { pt.Log().Errorf("update task to storage manager failed: %s", err) @@ -150,3 +152,20 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro } return nil } + +func (p *filePeerTaskCallback) ValidateDigest(pt Task) error { + if !p.ptm.calculateDigest { + return nil + } + err := p.ptm.storageManager.ValidateDigest( + &storage.PeerTaskMetaData{ + PeerID: pt.GetPeerID(), + TaskID: pt.GetTaskID(), + }) + if err != nil { + pt.Log().Errorf("%s", err) + } else { + pt.Log().Debugf("validated digest") + } + return err +} diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 959da175c22..b96ab3ab656 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -73,6 +73,8 @@ type Task interface { SetCallback(TaskCallback) AddTraffic(int64) GetTraffic() int64 + SetPieceMd5Sign(string) + GetPieceMd5Sign() string } // TaskCallback inserts some operations for peer task download lifecycle @@ -82,6 +84,7 @@ type TaskCallback interface { Update(pt Task) error Fail(pt Task, code base.Code, reason string) error GetStartTime() time.Time + ValidateDigest(pt Task) error } type TinyData struct { @@ -113,6 +116,8 @@ type peerTaskManager struct { // currently, only check completed peer task after register to scheduler // TODO multiplex the running peer task enableMultiplex bool + + calculateDigest bool } func NewPeerTaskManager( @@ -122,7 +127,8 @@ func NewPeerTaskManager( schedulerClient schedulerclient.SchedulerClient, schedulerOption config.SchedulerOption, perPeerRateLimit rate.Limit, - multiplex bool) (TaskManager, error) { + multiplex bool, + calculateDigest bool) (TaskManager, error) { ptm := &peerTaskManager{ host: host, @@ -133,6 +139,7 @@ func NewPeerTaskManager( schedulerOption: schedulerOption, perPeerRateLimit: perPeerRateLimit, enableMultiplex: multiplex, + calculateDigest: calculateDigest, } return ptm, nil } @@ -266,6 +273,7 @@ func (ptm *peerTaskManager) storeTinyPeerTask(ctx context.Context, tiny *TinyDat }, ContentLength: l, TotalPieces: 1, + // TODO check md5 digest }) if err != nil { logger.Errorf("register tiny data storage failed: %s", err) diff --git a/client/daemon/peer/peertask_manager_mock_test.go b/client/daemon/peer/peertask_manager_mock_test.go index 1ead1e6ba58..ba74bfefef4 100644 --- a/client/daemon/peer/peertask_manager_mock_test.go +++ b/client/daemon/peer/peertask_manager_mock_test.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: client/daemon/peer/peertask_manager.go +// Source: peertask_manager.go // Package peer is a generated GoMock package. package peer @@ -176,6 +176,20 @@ func (mr *MockTaskMockRecorder) GetPeerID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeerID", reflect.TypeOf((*MockTask)(nil).GetPeerID)) } +// GetPieceMd5Sign mocks base method. 
+func (m *MockTask) GetPieceMd5Sign() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPieceMd5Sign") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetPieceMd5Sign indicates an expected call of GetPieceMd5Sign. +func (mr *MockTaskMockRecorder) GetPieceMd5Sign() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceMd5Sign", reflect.TypeOf((*MockTask)(nil).GetPieceMd5Sign)) +} + // GetTaskID mocks base method. func (m *MockTask) GetTaskID() string { m.ctrl.T.Helper() @@ -272,6 +286,18 @@ func (mr *MockTaskMockRecorder) SetContentLength(arg0 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetContentLength", reflect.TypeOf((*MockTask)(nil).SetContentLength), arg0) } +// SetPieceMd5Sign mocks base method. +func (m *MockTask) SetPieceMd5Sign(arg0 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetPieceMd5Sign", arg0) +} + +// SetPieceMd5Sign indicates an expected call of SetPieceMd5Sign. +func (mr *MockTaskMockRecorder) SetPieceMd5Sign(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPieceMd5Sign", reflect.TypeOf((*MockTask)(nil).SetPieceMd5Sign), arg0) +} + // SetTotalPieces mocks base method. func (m *MockTask) SetTotalPieces(arg0 int32) { m.ctrl.T.Helper() diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index a49627ca206..12cfe2b05ab 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -64,7 +64,7 @@ type componentsOption struct { func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) ( schedulerclient.SchedulerClient, storage.Manager) { port := int32(freeport.GetPort()) - // 1. setup a mock daemon server for uploading pieces info + // 1. 
set up a mock daemon server for uploading pieces info var daemon = mock_daemon.NewMockDaemonServer(ctrl) daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error) { var tasks []*base.PieceInfo @@ -147,10 +147,11 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio func(ctx context.Context, pr *scheduler.PeerResult, opts ...grpc.CallOption) error { return nil }) + tempDir, _ := ioutil.TempDir("", "d7y-test-*") storageManager, _ := storage.NewStorageManager( config.SimpleLocalTaskStoreStrategy, &config.StorageOption{ - DataPath: test.DataDir, + DataPath: tempDir, TaskExpireTime: clientutil.Duration{ Duration: -1 * time.Second, }, diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 36757f4e5d3..a1f268c1a82 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -193,14 +193,19 @@ func (s *streamPeerTask) ReportPieceResult(result *pieceTaskResult) error { defer s.recoverFromPanic() // retry failed piece if !result.pieceResult.Success { + result.pieceResult.FinishedCount = s.readyPieces.Settled() _ = s.peerPacketStream.Send(result.pieceResult) + if result.notRetry { + s.Warnf("piece %d download failed, no retry", result.piece.PieceNum) + return nil + } select { case <-s.done: s.Infof("peer task done, stop to send failed piece") case <-s.ctx.Done(): s.Debugf("context done due to %s, stop to send failed piece", s.ctx.Err()) case s.failedPieceCh <- result.pieceResult.PieceInfo.PieceNum: - s.Warnf("%d download failed, retry later", result.piece.PieceNum) + s.Warnf("piece %d download failed, retry later", result.piece.PieceNum) } return nil } @@ -297,6 +302,10 @@ func (s *streamPeerTask) finish() error { // send last progress s.once.Do(func() { s.success = true + if err := s.callback.Update(s); err != nil { + s.span.RecordError(err) + s.Errorf("update callback error: %s", err) + } // let stream return immediately close(s.streamDone) // send EOF piece result to scheduler @@ -452,6 +461,12 @@ func (s *streamPeerTask) writeToPipe(firstPiece int32, pw *io.PipeWriter) { for { // all data wrote to local storage, and all data wrote to pipe write if s.readyPieces.Settled() == desired { + if err = s.callback.ValidateDigest(s); err != nil { + s.span.RecordError(err) + s.Errorf("validate digest error: %s", err) + _ = pw.CloseWithError(err) + return + } s.Debugf("all %d pieces wrote to pipe", desired) pw.Close() return diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go new file mode 100644 index 00000000000..d3e8006ea50 --- /dev/null +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go @@ -0,0 +1,268 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package peer + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "math" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/phayes/freeport" + testifyassert "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + + "d7y.io/dragonfly/v2/client/clientutil" + "d7y.io/dragonfly/v2/client/config" + "d7y.io/dragonfly/v2/client/daemon/storage" + "d7y.io/dragonfly/v2/client/daemon/test" + mock_daemon "d7y.io/dragonfly/v2/client/daemon/test/mock/daemon" + mock_scheduler "d7y.io/dragonfly/v2/client/daemon/test/mock/scheduler" + "d7y.io/dragonfly/v2/internal/dfcodes" + "d7y.io/dragonfly/v2/internal/dferrors" + "d7y.io/dragonfly/v2/pkg/basic/dfnet" + "d7y.io/dragonfly/v2/pkg/rpc" + "d7y.io/dragonfly/v2/pkg/rpc/base" + daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" + "d7y.io/dragonfly/v2/pkg/rpc/scheduler" + schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" + "d7y.io/dragonfly/v2/pkg/source" + sourceMock "d7y.io/dragonfly/v2/pkg/source/mock" + "d7y.io/dragonfly/v2/pkg/util/digestutils" + rangers "d7y.io/dragonfly/v2/pkg/util/rangeutils" +) + +func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, opt componentsOption) ( + schedulerclient.SchedulerClient, storage.Manager) { + port := int32(freeport.GetPort()) + // 1. set up a mock daemon server for uploading pieces info + var daemon = mock_daemon.NewMockDaemonServer(ctrl) + + var piecesMd5 []string + pieceCount := int32(math.Ceil(float64(opt.contentLength) / float64(opt.pieceSize))) + for i := int32(0); i < pieceCount; i++ { + if int64(i+1)*int64(opt.pieceSize) > opt.contentLength { + piecesMd5 = append(piecesMd5, digestutils.Md5Bytes(testBytes[int(i)*int(opt.pieceSize):])) + } else { + piecesMd5 = append(piecesMd5, digestutils.Md5Bytes(testBytes[int(i)*int(opt.pieceSize):int(i+1)*int(opt.pieceSize)])) + } + } + daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error) { + var tasks []*base.PieceInfo + // only return first piece + if request.StartNum == 0 { + tasks = append(tasks, + &base.PieceInfo{ + PieceNum: request.StartNum, + RangeStart: uint64(0), + RangeSize: opt.pieceSize, + PieceMd5: digestutils.Md5Bytes(testBytes[0:opt.pieceSize]), + PieceOffset: 0, + PieceStyle: 0, + }) + } + return &base.PiecePacket{ + PieceMd5Sign: digestutils.Sha256(piecesMd5...), + TaskId: request.TaskId, + DstPid: "peer-x", + PieceInfos: tasks, + ContentLength: opt.contentLength, + TotalPiece: pieceCount, + }, nil + }) + ln, _ := rpc.Listen(dfnet.NetAddr{ + Type: "tcp", + Addr: fmt.Sprintf("0.0.0.0:%d", port), + }) + go daemonserver.New(daemon).Serve(ln) + time.Sleep(100 * time.Millisecond) + + // 2. 
setup a scheduler + pps := mock_scheduler.NewMockPeerPacketStream(ctrl) + wg := sync.WaitGroup{} + wg.Add(1) + pps.EXPECT().Send(gomock.Any()).AnyTimes().DoAndReturn( + func(pr *scheduler.PieceResult) error { + if pr.PieceInfo.PieceNum == 0 && pr.Success { + wg.Done() + } + return nil + }) + var ( + delayCount int + schedPeerPacket bool + ) + pps.EXPECT().Recv().AnyTimes().DoAndReturn( + func() (*scheduler.PeerPacket, error) { + if len(opt.peerPacketDelay) > delayCount { + if delay := opt.peerPacketDelay[delayCount]; delay > 0 { + time.Sleep(delay) + } + delayCount++ + } + if schedPeerPacket { + // send back source after piece 0 is done + wg.Wait() + return nil, dferrors.New(dfcodes.SchedNeedBackSource, "") + } + schedPeerPacket = true + return &scheduler.PeerPacket{ + Code: dfcodes.Success, + TaskId: opt.taskID, + SrcPid: "127.0.0.1", + ParallelCount: opt.pieceParallelCount, + MainPeer: &scheduler.PeerPacket_DestPeer{ + Ip: "127.0.0.1", + RpcPort: port, + PeerId: "peer-x", + }, + StealPeers: nil, + }, nil + }) + sched := mock_scheduler.NewMockSchedulerClient(ctrl) + sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn( + func(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (*scheduler.RegisterResult, error) { + return &scheduler.RegisterResult{ + TaskId: opt.taskID, + SizeScope: base.SizeScope_NORMAL, + DirectPiece: nil, + }, nil + }) + sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn( + func(ctx context.Context, taskId string, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (schedulerclient.PeerPacketStream, error) { + return pps, nil + }) + sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn( + func(ctx context.Context, pr *scheduler.PeerResult, opts ...grpc.CallOption) error { + return nil + }) + tempDir, _ := ioutil.TempDir("", "d7y-test-*") + storageManager, _ := storage.NewStorageManager( + config.SimpleLocalTaskStoreStrategy, + &config.StorageOption{ + DataPath: tempDir, + TaskExpireTime: clientutil.Duration{ + Duration: -1 * time.Second, + }, + }, func(request storage.CommonTaskRequest) {}) + return sched, storageManager +} + +func TestStreamPeerTask_BackSource_Partial_WithContentLength(t *testing.T) { + assert := testifyassert.New(t) + ctrl := gomock.NewController(t) + + testBytes, err := ioutil.ReadFile(test.File) + assert.Nil(err, "load test file") + + var ( + pieceParallelCount = int32(4) + pieceSize = 1024 + + mockContentLength = len(testBytes) + //mockPieceCount = int(math.Ceil(float64(mockContentLength) / float64(pieceSize))) + + peerID = "peer-back-source-partial-0" + taskID = "task-back-source-partial-0" + + url = "http://localhost/test/data" + ) + schedulerClient, storageManager := setupBackSourcePartialComponents( + ctrl, testBytes, + componentsOption{ + taskID: taskID, + contentLength: int64(mockContentLength), + pieceSize: int32(pieceSize), + pieceParallelCount: pieceParallelCount, + }) + defer storageManager.CleanUp() + + downloader := NewMockPieceDownloader(ctrl) + downloader.EXPECT().DownloadPiece(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn( + func(ctx context.Context, task *DownloadPieceRequest) (io.Reader, io.Closer, error) { + rc := ioutil.NopCloser( + bytes.NewBuffer( + testBytes[task.piece.RangeStart : task.piece.RangeStart+uint64(task.piece.RangeSize)], + )) + return rc, rc, nil + }) + + sourceClient := sourceMock.NewMockResourceClient(ctrl) + source.Register("http", sourceClient) + defer 
source.UnRegister("http") + sourceClient.EXPECT().GetContentLength(gomock.Any(), url, source.RequestHeader{}, gomock.Any()).DoAndReturn( + func(ctx context.Context, url string, headers source.RequestHeader, rang *rangers.Range) (int64, error) { + return int64(len(testBytes)), nil + }) + sourceClient.EXPECT().Download(gomock.Any(), url, source.RequestHeader{}, gomock.Any()).DoAndReturn( + func(ctx context.Context, url string, headers source.RequestHeader, rang *rangers.Range) (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewBuffer(testBytes)), nil + }) + + pm := &pieceManager{ + calculateDigest: true, + storageManager: storageManager, + pieceDownloader: downloader, + computePieceSize: func(contentLength int64) int32 { + return int32(pieceSize) + }, + } + ptm := &peerTaskManager{ + calculateDigest: true, + host: &scheduler.PeerHost{ + Ip: "127.0.0.1", + }, + runningPeerTasks: sync.Map{}, + pieceManager: pm, + storageManager: storageManager, + schedulerClient: schedulerClient, + schedulerOption: config.SchedulerOption{ + ScheduleTimeout: clientutil.Duration{Duration: 10 * time.Minute}, + }, + } + req := &scheduler.PeerTaskRequest{ + Url: url, + UrlMeta: &base.UrlMeta{ + Tag: "d7y-test", + }, + PeerId: peerID, + PeerHost: &scheduler.PeerHost{}, + } + ctx := context.Background() + _, pt, _, err := newStreamPeerTask(ctx, ptm.host, pm, req, + ptm.schedulerClient, ptm.schedulerOption, 0) + assert.Nil(err, "new stream peer task") + pt.SetCallback(&streamPeerTaskCallback{ + ptm: ptm, + pt: pt, + req: req, + start: time.Now(), + }) + + rc, _, err := pt.Start(ctx) + assert.Nil(err, "start stream peer task") + + outputBytes, err := ioutil.ReadAll(rc) + assert.Nil(err, "load read data") + assert.Equal(testBytes, outputBytes, "output and desired output must match") +} diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index f96762c7b52..7d2e81cc2a7 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -52,6 +52,7 @@ func (p *streamPeerTaskCallback) Init(pt Task) error { }, ContentLength: pt.GetContentLength(), TotalPieces: pt.GetTotalPieces(), + PieceMd5Sign: pt.GetPieceMd5Sign(), }) if err != nil { pt.Log().Errorf("register task to storage manager failed: %s", err) @@ -69,6 +70,7 @@ func (p *streamPeerTaskCallback) Update(pt Task) error { }, ContentLength: pt.GetContentLength(), TotalPieces: pt.GetTotalPieces(), + PieceMd5Sign: pt.GetPieceMd5Sign(), }) if err != nil { pt.Log().Errorf("update task to storage manager failed: %s", err) @@ -148,3 +150,20 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er } return nil } + +func (p *streamPeerTaskCallback) ValidateDigest(pt Task) error { + if !p.ptm.calculateDigest { + return nil + } + err := p.ptm.storageManager.ValidateDigest( + &storage.PeerTaskMetaData{ + PeerID: pt.GetPeerID(), + TaskID: pt.GetTaskID(), + }) + if err != nil { + pt.Log().Errorf("%s", err) + } else { + pt.Log().Debugf("validated digest") + } + return err +} diff --git a/client/daemon/peer/peertask_stream_test.go b/client/daemon/peer/peertask_stream_test.go index ad673d34efa..1aa554516de 100644 --- a/client/daemon/peer/peertask_stream_test.go +++ b/client/daemon/peer/peertask_stream_test.go @@ -53,8 +53,8 @@ func TestStreamPeerTask_BackSource_WithContentLength(t *testing.T) { mockContentLength = len(testBytes) //mockPieceCount = int(math.Ceil(float64(mockContentLength) / float64(pieceSize))) - peerID = "peer-0" - taskID = 
"task-0" + peerID = "peer-back-source-out-content-length" + taskID = "task-back-source-out-content-length" url = "http://localhost/test/data" ) @@ -159,8 +159,8 @@ func TestStreamPeerTask_BackSource_WithoutContentLength(t *testing.T) { mockContentLength = len(testBytes) //mockPieceCount = int(math.Ceil(float64(mockContentLength) / float64(pieceSize))) - peerID = "peer-0" - taskID = "task-0" + peerID = "peer-back-source-without-content-length" + taskID = "task-back-source-without-content-length" url = "http://localhost/test/data" ) diff --git a/client/daemon/peer/piece_downloader.go b/client/daemon/peer/piece_downloader.go index 00ec2c64c7d..50d8aabc82c 100644 --- a/client/daemon/peer/piece_downloader.go +++ b/client/daemon/peer/piece_downloader.go @@ -38,6 +38,7 @@ type DownloadPieceRequest struct { DstAddr string CalcDigest bool piece *base.PieceInfo + log *logger.SugaredLoggerOnWith } type PieceDownloader interface { @@ -107,7 +108,8 @@ func (p *pieceDownloader) DownloadPiece(ctx context.Context, d *DownloadPieceReq r := resp.Body.(io.Reader) c := resp.Body.(io.Closer) if d.CalcDigest { - r = digestutils.NewDigestReader(io.LimitReader(resp.Body, int64(d.piece.RangeSize)), d.piece.PieceMd5) + d.log.Debugf("calculate digest for piece %d, md5: %s", d.piece.PieceNum, d.piece.PieceMd5) + r = digestutils.NewDigestReader(d.log, io.LimitReader(resp.Body, int64(d.piece.RangeSize)), d.piece.PieceMd5) } return r, c, nil } diff --git a/client/daemon/peer/piece_downloader_test.go b/client/daemon/peer/piece_downloader_test.go index 2942ef95113..b12fb452176 100644 --- a/client/daemon/peer/piece_downloader_test.go +++ b/client/daemon/peer/piece_downloader_test.go @@ -35,6 +35,7 @@ import ( "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/daemon/test" "d7y.io/dragonfly/v2/client/daemon/upload" + logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/base" _ "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" ) @@ -134,6 +135,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) { PieceOffset: tt.rangeStart, PieceStyle: base.PieceStyle_PLAIN, }, + log: logger.With("test", "test"), }) assert.Nil(err, "downloaded piece should success") diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index 233cf81b95c..c177ed308ec 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -96,7 +96,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, pt Task, request *Dow if success { pm.pushSuccessResult(pt, request.DstPid, request.piece, start, end) } else { - pm.pushFailResult(pt, request.DstPid, request.piece, start, end, err) + pm.pushFailResult(pt, request.DstPid, request.piece, start, end, err, false) } rspan.End() }() @@ -183,7 +183,7 @@ func (pm *pieceManager) pushSuccessResult(peerTask Task, dstPid string, piece *b } } -func (pm *pieceManager) pushFailResult(peerTask Task, dstPid string, piece *base.PieceInfo, start int64, end int64, err error) { +func (pm *pieceManager) pushFailResult(peerTask Task, dstPid string, piece *base.PieceInfo, start int64, end int64, err error, notRetry bool) { err = peerTask.ReportPieceResult( &pieceTaskResult{ piece: piece, @@ -199,7 +199,8 @@ func (pm *pieceManager) pushFailResult(peerTask Task, dstPid string, piece *base HostLoad: nil, FinishedCount: 0, // update by peer task }, - err: err, + err: err, + notRetry: notRetry, }) if err != nil { peerTask.Log().Errorf("report piece task error: %v", err) @@ -245,7 +246,7 @@ func (pm *pieceManager) 
processPieceFromSource(pt Task, PieceMd5: "", PieceOffset: pieceOffset, PieceStyle: 0, - }, start, end, err) + }, start, end, err, true) } }() @@ -256,7 +257,8 @@ func (pm *pieceManager) processPieceFromSource(pt Task, } } if pm.calculateDigest { - reader = digestutils.NewDigestReader(reader) + pt.Log().Debugf("calculate digest") + reader = digestutils.NewDigestReader(pt.Log(), reader) } var n int64 n, err = pm.storageManager.WritePiece( @@ -338,7 +340,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc // calc total md5 if pm.calculateDigest && request.UrlMeta.Digest != "" { - reader = digestutils.NewDigestReader(body, request.UrlMeta.Digest) + reader = digestutils.NewDigestReader(pt.Log(), body, request.UrlMeta.Digest) } // 2. save to storage @@ -364,14 +366,14 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc PeerID: pt.GetPeerID(), TaskID: pt.GetTaskID(), }, - ContentLength: contentLength, + ContentLength: contentLength, + GenPieceDigest: true, }) pt.SetTotalPieces(pieceNum + 1) return pt.SetContentLength(contentLength) } } //unreachable code - //return nil } maxPieceNum := int32(math.Ceil(float64(contentLength) / float64(pieceSize))) @@ -386,7 +388,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc log.Debugf("download piece %d", pieceNum) n, er := pm.processPieceFromSource(pt, reader, contentLength, pieceNum, offset, size) if er != nil { - log.Errorf("download piece %d error: %s", pieceNum, err) + log.Errorf("download piece %d error: %s", pieceNum, er) return er } if n != int64(size) { @@ -396,6 +398,16 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc } pt.SetTotalPieces(maxPieceNum) pt.SetContentLength(contentLength) + pm.storageManager.UpdateTask(ctx, + &storage.UpdateTaskRequest{ + PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerID: pt.GetPeerID(), + TaskID: pt.GetTaskID(), + }, + ContentLength: contentLength, + TotalPieces: maxPieceNum, + GenPieceDigest: true, + }) log.Infof("download from source ok") return nil } diff --git a/client/daemon/peer/piece_manager_test.go b/client/daemon/peer/piece_manager_test.go index fa261d8feb3..03a45e04d6a 100644 --- a/client/daemon/peer/piece_manager_test.go +++ b/client/daemon/peer/piece_manager_test.go @@ -59,10 +59,11 @@ func TestPieceManager_DownloadSource(t *testing.T) { ) pieceDownloadTimeout := 30 * time.Second + tempDir, _ := ioutil.TempDir("", "d7y-piece-manager-test-*") storageManager, _ := storage.NewStorageManager( config.SimpleLocalTaskStoreStrategy, &config.StorageOption{ - DataPath: test.DataDir, + DataPath: tempDir, TaskExpireTime: clientutil.Duration{ Duration: -1 * time.Second, }, diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go index acd85c491a9..973c9d6133e 100644 --- a/client/daemon/storage/local_storage.go +++ b/client/daemon/storage/local_storage.go @@ -50,6 +50,9 @@ type localTaskStore struct { lastAccess atomic.Int64 reclaimMarked atomic.Bool gcCallback func(CommonTaskRequest) + + // when digest not match, invalid will be set + invalid atomic.Bool } var _ TaskStorageDriver = (*localTaskStore)(nil) @@ -67,6 +70,14 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) t.RLock() if piece, ok := t.Pieces[req.Num]; ok { t.RUnlock() + // discard data for back source + n, err := io.Copy(ioutil.Discard, io.LimitReader(req.Reader, req.Range.Length)) + if err != nil && err != io.EOF { + return n, err + } + if n != 
piece.Range.Length { + return n, ErrShortRead + } return piece.Range.Length, nil } t.RUnlock() @@ -114,10 +125,14 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest) return n, ErrShortRead } } - // when Md5 is empty, try to get md5 from reader + // when Md5 is empty, try to get md5 from reader, it's useful for back source if req.PieceMetaData.Md5 == "" { + t.Warnf("piece md5 not found in metadata, read from reader") if get, ok := req.Reader.(digestutils.DigestReader); ok { req.PieceMetaData.Md5 = get.Digest() + t.Infof("read md5 from reader, value: %s", req.PieceMetaData.Md5) + } else { + t.Warnf("reader is not a DigestReader") } } t.Debugf("wrote %d bytes to file %s, piece %d, start %d, length: %d", @@ -137,14 +152,61 @@ func (t *localTaskStore) UpdateTask(ctx context.Context, req *UpdateTaskRequest) t.Lock() defer t.Unlock() t.persistentMetadata.ContentLength = req.ContentLength - if t.TotalPieces == 0 { + if req.TotalPieces > 0 { t.TotalPieces = req.TotalPieces } + if len(t.PieceMd5Sign) == 0 { + t.PieceMd5Sign = req.PieceMd5Sign + } + if req.GenPieceDigest { + var pieceDigests []string + for i := int32(0); i < t.TotalPieces; i++ { + pieceDigests = append(pieceDigests, t.Pieces[i].Md5) + } + + digest := digestutils.Sha256(pieceDigests...) + t.PieceMd5Sign = digest + t.Infof("generated digest: %s", digest) + } return nil } +func (t *localTaskStore) ValidateDigest(*PeerTaskMetaData) error { + t.Lock() + defer t.Unlock() + if t.persistentMetadata.PieceMd5Sign == "" { + return ErrDigestNotSet + } + if t.TotalPieces <= 0 { + t.Errorf("total piece count not set when validate digest") + t.invalid.Store(true) + return ErrPieceCountNotSet + } + + var pieceDigests []string + for i := int32(0); i < t.TotalPieces; i++ { + pieceDigests = append(pieceDigests, t.Pieces[i].Md5) + } + + digest := digestutils.Sha256(pieceDigests...) + if digest != t.PieceMd5Sign { + t.Errorf("invalid digest, desired: %s, actual: %s", t.PieceMd5Sign, digest) + return ErrInvalidDigest + } + return nil +} + +func (t *localTaskStore) IsInvalid(*PeerTaskMetaData) (bool, error) { + return t.invalid.Load(), nil +} + // ReadPiece get a LimitReadCloser from task data with seeked, caller should read bytes and close it. 
func (t *localTaskStore) ReadPiece(ctx context.Context, req *ReadPieceRequest) (io.Reader, io.Closer, error) { + if t.invalid.Load() { + t.Errorf("invalid digest, refuse to get pieces") + return nil, nil, ErrInvalidDigest + } + t.touch() file, err := os.Open(t.DataFilePath) if err != nil { @@ -175,6 +237,11 @@ func (t *localTaskStore) ReadPiece(ctx context.Context, req *ReadPieceRequest) ( } func (t *localTaskStore) ReadAllPieces(ctx context.Context, req *PeerTaskMetaData) (io.ReadCloser, error) { + if t.invalid.Load() { + t.Errorf("invalid digest, refuse to read all pieces") + return nil, ErrInvalidDigest + } + t.touch() file, err := os.Open(t.DataFilePath) if err != nil { @@ -194,7 +261,9 @@ func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error { t.Done = true t.touch() if req.TotalPieces > 0 { + t.Lock() t.TotalPieces = req.TotalPieces + t.Unlock() } if !req.StoreOnly { err := t.saveMetadata() @@ -246,12 +315,17 @@ func (t *localTaskStore) Store(ctx context.Context, req *StoreRequest) error { } func (t *localTaskStore) GetPieces(ctx context.Context, req *base.PieceTaskRequest) (*base.PiecePacket, error) { + if t.invalid.Load() { + t.Errorf("invalid digest, refuse to get pieces") + return nil, ErrInvalidDigest + } + var pieces []*base.PieceInfo t.RLock() defer t.RUnlock() t.touch() if t.TotalPieces > 0 && req.StartNum >= t.TotalPieces { - logger.Errorf("invalid start num: %d", req.StartNum) + t.Errorf("invalid start num: %d", req.StartNum) return nil, dferrors.ErrInvalidArgument } for i := int32(0); i < req.Limit; i++ { @@ -292,88 +366,87 @@ func (t *localTaskStore) MarkReclaim() { TaskID: t.TaskID, }) t.reclaimMarked.Store(true) - logger.Infof("task %s/%s will be reclaimed, marked", t.TaskID, t.PeerID) + t.Infof("task %s/%s will be reclaimed, marked", t.TaskID, t.PeerID) } func (t *localTaskStore) Reclaim() error { - log := logger.With("gc", t.StoreStrategy, "task", t.TaskID) - log.Infof("start gc task data") - err := t.reclaimData(log) + t.Infof("start gc task data") + err := t.reclaimData() if err != nil && !os.IsNotExist(err) { return err } // close and remove metadata - err = t.reclaimMeta(log) + err = t.reclaimMeta() if err != nil && !os.IsNotExist(err) { return err } // remove task work metaDir if err = os.Remove(t.dataDir); err != nil && !os.IsNotExist(err) { - log.Warnf("remove task data directory %q error: %s", t.dataDir, err) + t.Warnf("remove task data directory %q error: %s", t.dataDir, err) return err } - log.Infof("purged task work directory: %s", t.dataDir) + t.Infof("purged task work directory: %s", t.dataDir) taskDir := path.Dir(t.dataDir) if dirs, err := ioutil.ReadDir(taskDir); err != nil { - log.Warnf("stat task directory %q error: %s", taskDir, err) + t.Warnf("stat task directory %q error: %s", taskDir, err) } else { if len(dirs) == 0 { if err := os.Remove(taskDir); err != nil { - log.Warnf("remove unused task directory %q error: %s", taskDir, err) + t.Warnf("remove unused task directory %q error: %s", taskDir, err) } } else { - log.Warnf("task directory %q is not empty", taskDir) + t.Warnf("task directory %q is not empty", taskDir) } } return nil } -func (t *localTaskStore) reclaimData(sLogger *logger.SugaredLoggerOnWith) error { +func (t *localTaskStore) reclaimData() error { // remove data data := path.Join(t.dataDir, taskData) stat, err := os.Lstat(data) if err != nil { - sLogger.Errorf("stat task data %q error: %s", data, err) + t.Errorf("stat task data %q error: %s", data, err) return err } - // remove sym link cache file + // remove 
symbol link cache file if stat.Mode()&os.ModeSymlink == os.ModeSymlink { dest, err0 := os.Readlink(data) if err0 == nil { if err = os.Remove(dest); err != nil && !os.IsNotExist(err) { - sLogger.Warnf("remove symlink target file %s error: %s", dest, err) + t.Warnf("remove symlink target file %s error: %s", dest, err) } else { - sLogger.Infof("remove data file %s", dest) + t.Infof("remove data file %s", dest) } } } else { // remove cache file if err = os.Remove(t.DataFilePath); err != nil && !os.IsNotExist(err) { - sLogger.Errorf("remove data file %s error: %s", data, err) + t.Errorf("remove data file %s error: %s", data, err) return err } } if err = os.Remove(data); err != nil && !os.IsNotExist(err) { - sLogger.Errorf("remove data file %s error: %s", data, err) + t.Errorf("remove data file %s error: %s", data, err) return err } - sLogger.Infof("purged task data: %s", data) + t.Infof("purged task data: %s", data) return nil } -func (t *localTaskStore) reclaimMeta(sLogger *logger.SugaredLoggerOnWith) error { +func (t *localTaskStore) reclaimMeta() error { if err := t.metadataFile.Close(); err != nil { - sLogger.Warnf("close task meta data %q error: %s", t.metadataFilePath, err) + t.Warnf("close task meta data %q error: %s", t.metadataFilePath, err) return err } - sLogger.Infof("start gc task metadata") + t.Infof("start gc task metadata") if err := os.Remove(t.metadataFilePath); err != nil && !os.IsNotExist(err) { - sLogger.Warnf("remove task meta data %q error: %s", t.metadataFilePath, err) + t.Warnf("remove task meta data %q error: %s", t.metadataFilePath, err) return err } - sLogger.Infof("purged task mata data: %s", t.metadataFilePath) + t.Infof("purged task mata data: %s", t.metadataFilePath) return nil } @@ -390,7 +463,7 @@ func (t *localTaskStore) saveMetadata() error { } _, err = t.metadataFile.Write(data) if err != nil { - logger.Errorf("save metadata error: %s", err) + t.Errorf("save metadata error: %s", err) } return err } diff --git a/client/daemon/storage/metadata.go b/client/daemon/storage/metadata.go index b584b2fc52f..9e594c2d4ae 100644 --- a/client/daemon/storage/metadata.go +++ b/client/daemon/storage/metadata.go @@ -59,6 +59,7 @@ type RegisterTaskRequest struct { CommonTaskRequest ContentLength int64 TotalPieces int32 + PieceMd5Sign string } type WritePieceRequest struct { @@ -84,6 +85,9 @@ type UpdateTaskRequest struct { PeerTaskMetaData ContentLength int64 TotalPieces int32 + PieceMd5Sign string + // GenPieceDigest is used when back source + GenPieceDigest bool } type ReusePeerTask = UpdateTaskRequest diff --git a/client/daemon/storage/storage_manager.go b/client/daemon/storage/storage_manager.go index 60fe194e0a1..28515ece1cf 100644 --- a/client/daemon/storage/storage_manager.go +++ b/client/daemon/storage/storage_manager.go @@ -60,6 +60,10 @@ type TaskStorageDriver interface { // Store stores task data to the target path Store(ctx context.Context, req *StoreRequest) error + + ValidateDigest(req *PeerTaskMetaData) error + + IsInvalid(req *PeerTaskMetaData) (bool, error) } // Reclaimer stands storage reclaimer @@ -87,8 +91,11 @@ type Manager interface { } var ( - ErrTaskNotFound = errors.New("task not found") - ErrPieceNotFound = errors.New("piece not found") + ErrTaskNotFound = errors.New("task not found") + ErrPieceNotFound = errors.New("piece not found") + ErrPieceCountNotSet = errors.New("total piece count not set") + ErrDigestNotSet = errors.New("piece digest not set") + ErrInvalidDigest = errors.New("invalid digest") ) const ( @@ -296,6 +303,7 @@ func (s 
*storageManager) CreateTask(req RegisterTaskRequest) error { TaskMeta: map[string]string{}, ContentLength: req.ContentLength, TotalPieces: req.TotalPieces, + PieceMd5Sign: req.PieceMd5Sign, PeerID: req.PeerID, Pieces: map[int32]PieceMetaData{}, }, @@ -389,6 +397,9 @@ func (s *storageManager) FindCompletedTask(taskID string) *ReusePeerTask { return nil } for _, t := range ts { + if t.invalid.Load() { + continue + } // touch it before marking reclaim t.touch() // already marked, skip @@ -430,6 +441,30 @@ func (s *storageManager) cleanIndex(taskID, peerID string) { s.indexTask2PeerTask[taskID] = remain } +func (s *storageManager) ValidateDigest(req *PeerTaskMetaData) error { + t, ok := s.LoadTask( + PeerTaskMetaData{ + TaskID: req.TaskID, + PeerID: req.PeerID, + }) + if !ok { + return ErrTaskNotFound + } + return t.(TaskStorageDriver).ValidateDigest(req) +} + +func (s *storageManager) IsInvalid(req *PeerTaskMetaData) (bool, error) { + t, ok := s.LoadTask( + PeerTaskMetaData{ + TaskID: req.TaskID, + PeerID: req.PeerID, + }) + if !ok { + return false, ErrTaskNotFound + } + return t.(TaskStorageDriver).IsInvalid(req) +} + func (s *storageManager) ReloadPersistentTask(gcCallback GCCallback) error { dirs, err := ioutil.ReadDir(s.storeOption.DataPath) if os.IsNotExist(err) { diff --git a/client/daemon/test/mock/storage/manager.go b/client/daemon/test/mock/storage/manager.go index 68f26060cbe..7c03c8c494b 100644 --- a/client/daemon/test/mock/storage/manager.go +++ b/client/daemon/test/mock/storage/manager.go @@ -53,6 +53,21 @@ func (mr *MockTaskStorageDriverMockRecorder) GetPieces(ctx, req interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieces", reflect.TypeOf((*MockTaskStorageDriver)(nil).GetPieces), ctx, req) } +// IsInvalid mocks base method. +func (m *MockTaskStorageDriver) IsInvalid(req *storage.PeerTaskMetaData) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsInvalid", req) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsInvalid indicates an expected call of IsInvalid. +func (mr *MockTaskStorageDriverMockRecorder) IsInvalid(req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsInvalid", reflect.TypeOf((*MockTaskStorageDriver)(nil).IsInvalid), req) +} + // ReadAllPieces mocks base method. func (m *MockTaskStorageDriver) ReadAllPieces(ctx context.Context, req *storage.PeerTaskMetaData) (io.ReadCloser, error) { m.ctrl.T.Helper() @@ -112,6 +127,20 @@ func (mr *MockTaskStorageDriverMockRecorder) UpdateTask(ctx, req interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTask", reflect.TypeOf((*MockTaskStorageDriver)(nil).UpdateTask), ctx, req) } +// ValidateDigest mocks base method. +func (m *MockTaskStorageDriver) ValidateDigest(req *storage.PeerTaskMetaData) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ValidateDigest", req) + ret0, _ := ret[0].(error) + return ret0 +} + +// ValidateDigest indicates an expected call of ValidateDigest. +func (mr *MockTaskStorageDriverMockRecorder) ValidateDigest(req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateDigest", reflect.TypeOf((*MockTaskStorageDriver)(nil).ValidateDigest), req) +} + // WritePiece mocks base method. 
func (m *MockTaskStorageDriver) WritePiece(ctx context.Context, req *storage.WritePieceRequest) (int64, error) { m.ctrl.T.Helper() @@ -268,6 +297,21 @@ func (mr *MockManagerMockRecorder) GetPieces(ctx, req interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieces", reflect.TypeOf((*MockManager)(nil).GetPieces), ctx, req) } +// IsInvalid mocks base method. +func (m *MockManager) IsInvalid(req *storage.PeerTaskMetaData) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsInvalid", req) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsInvalid indicates an expected call of IsInvalid. +func (mr *MockManagerMockRecorder) IsInvalid(req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsInvalid", reflect.TypeOf((*MockManager)(nil).IsInvalid), req) +} + // Keep mocks base method. func (m *MockManager) Keep() { m.ctrl.T.Helper() @@ -353,6 +397,20 @@ func (mr *MockManagerMockRecorder) UpdateTask(ctx, req interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTask", reflect.TypeOf((*MockManager)(nil).UpdateTask), ctx, req) } +// ValidateDigest mocks base method. +func (m *MockManager) ValidateDigest(req *storage.PeerTaskMetaData) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ValidateDigest", req) + ret0, _ := ret[0].(error) + return ret0 +} + +// ValidateDigest indicates an expected call of ValidateDigest. +func (mr *MockManagerMockRecorder) ValidateDigest(req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateDigest", reflect.TypeOf((*MockManager)(nil).ValidateDigest), req) +} + // WritePiece mocks base method. func (m *MockManager) WritePiece(ctx context.Context, req *storage.WritePieceRequest) (int64, error) { m.ctrl.T.Helper() diff --git a/pkg/source/hdfsprotocol/hdfs_source_client_test.go b/pkg/source/hdfsprotocol/hdfs_source_client_test.go index 4cc1caadfc9..bb5db877fb6 100644 --- a/pkg/source/hdfsprotocol/hdfs_source_client_test.go +++ b/pkg/source/hdfsprotocol/hdfs_source_client_test.go @@ -1,3 +1,5 @@ +//+build linux + /* * Copyright 2020 The Dragonfly Authors * diff --git a/pkg/util/digestutils/digest_reader.go b/pkg/util/digestutils/digest_reader.go index 35a7f8843a3..00645e90f34 100644 --- a/pkg/util/digestutils/digest_reader.go +++ b/pkg/util/digestutils/digest_reader.go @@ -36,6 +36,7 @@ type digestReader struct { r io.Reader hash hash.Hash digest string + *logger.SugaredLoggerOnWith } type DigestReader interface { @@ -45,13 +46,14 @@ type DigestReader interface { // TODO add AF_ALG digest https://github.com/golang/sys/commit/e24f485414aeafb646f6fca458b0bf869c0880a1 -func NewDigestReader(reader io.Reader, digest ...string) io.Reader { +func NewDigestReader(log *logger.SugaredLoggerOnWith, reader io.Reader, digest ...string) io.Reader { var d string if len(digest) > 0 { d = digest[0] } return &digestReader{ - digest: d, + SugaredLoggerOnWith: log, + digest: d, // TODO support more digest method like sha1, sha256 hash: md5.New(), r: reader, @@ -69,15 +71,15 @@ func (dr *digestReader) Read(p []byte) (int, error) { if err == io.EOF && dr.digest != "" { digest := dr.Digest() if digest != dr.digest { - logger.Warnf("digest not match, desired: %s, actual: %s", dr.digest, digest) + dr.Warnf("digest not match, desired: %s, actual: %s", dr.digest, digest) return n, ErrDigestNotMatch } - logger.Debugf("digests match: %s", digest) + dr.Debugf("digest 
match: %s", digest) } return n, err } -// GetDigest returns the digest of contents read. +// Digest returns the digest of contents. func (dr *digestReader) Digest() string { return hex.EncodeToString(dr.hash.Sum(nil)[:16]) } diff --git a/pkg/util/digestutils/digest_reader_test.go b/pkg/util/digestutils/digest_reader_test.go index 5c1473637b3..ddc6bdee317 100644 --- a/pkg/util/digestutils/digest_reader_test.go +++ b/pkg/util/digestutils/digest_reader_test.go @@ -25,6 +25,8 @@ import ( "testing" testifyassert "github.com/stretchr/testify/assert" + + logger "d7y.io/dragonfly/v2/internal/dflog" ) func TestMain(m *testing.M) { @@ -40,7 +42,7 @@ func TestNewDigestReader(t *testing.T) { digest := hex.EncodeToString(hash.Sum(nil)[:16]) buf := bytes.NewBuffer(testBytes) - reader := NewDigestReader(buf, digest) + reader := NewDigestReader(logger.With("test", "test"), buf, digest) data, err := ioutil.ReadAll(reader) assert.Nil(err)