Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/asyncwrite/asyncwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (bfw *BackgroundWriter) writeWorker() {
pool.Put(data)
if writeErr != nil {
err = writeErr
// Drain remaining buffers from channel to avoid memory leak
for remaining := range bfw.ch {
pool.Put(remaining)
}
break
}
}
Expand All @@ -53,6 +57,7 @@ func (bfw *BackgroundWriter) Write(p []byte) (n int, err error) {
case bfw.ch <- b:
return len(b), nil
case err := <-bfw.done:
pool.Put(b)
return 0, err
}
}
Expand Down
6 changes: 6 additions & 0 deletions lib/ffi/cunative/decode_sdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func Decode(replica, key io.Reader, out io.Writer) error {
// Read replica
rn, err := io.ReadFull(replica, rbuf)
if err != nil && err != io.ErrUnexpectedEOF {
pool.Put(rbuf)
pool.Put(kbuf)
if err == io.EOF {
return
}
Expand All @@ -131,11 +133,15 @@ func Decode(replica, key io.Reader, out io.Writer) error {
// Read key
kn, err := io.ReadFull(key, kbuf[:rn])
if err != nil && err != io.ErrUnexpectedEOF {
pool.Put(rbuf)
pool.Put(kbuf)
errChan <- err
return
}

if kn != rn {
pool.Put(rbuf)
pool.Put(kbuf)
errChan <- io.ErrUnexpectedEOF
return
}
Expand Down
6 changes: 6 additions & 0 deletions lib/ffi/cunative/decode_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func DecodeSnap(spt abi.RegisteredSealProof, commD, commK cid.Cid, key, replica
// Read replica
rn, err := io.ReadFull(replica, rbuf)
if err != nil && err != io.ErrUnexpectedEOF {
pool.Put(rbuf)
pool.Put(kbuf)
if err == io.EOF {
return
}
Expand All @@ -132,11 +134,15 @@ func DecodeSnap(spt abi.RegisteredSealProof, commD, commK cid.Cid, key, replica
// Read key
kn, err := io.ReadFull(key, kbuf[:rn])
if err != nil && err != io.ErrUnexpectedEOF {
pool.Put(rbuf)
pool.Put(kbuf)
errChan <- err
return
}

if kn != rn {
pool.Put(rbuf)
pool.Put(kbuf)
errChan <- io.ErrUnexpectedEOF
return
}
Expand Down
2 changes: 2 additions & 0 deletions lib/pieceprovider/sector_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ func (p *SectorReader) ReadPiece(ctx context.Context, sector storiface.SectorRef

upr, err := fr32.NewUnpadReaderBuf(r, pieceSize.Padded(), buf)
if err != nil {
pool.Put(buf)
r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
}

bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(startOffsetDiff); err != nil {
pool.Put(buf)
r.Close() // nolint
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions tasks/pdp/task_prove.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ipfs/go-cid"
pool "github.com/libp2p/go-buffer-pool"
"github.com/minio/sha256-simd"
"github.com/oklog/ulid"
"github.com/samber/lo"
Expand Down Expand Up @@ -435,6 +436,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6
if err != nil {
return contract.IPDPTypesProof{}, xerrors.Errorf("failed to build memtree: %w", err)
}
defer pool.Put(memTree)
log.Debugw("provePiece", "rootChallengeOffset", rootChallengeOffset, "challengedLeaf", challengedLeaf)

mProof, err := proof.MemtreeProof(memTree, challengedLeaf)
Expand Down Expand Up @@ -519,6 +521,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6
if err != nil {
return contract.IPDPTypesProof{}, xerrors.Errorf("failed to build memtree: %w", err)
}
defer pool.Put(memtree)

// Get challenge leaf in subTree
subTreeChallenge := challengedLeaf - startLeaf
Expand Down Expand Up @@ -552,6 +555,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6
if err != nil {
return contract.IPDPTypesProof{}, xerrors.Errorf("failed to build memtree from snapshot: %w", err)
}
defer pool.Put(mtree)

// Generate merkle proof from snapShot node to commP
proofs, err := proof.MemtreeProof(mtree, snapshotNodeIndex)
Expand Down