Skip to content

Commit

Permalink
Changed timing of messages on mongofw/wr replication.
Browse files Browse the repository at this point in the history
  • Loading branch information
riclolsen committed May 25, 2024
1 parent 6ad522e commit 4b99f45
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 72 deletions.
71 changes: 43 additions & 28 deletions src/mongofw/customized_module.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ module.exports.CustomProcessor = function (

// will send only update data from drivers
if (!change?.updateDescription?.updatedFields?.sourceDataUpdate) return
if (!change?.operationType) return

chgQueue.enqueue(change)
})
Expand All @@ -111,36 +112,44 @@ setTimeout(procQueue, 1000)
async function procQueue() {
let cntSeq = 0
while (!chgQueue.isEmpty()) {
const change = chgQueue.peek()
chgQueue.dequeue()
let fwArr = []
let strSz = 0
while (!chgQueue.isEmpty()) {
const change = chgQueue.peek()
chgQueue.dequeue()

if (
change?.updateDescription?.updatedFields?.sourceDataUpdate
?.valueBsonAtSource
)
delete change.updateDescription.updatedFields.sourceDataUpdate
.valueBsonAtSource
if (change?.updateDescription?.truncatedArrays)
delete change.updateDescription.truncatedArrays

const fwObj = {
cnt: cnt++,
tag: change?.fullDocument?.tag,
operationType: change.operationType,
documentKey: change.documentKey,
updateDescription: change.updateDescription,
if (
change?.updateDescription?.updatedFields?.sourceDataUpdate
?.valueBsonAtSource
)
delete change.updateDescription.updatedFields.sourceDataUpdate
.valueBsonAtSource
if (change?.updateDescription?.truncatedArrays)
delete change.updateDescription.truncatedArrays

const obj = {
cnt: cnt++,
tag: change?.fullDocument?.tag,
operationType: change.operationType,
documentKey: change.documentKey,
updateDescription: change.updateDescription,
}
const chgLen = JSON.stringify(obj).length
if (chgLen > 60000) {
Log.log('Discarded change too large: ' + chgLen)
cnt--
continue
}
strSz += chgLen
fwArr.push(obj)
if (strSz > 5000) break
}
const opData = JSON.stringify(fwObj)
const opData = JSON.stringify(fwArr)
const message = zlib.deflateSync(opData)

Log.log(opData.length + ' ' + message.length)
if (message.length > maxSz) maxSz = message.length
if (message.length > 60000) {
Log.log('Message too large: ' + message.length)
cnt--
setTimeout(procQueue, 100)
return
}
Log.log('Objects: ' + fwArr.length)

const buff = Buffer.from(message)
await udpSocket.send(
buff,
Expand All @@ -154,16 +163,22 @@ async function procQueue() {
}
}
)
await sleep(15)
// Log.log('Data sent via UDP' + opData);
Log.log('Queue Size: ' + chgQueue.size())
Log.log('Size: ' + buff.length)
Log.log('Message count ' + fwObj.cnt)
Log.log('Change count ' + cnt)
Log.log('Max: ' + maxSz)
Log.log('Seq count ' + cntSeq++)
if (cntSeq > 75 || buff.length > 6000) {
setTimeout(procQueue, 100+100*parseInt(buff.length/1500))
if (cntSeq > 50 || buff.length > 6000) {
setTimeout(procQueue, 100 + 100 * parseInt(buff.length / 1500))
return
}
}

setTimeout(procQueue, 100)
}

function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
87 changes: 43 additions & 44 deletions src/mongowr/customized_module.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,63 +84,62 @@ let cntLost = 0

setTimeout(procQueue, 1000)
async function procQueue() {
let cntPr = 0

if (msgQueue.size() > 5000) {
msgQueue.clear()
Log.log('Queue too large! Emptied!')
}

let cntPr = 0
const updateOps = []
while (!msgQueue.isEmpty()) {
cntPr++
if (cntPr > 200) {
setTimeout(procQueue, 100)
return
if (cntPr > 50) {
break
}
try {
const dataObj = msgQueue.peek()
const arrObj = msgQueue.peek()
msgQueue.dequeue()

//if (msg.length > maxSz) maxSz = msg.length
//Log.log('Size: ' + msg.length)
//Log.log('Max: ' + maxSz)
Log.log('Queue Size: ' + msgQueue.size())

if (!dataObj?.cnt) {
Log.log('Unexpected format')
}
if (dataObj.cnt - cnt > 1 && cnt != -1) {
Log.log('Message lost # ' + (dataObj.cnt - 1))
cntLost += dataObj.cnt - cnt
if (arrObj.length)
for (let i = 0; i < arrObj.length; i++) {
const dataObj = arrObj[i]
Log.log('Queue Size: ' + msgQueue.size())

if (!dataObj?.cnt) {
Log.log('Unexpected format')
}
if (dataObj.cnt - cnt > 1 && cnt != -1) {
Log.log('Message lost # ' + (dataObj.cnt - 1))
cntLost += dataObj.cnt - cnt
}
cnt = dataObj.cnt
Log.log('Total lost: ' + cntLost)
Log.log(' Cnt: ' + dataObj.cnt)

// will process only update data from drivers
if (!dataObj?.updateDescription?.updatedFields?.sourceDataUpdate) return

if (dataObj?.updateDescription?.updatedFields?.sourceDataUpdate.timeTag)
dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTag =
new Date(
dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTag
)
if (
dataObj?.updateDescription?.updatedFields?.sourceDataUpdate
.timeTagAtSource
)
dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTagAtSource =
new Date(
dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTagAtSource
)

updateOps.push({
updateOne: {
filter: { ...dataObj.documentKey },
update: { $set: { ...dataObj.updateDescription.updatedFields } },
},
})
}
cnt = dataObj.cnt
Log.log('Total lost: ' + cntLost)
Log.log(' Cnt: ' + dataObj.cnt)

// will process only update data from drivers
if (!dataObj?.updateDescription?.updatedFields?.sourceDataUpdate) return

if (dataObj?.updateDescription?.updatedFields?.sourceDataUpdate.timeTag)
dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTag =
new Date(
dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTag
)
if (
dataObj?.updateDescription?.updatedFields?.sourceDataUpdate
.timeTagAtSource
)
dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTagAtSource =
new Date(
dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTagAtSource
)

updateOps.push({
updateOne: {
filter: { ...dataObj.documentKey },
update: { $set: { ...dataObj.updateDescription.updatedFields } },
},
})
} catch (e) {
Log.log('Error: ' + e)
}
Expand Down

0 comments on commit 4b99f45

Please sign in to comment.