Skip to content

Commit

Permalink
v2: move state vector to App Parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Dec 15, 2023
1 parent 7b5577b commit f998c2b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 44 deletions.
104 changes: 61 additions & 43 deletions ndn-svs/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ SVSyncCore::onSyncInterest(const Interest& interest)
void
SVSyncCore::onSyncInterestValidated(const Interest& interest)
{
const auto& n = interest.getName();

// Get incoming face
// Get incoming face (this is needed by NLSR)
uint64_t incomingFace = 0;
{
auto tag = interest.getTag<ndn::lp::IncomingFaceIdTag>();
Expand All @@ -129,44 +127,60 @@ SVSyncCore::onSyncInterestValidated(const Interest& interest)
}
}

// Get state vector
std::shared_ptr<VersionVector> vvOther;
try
{
ndn::Block vvBlock = n.get(-2);
// Check for invalid Interest
if (!interest.hasApplicationParameters()) {
return;
}

// Decode state parameters
ndn::Block params = interest.getApplicationParameters();
params.parse();

// Decompress if necessary
if (vvBlock.type() == tlv::StateVectorLzma) {
#ifdef NDN_SVS_COMPRESSION
boost::iostreams::filtering_istreambuf in;
in.push(boost::iostreams::lzma_decompressor());
in.push(boost::iostreams::array_source(reinterpret_cast<const char*>(vvBlock.value()), vvBlock.value_size()));
ndn::OBufferStream decompressed;
boost::iostreams::copy(in, decompressed);
auto inner = ndn::Block::fromBuffer(decompressed.buf());
if (!std::get<0>(inner)) {
NDN_THROW(ndn::tlv::Error("Failed to decode inner block"));
}
vvBlock = std::get<1>(inner);
#else
NDN_THROW(ndn::tlv::Error("SVS was compiled without compression support"));
#endif
// Decompress if necessary. The spec requires that if an LZMA block is present,
// then no other blocks are present (everything is compressed together)
if (params.find(tlv::LzmaBlock) != params.elements_end()) {
auto lzmaBlock = params.get(tlv::LzmaBlock);

boost::iostreams::filtering_istreambuf in;
in.push(boost::iostreams::lzma_decompressor());
in.push(boost::iostreams::array_source(reinterpret_cast<const char*>(lzmaBlock.value()), lzmaBlock.value_size()));
ndn::OBufferStream decompressed;
boost::iostreams::copy(in, decompressed);

auto parsed = ndn::Block::fromBuffer(decompressed.buf());
if (!std::get<0>(parsed)) {
// TODO: log error parsing inner block
return;
}

vvOther = std::make_shared<VersionVector>(vvBlock);
params = std::get<1>(parsed);
params.parse();
}
#endif

// Get state vector
std::shared_ptr<VersionVector> vvOther;
try
{
vvOther = std::make_shared<VersionVector>(params.get(tlv::StateVector));
}
catch (ndn::tlv::Error&)
{
// TODO: log error
return;
}

if (m_recvExtraBlock && interest.hasApplicationParameters())
// Read extra mapping blocks
if (m_recvExtraBlock)
{
try {
m_recvExtraBlock(interest.getApplicationParameters().blockFromValue(), *vvOther);
m_recvExtraBlock(params.get(tlv::MappingData), *vvOther);
}
catch (std::exception&)
{
// TODO: log error but continue
}
catch (std::exception&) {}
}

// Merge state vector
Expand Down Expand Up @@ -245,36 +259,40 @@ SVSyncCore::sendSyncInterest()
if (!m_initialized)
return;

Interest interest;
interest.setApplicationParameters(span<const uint8_t>{'0'});

ndn::Block vvWire;
// Build app parameters
ndn::encoding::EncodingBuffer enc;
{
std::lock_guard<std::mutex> lock(m_vvMutex);
vvWire = m_vv.encode();
size_t length = 0;

// Add parameters digest
// Add extra mapping blocks
if (m_getExtraBlock)
{
interest.setApplicationParameters(m_getExtraBlock(m_vv));
}
length += ndn::encoding::prependBlock(enc, m_getExtraBlock(m_vv));

// Add state vector
length += ndn::encoding::prependBlock(enc, m_vv.encode());

// Add length and ApplicationParameters type
enc.prependVarNumber(length);
enc.prependVarNumber(ndn::tlv::ApplicationParameters);
}

// Create sync interest name
Name syncName(m_syncPrefix);
ndn::Block wire = enc.block();
wire.encode();

#ifdef NDN_SVS_COMPRESSION
vvWire.encode();
boost::iostreams::filtering_istreambuf in;
in.push(boost::iostreams::lzma_compressor());
in.push(boost::iostreams::array_source(reinterpret_cast<const char*>(vvWire.data()), vvWire.size()));
in.push(boost::iostreams::array_source(reinterpret_cast<const char*>(wire.data()), wire.size()));
ndn::OBufferStream compressed;
boost::iostreams::copy(in, compressed);
vvWire = ndn::Block(tlv::StateVectorLzma, compressed.buf());
wire = ndn::Block(tlv::LzmaBlock, compressed.buf());
wire.encode();
#endif

syncName.append(Name::Component(vvWire));
interest.setName(syncName);
// Create Sync Interest
Interest interest(Name(m_syncPrefix).appendVersion(2));
interest.setApplicationParameters(wire);
interest.setInterestLifetime(1_ms);

switch (m_securityOptions.interestSigner->signingInfo.getSignerType())
Expand Down
2 changes: 1 addition & 1 deletion ndn-svs/tlv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ enum : uint32_t {
SeqNo = 204,
MappingData = 205,
MappingEntry = 206,
StateVectorLzma = 211,
LzmaBlock = 211,
};

} // namespace ndn::svs::tlv
Expand Down

0 comments on commit f998c2b

Please sign in to comment.