.*: add compactor for syncer #2261
base: master
Co-authored-by: Ehco <[email protected]>
chanSize := s.cfg.QueueSize * s.cfg.WorkerCount / 2
if s.cfg.Compact {
	chanSize /= 2
}
s.dmlJobCh = make(chan *job, chanSize)
DM originally cached s.cfg.QueueSize * s.cfg.WorkerCount DML jobs in memory. Now, if compact: false, dmlJobCh and dmlWorker will each cache s.cfg.QueueSize * s.cfg.WorkerCount / 2 jobs. If compact: true, dmlJobCh, the compactor buffer, the compactor output channel, and dmlWorker will each cache s.cfg.QueueSize * s.cfg.WorkerCount / 4 jobs.
We could use a larger compactor buffer size, but then a user who runs pause-task/stop-task may have to wait longer for all jobs to be flushed.
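To make the budget in the comment above concrete, here is a minimal Go sketch of the arithmetic. The names stageSize and stageCount and the values 1024 and 16 are illustrative assumptions, not code from this PR; the stage counts (2 without compaction, 4 with it) are taken from the comment.

```go
package main

import "fmt"

// stageSize mirrors the chanSize computation in the diff.
// queueSize and workerCount stand in for s.cfg.QueueSize and s.cfg.WorkerCount.
func stageSize(queueSize, workerCount int, compact bool) int {
	size := queueSize * workerCount / 2
	if compact {
		size /= 2
	}
	return size
}

// stageCount is the number of stages that each hold stageSize jobs according
// to the comment: dmlJobCh and dmlWorker without compaction, plus the
// compactor buffer and the compactor output channel with it.
func stageCount(compact bool) int {
	if compact {
		return 4
	}
	return 2
}

func main() {
	const queueSize, workerCount = 1024, 16 // illustrative values only
	for _, compact := range []bool{false, true} {
		per := stageSize(queueSize, workerCount, compact)
		fmt.Printf("compact=%v per-stage=%d total=%d\n", compact, per, per*stageCount(compact))
	}
}
```

With these values both configurations keep the same total of queueSize * workerCount jobs in memory. Because of integer division, the totals can come out slightly lower when the product is not divisible by 4.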
seems causality output channel didn't get adjusted
LGTM
if all the chanSize and bufferSize values are accepted by the other reviewers. BTW, please add some comments once the parameters are determined.
upgrade failed due to https://github.com/pingcap/dm/issues/1961
will review after food
*delDML = *updateDML
delDML.op = del
// use oldValues of update as values of delete and reset oldValues
delDML.values = updateDML.oldValues
Maybe in the future we can remove the op of DML, and switch on the DML:
only has new value: it's an INSERT
only has old value: it's a DELETE
else: it's an UPDATE
There may be problems, e.g. a table whose columns are all generated columns, so the new value and the old value will both be null in an INSERT statement.
Maybe there has to be at least one regular column for the other columns to be generated from.
I think this kind of implicit opType is bad for the readability of our code. Is there any harm in keeping the explicit op field?
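For reference, the implicit-op proposal in this thread could look roughly like the Go sketch below. rowChange, inferOp and the op constants are hypothetical names, not the PR's types. The sketch follows the proposal literally, whereas the diff above stores a delete's row image in values, so the PR's current field usage would not fit this rule as-is.

```go
package main

import "fmt"

type opType int

const (
	opUnknown opType = iota
	opInsert
	opUpdate
	opDelete
)

var opNames = [...]string{"unknown", "insert", "update", "delete"}

func (o opType) String() string { return opNames[o] }

// rowChange is a hypothetical stand-in for the DML struct discussed above.
type rowChange struct {
	values    []interface{} // new row image
	oldValues []interface{} // old row image
}

// inferOp derives the operation from which row images are present,
// as proposed in the thread.
func inferOp(r rowChange) opType {
	hasNew, hasOld := len(r.values) > 0, len(r.oldValues) > 0
	switch {
	case hasNew && !hasOld:
		return opInsert
	case !hasNew && hasOld:
		return opDelete
	case hasNew && hasOld:
		return opUpdate
	default:
		// Neither image is present, e.g. the generated-columns case raised
		// above: the operation cannot be inferred.
		return opUnknown
	}
}

func main() {
	fmt.Println(inferOp(rowChange{values: []interface{}{1}}))
	fmt.Println(inferOp(rowChange{oldValues: []interface{}{1}}))
	fmt.Println(inferOp(rowChange{values: []interface{}{2}, oldValues: []interface{}{1}}))
	fmt.Println(inferOp(rowChange{}))
}
```

The default branch is where the ambiguity shows up, which is one argument for keeping the explicit op field.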
metrics.QueueSizeGauge.WithLabelValues(c.task, "compactor_input", c.source).Set(float64(len(c.inCh)))

if j.tp == flush {
	c.flushBuffer()
This will block the consumption of c.inCh. If we have a lot of jobs to flush, we are blocked until every job has been sent to the next stage. This increases the risk of an upstream read timeout.
Maybe we can implement a ping-pong buffer in the future.
I think this is not the same problem. An upstream read timeout may also be caused by dmlJobCh being full. And after we support async checkpoint flush, we will only need a flush job on stop-task/pause-task?
When DM writes 1 job per second to downstream and we have 100 jobs in the compactor to flush, we now have to wait 100 seconds before reading upstream. Without the compactor we only wait 1 second.
Line 102 in a734fe0
if len(c.inCh) == 0 || len(c.buffer) >= c.bufferSize || len(c.outCh) == 0 {
Does len(c.inCh) == 0 meet the requirements?
Co-authored-by: lance6716 <[email protected]>
will review tests later
}

for i := 0; i < len(values); i++ {
	if values[i] != oldValues[i] {
We're comparing interface{} values, so I guess they are not equal most of the time?
Interface values are comparable. Two interface values are equal if they have identical dynamic types and equal dynamic values or if both have value nil.
https://stackoverflow.com/questions/34245932/checking-equality-of-interface
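A small Go sketch of the rule quoted above, applied to a column-by-column comparison like the loop in the diff. changedColumns is a hypothetical helper, and it assumes both slices have the same length.

```go
package main

import "fmt"

// changedColumns returns the indexes at which the new and old row images
// differ, using plain interface comparison as in the diff.
// Assumption: values and oldValues have the same length.
func changedColumns(values, oldValues []interface{}) []int {
	var changed []int
	for i := range values {
		// Equal only if dynamic types are identical and dynamic values are
		// equal, or if both are nil.
		if values[i] != oldValues[i] {
			changed = append(changed, i)
		}
	}
	return changed
}

func main() {
	oldValues := []interface{}{int64(1), "a", nil}
	values := []interface{}{int64(1), "b", nil}
	fmt.Println(changedColumns(values, oldValues)) // only column 1 differs

	// Same number but different dynamic types: not equal.
	var x, y interface{} = int64(1), int32(1)
	fmt.Println(x == y)
}
```

One caveat from the Go specification: comparing two interface values whose identical dynamic type is not comparable (for example a slice such as []byte) panics at run time, so this style of comparison is only safe if the row values never hold such types. Whether that holds for the values in this PR is not stated in the thread.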
I think the causality output channel does not need to be adjusted, because it is unbuffered and the default queue size of 1024 is enough.
rest lgtm
if c.safeMode {
	j.dml.safeMode = true
}
Suggested change:
- if c.safeMode {
- 	j.dml.safeMode = true
- }
+ j.dml.safeMode = c.safeMode
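The two forms are not strictly equivalent: the original only ever turns safe mode on, while the suggested assignment can also turn it off. The sketch below illustrates the difference with a hypothetical dml type. Whether a job can reach the compactor with safeMode already set is not stated in this thread, so treat this as a point to check rather than a confirmed bug.

```go
package main

import "fmt"

// dml is a hypothetical stand-in holding only the field discussed above.
type dml struct{ safeMode bool }

// setIfEnabled mirrors the original code: it never clears an existing flag.
func setIfEnabled(d *dml, compactorSafeMode bool) {
	if compactorSafeMode {
		d.safeMode = true
	}
}

// assign mirrors the suggested change: it overwrites the flag.
func assign(d *dml, compactorSafeMode bool) {
	d.safeMode = compactorSafeMode
}

func main() {
	a := &dml{safeMode: true}
	b := &dml{safeMode: true}
	setIfEnabled(a, false)
	assign(b, false)
	fmt.Println(a.safeMode, b.safeMode) // the flag survives in a, is cleared in b
}
```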
key := j.dml.identifyKey()
prevPos, ok := tableKeyMap[key]
// if no such key in the buffer, add it
if !ok || prevPos >= len(c.buffer) {
How can prevPos be bigger than the buffer length?
It was just there to avoid a panic. I have removed it.
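To illustrate why the bounds check is unnecessary, here is a minimal Go sketch of a key-indexed compaction buffer. It is a simplified last-write-wins version under assumed names (job, compactor, keyMap, add, flush), not the PR's implementation, which compacts by operation type. The invariant that matters is that the key map and the buffer are always reset together, so every stored position is a valid buffer index.

```go
package main

import "fmt"

// job is a hypothetical, simplified job; key plays the role of
// j.dml.identifyKey() in the diff.
type job struct {
	key string
	val int
}

type compactor struct {
	buffer []*job
	keyMap map[string]int // key -> position of the latest job for that key
}

func newCompactor() *compactor {
	return &compactor{keyMap: make(map[string]int)}
}

// add keeps only the latest job per key. Positions in keyMap always point
// into buffer because both are reset together in flush.
func (c *compactor) add(j *job) {
	if prevPos, ok := c.keyMap[j.key]; ok {
		c.buffer[prevPos] = nil // drop the superseded job
	}
	c.keyMap[j.key] = len(c.buffer)
	c.buffer = append(c.buffer, j)
}

// flush returns the surviving jobs in arrival order and resets the state.
func (c *compactor) flush() []*job {
	out := make([]*job, 0, len(c.buffer))
	for _, j := range c.buffer {
		if j != nil {
			out = append(out, j)
		}
	}
	c.buffer = c.buffer[:0]
	c.keyMap = make(map[string]int)
	return out
}

func main() {
	c := newCompactor()
	c.add(&job{key: "a", val: 1})
	c.add(&job{key: "b", val: 2})
	c.add(&job{key: "a", val: 3}) // supersedes the first job for key "a"
	for _, j := range c.flush() {
		fmt.Println(j.key, j.val)
	}
}
```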
@@ -59,10 +59,14 @@ type DMLWorker struct {

// dmlWorkerWrap creates and runs a dmlWorker instance and returns flush job channel.
func dmlWorkerWrap(inCh chan *job, syncer *Syncer) chan *job {
	chanSize := syncer.cfg.QueueSize / 2
Please add a comment for this special logic. I'm curious whether we can change the default value of this field to avoid this change. BTW, does anyone know the effect of the chan size change? How can a user know that they need to adjust its value, and what the proper value is?
Updated the comment in pingcap/ticdc#3162.
I think no user will change the queue-size... so I kept the original value and didn't add a new config item like compact-buffer-size.
Please comment in https://github.com/pingcap/ticdc/pull/3162
/run-unit-test
What problem does this PR solve?
add compactor to compact dml
What is changed and how it works?
Check List
Tests