Skip to content

Commit

Permalink
mysql/conn: do not allocate during writes (vitessio#14482)
Browse files Browse the repository at this point in the history
Signed-off-by: Vicent Marti <[email protected]>
  • Loading branch information
vmg authored Nov 7, 2023
1 parent a15ef42 commit 6eebcb7
Showing 1 changed file with 25 additions and 37 deletions.
62 changes: 25 additions & 37 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (c *Conn) endWriterBuffering() error {
c.bufferedWriter = nil
}()

c.stopFlushTimer()
c.flushTimer.Stop()
return c.bufferedWriter.Flush()
}

Expand All @@ -345,43 +345,20 @@ func (c *Conn) returnReader() {
readersPool.Put(c.bufferedReader)
}

// getWriter returns the current writer. It may be either
// the original connection or a wrapper. The returned unget
// function must be invoked after the writing is finished.
// In buffered mode, the unget starts a timer to flush any
// buffered data.
func (c *Conn) getWriter() (w io.Writer, unget func()) {
c.bufMu.Lock()
if c.bufferedWriter != nil {
return c.bufferedWriter, func() {
c.startFlushTimer()
c.bufMu.Unlock()
}
}
c.bufMu.Unlock()
return c.conn, func() {}
}

// startFlushTimer must be called while holding lock on bufMu.
func (c *Conn) startFlushTimer() {
c.stopFlushTimer()
c.flushTimer = time.AfterFunc(mysqlServerFlushDelay, func() {
c.bufMu.Lock()
defer c.bufMu.Unlock()

if c.bufferedWriter == nil {
return
}
c.stopFlushTimer()
c.bufferedWriter.Flush()
})
}
if c.flushTimer == nil {
c.flushTimer = time.AfterFunc(mysqlServerFlushDelay, func() {
c.bufMu.Lock()
defer c.bufMu.Unlock()

// stopFlushTimer must be called while holding lock on bufMu.
func (c *Conn) stopFlushTimer() {
if c.flushTimer != nil {
c.flushTimer.Stop()
c.flushTimer = nil
if c.bufferedWriter == nil {
return
}
c.bufferedWriter.Flush()
})
} else {
c.flushTimer.Reset(mysqlServerFlushDelay)
}
}

Expand Down Expand Up @@ -615,8 +592,19 @@ func (c *Conn) writePacket(data []byte) error {
index := 0
dataLength := len(data) - packetHeaderSize

w, unget := c.getWriter()
defer unget()
var w io.Writer

c.bufMu.Lock()
if c.bufferedWriter != nil {
w = c.bufferedWriter
defer func() {
c.startFlushTimer()
c.bufMu.Unlock()
}()
} else {
c.bufMu.Unlock()
w = c.conn
}

var header [packetHeaderSize]byte
for {
Expand Down

0 comments on commit 6eebcb7

Please sign in to comment.