Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workaround for erasure coding threading bug #881

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions codex/erasure.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,5 @@
## those terms.

import ./erasure/erasure
import ./erasure/backends/leopard

export erasure

func leoEncoderProvider*(
size, buffers, parity: int
): EncoderBackend {.raises: [Defect].} =
## create new Leo Encoder
LeoEncoderBackend.new(size, buffers, parity)

func leoDecoderProvider*(
size, buffers, parity: int
): DecoderBackend {.raises: [Defect].} =
## create new Leo Decoder
LeoDecoderBackend.new(size, buffers, parity)
177 changes: 58 additions & 119 deletions codex/erasure/asyncbackend.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable/results
import pkg/leopard

import ./backend
import ../errors
Expand All @@ -29,197 +30,135 @@
type
EncoderBackendPtr = ptr EncoderBackend
DecoderBackendPtr = ptr DecoderBackend
DecoderPtr = ptr LeoDecoder
EncoderPtr = ptr LeoEncoder

# Args objects are missing seq[seq[byte]] field, to avoid unnecessary data copy
EncodeTaskArgs = object
signal: ThreadSignalPtr
backend: EncoderBackendPtr
blockSize: int
ecM: int
encoder: EncoderPtr

DecodeTaskArgs = object
signal: ThreadSignalPtr
backend: DecoderBackendPtr
blockSize: int
ecK: int
decoder: DecoderPtr

SharedArrayHolder*[T] = object
data: ptr UncheckedArray[T]
size: int

EncodeTaskResult = Result[SharedArrayHolder[byte], cstring]
DecodeTaskResult = Result[SharedArrayHolder[byte], cstring]

proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult =
var
data = data.unsafeAddr
parity = newSeqWith[seq[byte]](args.ecM, newSeq[byte](args.blockSize))
TaskResult = Result[void, cstring]

proc encodeTask(args: EncodeTaskArgs): TaskResult =
try:
let res = args.backend[].encode(data[], parity)

if res.isOk:
let
resDataSize = parity.len * args.blockSize
resData = cast[ptr UncheckedArray[byte]](allocShared0(resDataSize))
arrHolder = SharedArrayHolder[byte](
data: resData,
size: resDataSize
)

for i in 0..<parity.len:
copyMem(addr resData[i * args.blockSize], addr parity[i][0], args.blockSize)

return ok(arrHolder)
else:
return err(res.error)
except CatchableError as exception:
return err(exception.msg.cstring)
return args.encoder[].encodePrepared()
finally:
if err =? args.signal.fireSync().mapFailure.errorOption():
error "Error firing signal", msg = err.msg

proc decodeTask(args: DecodeTaskArgs, data: seq[seq[byte]], parity: seq[seq[byte]]): DecodeTaskResult =
var
data = data.unsafeAddr
parity = parity.unsafeAddr
recovered = newSeqWith[seq[byte]](args.ecK, newSeq[byte](args.blockSize))

proc decodeTask(args: DecodeTaskArgs): TaskResult =
try:
let res = args.backend[].decode(data[], parity[], recovered)

if res.isOk:
let
resDataSize = recovered.len * args.blockSize
resData = cast[ptr UncheckedArray[byte]](allocShared0(resDataSize))
arrHolder = SharedArrayHolder[byte](
data: resData,
size: resDataSize
)

for i in 0..<recovered.len:
copyMem(addr resData[i * args.blockSize], addr recovered[i][0], args.blockSize)

return ok(arrHolder)
else:
return err(res.error)
except CatchableError as exception:
return err(exception.msg.cstring)
return args.decoder[].decodePrepared()
finally:
if err =? args.signal.fireSync().mapFailure.errorOption():
error "Error firing signal", msg = err.msg

proc proxySpawnEncodeTask(
tp: Taskpool,
args: EncodeTaskArgs,
data: ref seq[seq[byte]]
): Flowvar[EncodeTaskResult] =
# FIXME Uncomment the code below after addressing an issue:
# https://github.com/codex-storage/nim-codex/issues/854

# tp.spawn encodeTask(args, data[])

let fv = EncodeTaskResult.newFlowVar
fv.readyWith(encodeTask(args, data[]))
return fv
args: EncodeTaskArgs
): Flowvar[TaskResult] =
tp.spawn encodeTask(args)

proc proxySpawnDecodeTask(
tp: Taskpool,
args: DecodeTaskArgs,
data: ref seq[seq[byte]],
parity: ref seq[seq[byte]]
): Flowvar[DecodeTaskResult] =
# FIXME Uncomment the code below after addressing an issue:
# https://github.com/codex-storage/nim-codex/issues/854

