Skip to content

Commit

Permalink
considered if copy data is too large
Browse files Browse the repository at this point in the history
  • Loading branch information
diPhantxm committed Sep 14, 2024
1 parent 2ee3ff3 commit 00a5627
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions router/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type RelayStateMgr interface {
RelayRunCommand(msg pgproto3.FrontendMessage, waitForResp bool, replyCl bool) error

ProcQuery(query pgproto3.FrontendMessage, waitForResp bool, replyCl bool) (txstatus.TXStatus, []pgproto3.BackendMessage, bool, error)
ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData) error
ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData, expRoute *routingstate.DataShardRoute) error

ProcCommand(query pgproto3.FrontendMessage, waitForResp bool, replyCl bool) error

Expand Down Expand Up @@ -657,7 +657,7 @@ func (rst *RelayStateImpl) RelayRunCommand(msg pgproto3.FrontendMessage, waitFor
}

// TODO : unit tests
func (rst *RelayStateImpl) ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData) error {
func (rst *RelayStateImpl) ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData, expRoute *routingstate.DataShardRoute) error {
spqrlog.Zero.Debug().
Uint("client", rst.Client().ID()).
Msg("client process copy")
Expand All @@ -676,10 +676,10 @@ func (rst *RelayStateImpl) ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData) err
// Parse data
// and decide where to route
prev := 0
var route *routingstate.DataShardRoute
valueClause := &lyx.ValueClause{}
for i, b := range data.Data {
if i+2 < len(data.Data) && string(data.Data[i:i+2]) == "\\." {
prev = len(data.Data)
break
}
if b == '\n' || b == delimiter {
Expand All @@ -701,10 +701,10 @@ func (rst *RelayStateImpl) ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData) err
return fmt.Errorf("multishard copy is not supported")
}

if route == nil {
route = smt.Route
if expRoute.Shkey.Name == "" {
*expRoute = *smt.Route
}
if smt.Route.Shkey.Name != route.Shkey.Name {
if smt.Route.Shkey.Name != expRoute.Shkey.Name {
return fmt.Errorf("multishard copy is not supported")
}

Expand All @@ -713,11 +713,14 @@ func (rst *RelayStateImpl) ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData) err
}

for _, sh := range rst.Client().Server().Datashards() {
if route != nil && sh.Name() == route.Shkey.Name {
return sh.Send(data)
if expRoute != nil && sh.Name() == expRoute.Shkey.Name {
err := sh.Send(&pgproto3.CopyData{Data: data.Data[:prev]})
data.Data = data.Data[prev:]
return err
}
}

// shouldn't exit from here
return nil
}

Expand Down Expand Up @@ -805,20 +808,21 @@ func (rst *RelayStateImpl) ProcQuery(query pgproto3.FrontendMessage, waitForResp
q := rst.qp.Stmt().(*lyx.Copy)

if err := func() error {
buf := []byte{}
msg := &pgproto3.CopyData{Data: make([]byte, 0)}
route := &routingstate.DataShardRoute{}
for {
cpMsg, err := rst.Client().Receive()
if err != nil {
return err
}

switch msg := cpMsg.(type) {
switch newMsg := cpMsg.(type) {
case *pgproto3.CopyData:
buf = append(buf, msg.Data...)
case *pgproto3.CopyDone, *pgproto3.CopyFail:
if err := rst.ProcCopy(q, &pgproto3.CopyData{Data: buf}); err != nil {
msg.Data = append(msg.Data, newMsg.Data...)
if err = rst.ProcCopy(q, msg, route); err != nil {
return err
}
case *pgproto3.CopyDone, *pgproto3.CopyFail:
if err := rst.ProcCopyComplete(&cpMsg); err != nil {
return err
}
Expand Down

0 comments on commit 00a5627

Please sign in to comment.