Skip to content

Commit 9c0f3e0

Browse files
authored
Merge pull request #2 from filecoin-project/feat/parallel-writer
Make commp writer compute in parallel
2 parents b88f7a9 + 5af72b5 commit 9c0f3e0

File tree

1 file changed

+55
-15
lines changed

1 file changed

+55
-15
lines changed

writer/writer.go

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package writer
33
import (
44
"bytes"
55
"math/bits"
6+
"runtime"
67

78
"github.com/ipfs/go-cid"
89
"golang.org/x/xerrors"
@@ -24,13 +25,31 @@ type DataCIDSize struct {
2425
const commPBufPad = abi.PaddedPieceSize(8 << 20)
2526
const CommPBuf = abi.UnpaddedPieceSize(commPBufPad - (commPBufPad / 128)) // can't use .Unpadded() for const
2627

28+
type ciderr struct {
29+
c cid.Cid
30+
err error
31+
}
32+
2733
type Writer struct {
2834
len int64
2935
buf [CommPBuf]byte
30-
leaves []cid.Cid
36+
leaves []chan ciderr
37+
38+
tbufs [][CommPBuf]byte
39+
throttle chan int
3140
}
3241

3342
func (w *Writer) Write(p []byte) (int, error) {
43+
if w.throttle == nil {
44+
w.throttle = make(chan int, runtime.NumCPU())
45+
for i := 0; i < cap(w.throttle); i++ {
46+
w.throttle <- i
47+
}
48+
}
49+
if w.tbufs == nil {
50+
w.tbufs = make([][CommPBuf]byte, cap(w.throttle))
51+
}
52+
3453
n := len(p)
3554
for len(p) > 0 {
3655
buffered := int(w.len % int64(len(w.buf)))
@@ -44,10 +63,22 @@ func (w *Writer) Write(p []byte) (int, error) {
4463
w.len += int64(copied)
4564

4665
if copied > 0 && w.len%int64(len(w.buf)) == 0 {
47-
leaf, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, bytes.NewReader(w.buf[:]), CommPBuf)
48-
if err != nil {
49-
return 0, err
50-
}
66+
leaf := make(chan ciderr, 1)
67+
bufIdx := <-w.throttle
68+
copy(w.tbufs[bufIdx][:], w.buf[:])
69+
70+
go func() {
71+
defer func() {
72+
w.throttle <- bufIdx
73+
}()
74+
75+
l, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, bytes.NewReader(w.tbufs[bufIdx][:]), CommPBuf)
76+
leaf <- ciderr{
77+
c: l,
78+
err: err,
79+
}
80+
}()
81+
5182
w.leaves = append(w.leaves, leaf)
5283
}
5384
}
@@ -59,9 +90,18 @@ func (w *Writer) Sum() (DataCIDSize, error) {
5990
lastLen := w.len % int64(len(w.buf))
6091
rawLen := w.len
6192

93+
leaves := make([]cid.Cid, len(w.leaves))
94+
for i, leaf := range w.leaves {
95+
r := <- leaf
96+
if r.err != nil {
97+
return DataCIDSize{}, xerrors.Errorf("processing leaf %d: %w", i, r.err)
98+
}
99+
leaves[i] = r.c
100+
}
101+
62102
// process remaining bit of data
63103
if lastLen != 0 {
64-
if len(w.leaves) != 0 {
104+
if len(leaves) != 0 {
65105
copy(w.buf[lastLen:], make([]byte, int(int64(CommPBuf)-lastLen)))
66106
lastLen = int64(CommPBuf)
67107
}
@@ -80,25 +120,25 @@ func (w *Writer) Sum() (DataCIDSize, error) {
80120
}, nil
81121
}
82122

83-
w.leaves = append(w.leaves, p)
123+
leaves = append(leaves, p)
84124
}
85125

86126
// pad with zero pieces to power-of-two size
87-
fillerLeaves := (1 << (bits.Len(uint(len(w.leaves) - 1)))) - len(w.leaves)
127+
fillerLeaves := (1 << (bits.Len(uint(len(leaves) - 1)))) - len(leaves)
88128
for i := 0; i < fillerLeaves; i++ {
89-
w.leaves = append(w.leaves, zerocomm.ZeroPieceCommitment(CommPBuf))
129+
leaves = append(leaves, zerocomm.ZeroPieceCommitment(CommPBuf))
90130
}
91131

92-
if len(w.leaves) == 1 {
132+
if len(leaves) == 1 {
93133
return DataCIDSize{
94134
PayloadSize: rawLen,
95-
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
96-
PieceCID: w.leaves[0],
135+
PieceSize: abi.PaddedPieceSize(len(leaves)) * commPBufPad,
136+
PieceCID: leaves[0],
97137
}, nil
98138
}
99139

100-
pieces := make([]abi.PieceInfo, len(w.leaves))
101-
for i, leaf := range w.leaves {
140+
pieces := make([]abi.PieceInfo, len(leaves))
141+
for i, leaf := range leaves {
102142
pieces[i] = abi.PieceInfo{
103143
Size: commPBufPad,
104144
PieceCID: leaf,
@@ -112,7 +152,7 @@ func (w *Writer) Sum() (DataCIDSize, error) {
112152

113153
return DataCIDSize{
114154
PayloadSize: rawLen,
115-
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
155+
PieceSize: abi.PaddedPieceSize(len(leaves)) * commPBufPad,
116156
PieceCID: p,
117157
}, nil
118158
}

0 commit comments

Comments
 (0)