# tp.spawn decodeTask(args, data[], parity[])

let fv = DecodeTaskResult.newFlowVar
fv.readyWith(decodeTask(args, data[], parity[]))
return fv

proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} =
args: DecodeTaskArgs
): Flowvar[TaskResult] =
tp.spawn decodeTask(args)

proc awaitTaskResult(signal: ThreadSignalPtr, handle: Flowvar[TaskResult]): Future[?!void] {.async.} =
await wait(signal)

var
res: T
res: TaskResult
awaitTotal: Duration
while awaitTotal < CompletitionTimeout:
if handle.tryComplete(res):
return success(res)
if handle.tryComplete(res):
if res.isOk:
return success()
else:
awaitTotal += CompletitionRetryDelay
await sleepAsync(CompletitionRetryDelay)
return failure($res.error)
else:
awaitTotal += CompletitionRetryDelay
await sleepAsync(CompletitionRetryDelay)

Check warning on line 91 in codex/erasure/asyncbackend.nim

View check run for this annotation

Codecov / codecov/patch

codex/erasure/asyncbackend.nim#L90-L91

Added lines #L90 - L91 were not covered by tests

return failure("Task signaled finish but didn't return any result within " & $CompletitionRetryDelay)

proc asyncEncode*(
tp: Taskpool,
backend: EncoderBackend,
encoder: sink LeoEncoder,
data: ref seq[seq[byte]],
blockSize: int,
ecM: int
): Future[?!ref seq[seq[byte]]] {.async.} =
if ecM == 0:
return success(seq[seq[byte]].new())

without signal =? ThreadSignalPtr.new().mapFailure, err:
return failure(err)

try:
if err =? encoder.prepareEncode(data[]).mapFailure.errorOption():
return failure(err)

Check warning on line 110 in codex/erasure/asyncbackend.nim

View check run for this annotation

Codecov / codecov/patch

codex/erasure/asyncbackend.nim#L110

Added line #L110 was not covered by tests

let
blockSize = data[0].len
args = EncodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecM: ecM)
handle = proxySpawnEncodeTask(tp, args, data)
args = EncodeTaskArgs(signal: signal, encoder: addr encoder)
handle = proxySpawnEncodeTask(tp, args)

without res =? await awaitResult(signal, handle), err:
if err =? (await awaitTaskResult(signal, handle)).errorOption():
return failure(err)

var parity = seq[seq[byte]].new()
parity[].setLen(ecM)

if res.isOk:
var parity = seq[seq[byte]].new()
parity[].setLen(ecM)

for i in 0..<parity[].len:
parity[i] = newSeq[byte](blockSize)
copyMem(addr parity[i][0], addr res.value.data[i * blockSize], blockSize)
for i in 0..<parity[].len:
parity[i] = newSeq[byte](blockSize)

deallocShared(res.value.data)
if err =? encoder.readParity(parity[]).mapFailure.errorOption():
return failure(err)

Check warning on line 126 in codex/erasure/asyncbackend.nim

View check run for this annotation

Codecov / codecov/patch

codex/erasure/asyncbackend.nim#L126

Added line #L126 was not covered by tests

return success(parity)
else:
return failure($res.error)
return success(parity)
finally:
if err =? signal.close().mapFailure.errorOption():
error "Error closing signal", msg = $err.msg

proc asyncDecode*(
tp: Taskpool,
backend: DecoderBackend,
decoder: sink LeoDecoder,
data, parity: ref seq[seq[byte]],
blockSize: int
): Future[?!ref seq[seq[byte]]] {.async.} =
without signal =? ThreadSignalPtr.new().mapFailure, err:
return failure(err)

try:
if err =? decoder.prepareDecode(data[], parity[]).mapFailure.errorOption():
return failure(err)

Check warning on line 144 in codex/erasure/asyncbackend.nim

View check run for this annotation

Codecov / codecov/patch

codex/erasure/asyncbackend.nim#L144

Added line #L144 was not covered by tests

let
ecK = data[].len
args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK)
handle = proxySpawnDecodeTask(tp, args, data, parity)
args = DecodeTaskArgs(signal: signal, decoder: addr decoder)
handle = proxySpawnDecodeTask(tp, args)

without res =? await awaitResult(signal, handle), err:
if err =? (await awaitTaskResult(signal, handle)).errorOption():
return failure(err)

