feat: calculate piece metadata digest (#787)
* feat: calculate piece metadata digest
* fix: back source not working when triggered midway through a download
* chore: add partial back source test
* chore: optimize digest log

Signed-off-by: Jim Ma <[email protected]>
jim3ma authored Nov 10, 2021
1 parent 1d452b7 commit cb4be14
Showing 22 changed files with 646 additions and 64 deletions.
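
At its core, this change threads a single digest for a task's piece metadata (the "piece md5 sign") from the piece source through the peer task callbacks into storage, then validates it once all pieces are written. The sketch below shows one plausible way such a sign could be derived from the ordered per-piece MD5 values; the helper name, the separator, and the choice of SHA-256 are illustrative assumptions, not necessarily the exact scheme used by this commit.

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// pieceMd5Sign derives a single digest for a task's piece metadata by
// hashing the per-piece MD5 values in piece order. The ":" separator and
// the choice of SHA-256 are assumptions for illustration only.
func pieceMd5Sign(pieceMd5s []string) string {
	sum := sha256.Sum256([]byte(strings.Join(pieceMd5s, ":")))
	return hex.EncodeToString(sum[:])
}

func main() {
	sign := pieceMd5Sign([]string{
		"5d41402abc4b2a76b9719d911017c592", // md5 of piece 0
		"7d793037a0760186574b0282f2f435e7", // md5 of piece 1
	})
	fmt.Println("piece md5 sign:", sign)
}
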
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
@@ -134,7 +134,7 @@ func New(opt *config.DaemonOption) (Daemon, error) {
return nil, err
}
peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler,
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex)
opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.CalculateDigest)
if err != nil {
return nil, err
}
37 changes: 30 additions & 7 deletions client/daemon/peer/peertask_base.go
@@ -87,6 +87,7 @@ type peerTask struct {
peerID string
taskID string
totalPiece int32
md5 string
contentLength *atomic.Int64
completedLength *atomic.Int64
usedTraffic *atomic.Int64
@@ -134,6 +135,7 @@ type peerTask struct {
type pieceTaskResult struct {
piece *base.PieceInfo
pieceResult *scheduler.PieceResult
notRetry bool
err error
}

@@ -177,6 +179,14 @@ func (pt *peerTask) SetTotalPieces(i int32) {
pt.setTotalPiecesFunc(i)
}

func (pt *peerTask) SetPieceMd5Sign(md5 string) {
pt.md5 = md5
}

func (pt *peerTask) GetPieceMd5Sign() string {
return pt.md5
}

func (pt *peerTask) Context() context.Context {
return pt.ctx
}
@@ -337,6 +347,8 @@ func (pt *peerTask) pullSinglePiece(cleanUnfinishedFunc func()) {
span.SetAttributes(config.AttributePiece.Int(int(pt.singlePiece.PieceInfo.PieceNum)))

pt.contentLength.Store(int64(pt.singlePiece.PieceInfo.RangeSize))
pt.SetTotalPieces(1)
pt.SetPieceMd5Sign(pt.singlePiece.PieceInfo.PieceMd5)
if err := pt.callback.Init(pt); err != nil {
pt.failedReason = err.Error()
pt.failedCode = dfcodes.ClientError
@@ -352,6 +364,7 @@ func (pt *peerTask) pullSinglePiece(cleanUnfinishedFunc func()) {
DstPid: pt.singlePiece.DstPid,
DstAddr: pt.singlePiece.DstAddr,
piece: pt.singlePiece.PieceInfo,
log: pt.Log(),
}
if pt.pieceManager.DownloadPiece(ctx, pt, request) {
pt.Infof("single piece download success")
@@ -448,6 +461,13 @@ loop:
pt.Debugf("update total piece count: %d", pt.totalPiece)
}

// update md5 digest
if len(piecePacket.PieceMd5Sign) > 0 && len(pt.md5) == 0 {
pt.md5 = piecePacket.PieceMd5Sign
_ = pt.callback.Update(pt)
pt.Debugf("update digest: %s", pt.md5)
}

// 3. dispatch piece request to all workers
pt.dispatchPieceRequest(pieceRequestCh, piecePacket)

@@ -554,6 +574,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {
pt.Infof("start download from source due to dfcodes.SchedNeedBackSource")
pt.span.AddEvent("back source due to scheduler says need back source ")
pt.needBackSource = true
// TODO optimize back source when already downloaded some pieces
pt.backSource()
case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
if pt.schedulerOption.DisableAutoBackSource {
@@ -574,7 +595,8 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {

func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *base.PiecePacket) {
for _, piece := range piecePacket.PieceInfos {
pt.Infof("get piece %d from %s/%s", piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid)
pt.Infof("get piece %d from %s/%s, md5: %s, start: %d, size: %d",
piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
if !pt.requestedPieces.IsSet(piece.PieceNum) {
pt.requestedPieces.Set(piece.PieceNum)
}
@@ -583,6 +605,7 @@ func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceReque
DstPid: piecePacket.DstPid,
DstAddr: piecePacket.DstAddr,
piece: piece,
log: pt.Log(),
}
select {
case pieceRequestCh <- req:
@@ -721,10 +744,10 @@ func (pt *peerTask) preparePieceTasksByPeer(curPeerPacket *scheduler.PeerPacket,

// when cdn returns dfcodes.CdnTaskNotFound, report it to scheduler and wait cdn download it.
retry:
pt.Debugf("get piece task from peer %s, piece num: %d, limit: %d\"", peer.PeerId, request.StartNum, request.Limit)
pt.Debugf("try get piece task from peer %s, piece num: %d, limit: %d\"", peer.PeerId, request.StartNum, request.Limit)
p, err := pt.getPieceTasks(span, curPeerPacket, peer, request)
if err == nil {
pt.Infof("get piece task from peer %s ok, pieces length: %d", peer.PeerId, len(p.PieceInfos))
pt.Infof("got piece task from peer %s ok, pieces length: %d", peer.PeerId, len(p.PieceInfos))
span.SetAttributes(config.AttributeGetPieceCount.Int(len(p.PieceInfos)))
return p, nil
}
@@ -740,7 +763,7 @@ retry:
// context canceled, just exit
if se.GRPCStatus().Code() == codes.Canceled {
span.AddEvent("context canceled")
pt.Warnf("get piece task from peer(%s) canceled: %s", peer.PeerId, err)
pt.Warnf("get piece task from peer %s canceled: %s", peer.PeerId, err)
return nil, err
}
}
@@ -750,7 +773,7 @@ retry:
pt.Debugf("get piece task from peer %s with df error, code: %d", peer.PeerId, de.Code)
code = de.Code
}
pt.Errorf("get piece task from peer(%s) error: %s, code: %d", peer.PeerId, err, code)
pt.Errorf("get piece task from peer %s error: %s, code: %d", peer.PeerId, err, code)
perr := pt.peerPacketStream.Send(&scheduler.PieceResult{
TaskId: pt.taskID,
SrcPid: pt.peerID,
@@ -763,7 +786,7 @@ retry:
})
if perr != nil {
span.RecordError(perr)
pt.Errorf("send piece result error: %s, code: %d", err, code)
pt.Errorf("send piece result error: %s, code to send: %d", err, code)
}

if code == dfcodes.CdnTaskNotFound && curPeerPacket == pt.peerPacket.Load().(*scheduler.PeerPacket) {
@@ -808,7 +831,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
})
if er != nil {
span.RecordError(er)
pt.Errorf("send piece result error: %s, code: %d", peer.PeerId, er)
pt.Errorf("send piece result with dfcodes.ClientWaitPieceReady error: %s", er)
}
// fast way to exit retry
lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket)
12 changes: 11 additions & 1 deletion client/daemon/peer/peertask_file.go
@@ -238,13 +238,17 @@ func (pt *filePeerTask) ReportPieceResult(result *pieceTaskResult) error {
if !result.pieceResult.Success {
result.pieceResult.FinishedCount = pt.readyPieces.Settled()
_ = pt.peerPacketStream.Send(result.pieceResult)
if result.notRetry {
pt.Warnf("piece %d download failed, no retry", result.piece.PieceNum)
return nil
}
select {
case <-pt.done:
pt.Infof("peer task done, stop to send failed piece")
case <-pt.ctx.Done():
pt.Debugf("context done due to %s, stop to send failed piece", pt.ctx.Err())
case pt.failedPieceCh <- result.pieceResult.PieceInfo.PieceNum:
pt.Warnf("%d download failed, retry later", result.piece.PieceNum)
pt.Warnf("piece %d download failed, retry later", result.piece.PieceNum)
}

return nil
@@ -295,6 +299,12 @@ func (pt *filePeerTask) ReportPieceResult(result *pieceTaskResult) error {

func (pt *filePeerTask) finish() error {
var err error
if err = pt.callback.ValidateDigest(pt); err != nil {
pt.Errorf("validate digest error: %s", err)
pt.span.RecordError(err)
pt.cleanUnfinished()
return err
}
// send last progress
pt.once.Do(func() {
defer pt.recoverFromPanic()
19 changes: 19 additions & 0 deletions client/daemon/peer/peertask_file_callback.go
@@ -53,6 +53,7 @@ func (p *filePeerTaskCallback) Init(pt Task) error {
},
ContentLength: pt.GetContentLength(),
TotalPieces: pt.GetTotalPieces(),
PieceMd5Sign: pt.GetPieceMd5Sign(),
})
if err != nil {
pt.Log().Errorf("register task to storage manager failed: %s", err)
@@ -70,6 +71,7 @@ func (p *filePeerTaskCallback) Update(pt Task) error {
},
ContentLength: pt.GetContentLength(),
TotalPieces: pt.GetTotalPieces(),
PieceMd5Sign: pt.GetPieceMd5Sign(),
})
if err != nil {
pt.Log().Errorf("update task to storage manager failed: %s", err)
@@ -150,3 +152,20 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
}
return nil
}

func (p *filePeerTaskCallback) ValidateDigest(pt Task) error {
if !p.ptm.calculateDigest {
return nil
}
err := p.ptm.storageManager.ValidateDigest(
&storage.PeerTaskMetaData{
PeerID: pt.GetPeerID(),
TaskID: pt.GetTaskID(),
})
if err != nil {
pt.Log().Errorf("%s", err)
} else {
pt.Log().Debugf("validated digest")
}
return err
}
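
The new ValidateDigest callback above only forwards to storageManager.ValidateDigest, whose implementation is not part of this diff. Conceptually it can only pass when the sign registered through the Init/Update callbacks matches a sign recomputed from the piece digests recorded as pieces were written. The hypothetical sketch below illustrates that comparison with made-up type and field names, not the real storage package API.

package digestcheck

import (
	"errors"
	"fmt"
)

// taskMetadata is a hypothetical stand-in for whatever the storage layer
// persists per task; the field names are illustrative, not the real API.
type taskMetadata struct {
	PieceMd5Sign string   // sign registered via the Init/Update callbacks
	PieceMd5s    []string // per-piece digests recorded as pieces complete
}

// validateDigest recomputes the sign over the recorded piece digests and
// compares it with the registered one. sign is any helper of the kind shown
// in the earlier sketch.
func validateDigest(md taskMetadata, sign func([]string) string) error {
	if md.PieceMd5Sign == "" {
		return errors.New("no piece md5 sign registered for task")
	}
	if got := sign(md.PieceMd5s); got != md.PieceMd5Sign {
		return fmt.Errorf("piece md5 sign mismatch: calculated %s, expected %s", got, md.PieceMd5Sign)
	}
	return nil
}
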
10 changes: 9 additions & 1 deletion client/daemon/peer/peertask_manager.go
@@ -73,6 +73,8 @@ type Task interface {
SetCallback(TaskCallback)
AddTraffic(int64)
GetTraffic() int64
SetPieceMd5Sign(string)
GetPieceMd5Sign() string
}

// TaskCallback inserts some operations for peer task download lifecycle
@@ -82,6 +84,7 @@ type TaskCallback interface {
Update(pt Task) error
Fail(pt Task, code base.Code, reason string) error
GetStartTime() time.Time
ValidateDigest(pt Task) error
}

type TinyData struct {
@@ -113,6 +116,8 @@ type peerTaskManager struct {
// currently, only check completed peer task after register to scheduler
// TODO multiplex the running peer task
enableMultiplex bool

calculateDigest bool
}

func NewPeerTaskManager(
@@ -122,7 +127,8 @@ func NewPeerTaskManager(
schedulerClient schedulerclient.SchedulerClient,
schedulerOption config.SchedulerOption,
perPeerRateLimit rate.Limit,
multiplex bool) (TaskManager, error) {
multiplex bool,
calculateDigest bool) (TaskManager, error) {

ptm := &peerTaskManager{
host: host,
Expand All @@ -133,6 +139,7 @@ func NewPeerTaskManager(
schedulerOption: schedulerOption,
perPeerRateLimit: perPeerRateLimit,
enableMultiplex: multiplex,
calculateDigest: calculateDigest,
}
return ptm, nil
}
@@ -266,6 +273,7 @@ func (ptm *peerTaskManager) storeTinyPeerTask(ctx context.Context, tiny *TinyDat
},
ContentLength: l,
TotalPieces: 1,
// TODO check md5 digest
})
if err != nil {
logger.Errorf("register tiny data storage failed: %s", err)
28 changes: 27 additions & 1 deletion client/daemon/peer/peertask_manager_mock_test.go

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions client/daemon/peer/peertask_manager_test.go
@@ -64,7 +64,7 @@ type componentsOption struct {
func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOption) (
schedulerclient.SchedulerClient, storage.Manager) {
port := int32(freeport.GetPort())
// 1. setup a mock daemon server for uploading pieces info
// 1. set up a mock daemon server for uploading pieces info
var daemon = mock_daemon.NewMockDaemonServer(ctrl)
daemon.EXPECT().GetPieceTasks(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx context.Context, request *base.PieceTaskRequest) (*base.PiecePacket, error) {
var tasks []*base.PieceInfo
@@ -147,10 +147,11 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
func(ctx context.Context, pr *scheduler.PeerResult, opts ...grpc.CallOption) error {
return nil
})
tempDir, _ := ioutil.TempDir("", "d7y-test-*")
storageManager, _ := storage.NewStorageManager(
config.SimpleLocalTaskStoreStrategy,
&config.StorageOption{
DataPath: test.DataDir,
DataPath: tempDir,
TaskExpireTime: clientutil.Duration{
Duration: -1 * time.Second,
},
17 changes: 16 additions & 1 deletion client/daemon/peer/peertask_stream.go
@@ -193,14 +193,19 @@ func (s *streamPeerTask) ReportPieceResult(result *pieceTaskResult) error {
defer s.recoverFromPanic()
// retry failed piece
if !result.pieceResult.Success {
result.pieceResult.FinishedCount = s.readyPieces.Settled()
_ = s.peerPacketStream.Send(result.pieceResult)
if result.notRetry {
s.Warnf("piece %d download failed, no retry", result.piece.PieceNum)
return nil
}
select {
case <-s.done:
s.Infof("peer task done, stop to send failed piece")
case <-s.ctx.Done():
s.Debugf("context done due to %s, stop to send failed piece", s.ctx.Err())
case s.failedPieceCh <- result.pieceResult.PieceInfo.PieceNum:
s.Warnf("%d download failed, retry later", result.piece.PieceNum)
s.Warnf("piece %d download failed, retry later", result.piece.PieceNum)
}
return nil
}
@@ -297,6 +302,10 @@ func (s *streamPeerTask) finish() error {
// send last progress
s.once.Do(func() {
s.success = true
if err := s.callback.Update(s); err != nil {
s.span.RecordError(err)
s.Errorf("update callback error: %s", err)
}
// let stream return immediately
close(s.streamDone)
// send EOF piece result to scheduler
@@ -452,6 +461,12 @@ func (s *streamPeerTask) writeToPipe(firstPiece int32, pw *io.PipeWriter) {
for {
// all data wrote to local storage, and all data wrote to pipe write
if s.readyPieces.Settled() == desired {
if err = s.callback.ValidateDigest(s); err != nil {
s.span.RecordError(err)
s.Errorf("validate digest error: %s", err)
_ = pw.CloseWithError(err)
return
}
s.Debugf("all %d pieces wrote to pipe", desired)
pw.Close()
return
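
One detail in the streaming path above: on a digest mismatch, writeToPipe calls pw.CloseWithError(err) rather than pw.Close(), so the reader of the pipe sees the validation error instead of a clean EOF after a truncated body. A minimal standalone illustration of that standard-library behavior (not project code):

package main

import (
	"errors"
	"fmt"
	"io"
)

// Standalone illustration, not Dragonfly code: closing the write side of an
// io.Pipe with CloseWithError makes the error visible to the reader, which
// is why a digest mismatch in writeToPipe fails the in-flight stream loudly.
func main() {
	pr, pw := io.Pipe()
	go func() {
		_, _ = pw.Write([]byte("partial data"))
		_ = pw.CloseWithError(errors.New("validate digest error: md5 mismatch"))
	}()
	buf, err := io.ReadAll(pr)
	fmt.Printf("read %d bytes, err: %v\n", len(buf), err)
	// Prints: read 12 bytes, err: validate digest error: md5 mismatch
}
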