if res.isOk:
var recovered = seq[seq[byte]].new()
recovered[].setLen(ecK)
var recovered = seq[seq[byte]].new()
recovered[].setLen(data[].len)
for i in 0..<recovered[].len:
recovered[i] = newSeq[byte](blockSize)

for i in 0..<recovered[].len:
recovered[i] = newSeq[byte](blockSize)
copyMem(addr recovered[i][0], addr res.value.data[i * blockSize], blockSize)

deallocShared(res.value.data)
if err =? decoder.readDecoded(recovered[]).mapFailure.errorOption():
return failure(err)

Check warning on line 159 in codex/erasure/asyncbackend.nim

View check run for this annotation

Codecov / codecov/patch

codex/erasure/asyncbackend.nim#L159

Added line #L159 was not covered by tests

return success(recovered)
else:
return failure($res.error)
return success(recovered)
finally:
if err =? signal.close().mapFailure.errorOption():
error "Error closing signal", msg = $err.msg
28 changes: 9 additions & 19 deletions codex/erasure/erasure.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import pkg/chronos
import pkg/libp2p/[multicodec, cid, multihash]
import pkg/libp2p/protobuf/minprotobuf
import pkg/taskpools
import pkg/leopard

import ../logutils
import ../manifest
Expand All @@ -31,11 +32,8 @@ import ../errors

import pkg/stew/byteutils

import ./backend
import ./asyncbackend

export backend

logScope:
topics = "codex erasure"

Expand Down Expand Up @@ -63,15 +61,7 @@ type
## or any combination there of.
##

EncoderProvider* = proc(size, blocks, parity: int): EncoderBackend
{.raises: [Defect], noSideEffect.}

DecoderProvider* = proc(size, blocks, parity: int): DecoderBackend
{.raises: [Defect], noSideEffect.}

Erasure* = ref object
encoderProvider*: EncoderProvider
decoderProvider*: DecoderProvider
store*: BlockStore
taskpool: Taskpool

Expand Down Expand Up @@ -285,11 +275,13 @@ proc encodeData(

var
cids = seq[Cid].new()
encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM)
emptyBlock = newSeq[byte](manifest.blockSize.int)

cids[].setLen(params.blocksCount)

without var encoder =? LeoEncoder.init(manifest.blockSize.int, params.ecK, params.ecM).mapFailure, err:
return failure(err)

try:
for step in 0..<params.steps:
# TODO: Don't allocate a new seq every time, allocate once and zero out
Expand Down Expand Up @@ -349,7 +341,7 @@ proc encodeData(
trace "Erasure coding encoding error", exc = exc.msg
return failure(exc)
finally:
encoder.release()
encoder.free()

proc encode*(
self: Erasure,
Expand Down Expand Up @@ -390,9 +382,11 @@ proc decode*(
var
cids = seq[Cid].new()
recoveredIndices = newSeq[Natural]()
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
emptyBlock = newSeq[byte](encoded.blockSize.int)

without var decoder =? LeoDecoder.init(encoded.blockSize.int, encoded.ecK, encoded.ecM).mapFailure, err:
return failure(err)

cids[].setLen(encoded.blocksCount)
try:
for step in 0..<encoded.steps:
Expand Down Expand Up @@ -439,7 +433,7 @@ proc decode*(
trace "Erasure coding decoding error", exc = exc.msg
return failure(exc)
finally:
decoder.release()
decoder.free()

without tree =? CodexTree.init(cids[0..<encoded.originalBlocksCount]), err:
return failure(err)
Expand Down Expand Up @@ -469,14 +463,10 @@ proc stop*(self: Erasure) {.async.} =
proc new*(
T: type Erasure,
store: BlockStore,
encoderProvider: EncoderProvider,
decoderProvider: DecoderProvider,
taskpool: Taskpool): Erasure =
## Create a new Erasure instance for encoding and decoding manifests
##

Erasure(
store: store,
encoderProvider: encoderProvider,
decoderProvider: decoderProvider,
taskpool: taskpool)
4 changes: 0 additions & 4 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ proc streamEntireDataset(
let
erasure = Erasure.new(
self.networkStore,
leoEncoderProvider,
leoDecoderProvider,
self.taskpool)
without _ =? (await erasure.decode(manifest)), error:
error "Unable to erasure decode manifest", manifestCid, exc = error.msg
Expand Down Expand Up @@ -433,8 +431,6 @@ proc setupRequest(
let
erasure = Erasure.new(
self.networkStore.localStore,
leoEncoderProvider,
leoDecoderProvider,
self.taskpool)

without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
Expand Down
Loading