From 37ff0cf76aad2d485d47ff28e68811922b0b83c9 Mon Sep 17 00:00:00 2001 From: JianGuo Date: Wed, 3 Jan 2024 22:30:31 +0800 Subject: [PATCH] update (#101) --- datastreamer/streamserver.go | 178 +++++++++++++++++++---------------- 1 file changed, 96 insertions(+), 82 deletions(-) diff --git a/datastreamer/streamserver.go b/datastreamer/streamserver.go index 98ba591..15356bf 100644 --- a/datastreamer/streamserver.go +++ b/datastreamer/streamserver.go @@ -138,6 +138,7 @@ type client struct { conn net.Conn status ClientStatus fromEntry uint64 + clientId string } // ResultEntry type for a result entry @@ -237,7 +238,7 @@ func (s *StreamServer) waitConnections() { } // Check max connections allowed - if len(s.clients) >= maxConnections { + if s.getSafeClientsLen() >= maxConnections { log.Warnf("Unable to accept client connection, maximum number of connections reached (%d)", maxConnections) conn.Close() time.Sleep(2 * time.Second) // nolint:gomnd @@ -261,6 +262,7 @@ func (s *StreamServer) handleConnection(conn net.Conn) { conn: conn, status: csStopped, fromEntry: 0, + clientId: clientId, } s.mutexClients.Unlock() @@ -288,7 +290,7 @@ func (s *StreamServer) handleConnection(conn net.Conn) { // Manage the requested command log.Debugf("Command %d[%s] received from %s", command, StrCommand[Command(command)], clientId) - err = s.processCommand(Command(command), clientId) + err = s.processCommand(Command(command), s.getSafeClient(clientId)) if err != nil { // Kill client connection time.Sleep(2 * time.Second) // nolint:gomnd @@ -588,7 +590,8 @@ func (s *StreamServer) broadcastAtomicOp() { // Wait for new atomic operation to broadcast broadcastOp := <-s.stream start := time.Now().UnixMilli() - + var killedClientMap = map[string]struct{}{} + s.mutexClients.Lock() // For each connected and started client log.Debugf("Clients: %d, AO-entries: %d", len(s.clients), len(broadcastOp.entries)) for id, cli := range s.clients { @@ -612,11 +615,17 @@ func (s *StreamServer) broadcastAtomicOp() { if err != nil { // Kill client connection log.Warnf("Error sending entry to %s: %v", id, err) - s.killClient(id) + killedClientMap[id] = struct{}{} } } } } + s.mutexClients.Unlock() + + for k := range killedClientMap { + s.killClient(k) + } + log.Debugf("broadcastAtomicOp process time: %vms", time.Now().UnixMilli()-start) } } @@ -637,8 +646,8 @@ func (s *StreamServer) killClient(clientId string) { } // processCommand manages the received TCP commands from the clients -func (s *StreamServer) processCommand(command Command, clientId string) error { - cli := s.clients[clientId] +func (s *StreamServer) processCommand(command Command, client *client) error { + cli := client // Manage each different kind of command request from a client var err error @@ -647,10 +656,10 @@ func (s *StreamServer) processCommand(command Command, clientId string) error { if cli.status != csStopped { log.Error("Stream to client already started!") err = ErrClientAlreadyStarted - _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], clientId) + _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], client) } else { cli.status = csSyncing - err = s.processCmdStart(clientId) + err = s.processCmdStart(client) if err == nil { cli.status = csSynced } @@ -660,10 +669,10 @@ func (s *StreamServer) processCommand(command Command, clientId string) error { if cli.status != csStopped { log.Error("Stream to client already started!") err = ErrClientAlreadyStarted - _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], clientId) + _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], client) } else { cli.status = csSyncing - err = s.processCmdStartBookmark(clientId) + err = s.processCmdStartBookmark(client) if err == nil { cli.status = csSynced } @@ -673,148 +682,146 @@ func (s *StreamServer) processCommand(command Command, clientId string) error { if cli.status != csSynced { log.Error("Stream to client already stopped!") err = ErrClientAlreadyStopped - _ = s.sendResultEntry(uint32(CmdErrAlreadyStopped), StrCommandErrors[CmdErrAlreadyStopped], clientId) + _ = s.sendResultEntry(uint32(CmdErrAlreadyStopped), StrCommandErrors[CmdErrAlreadyStopped], client) } else { cli.status = csStopped - err = s.processCmdStop(clientId) + err = s.processCmdStop(client) } case CmdHeader: if cli.status != csStopped { log.Error("Header command not allowed, stream started!") err = ErrHeaderCommandNotAllowed - _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], clientId) + _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], client) } else { - err = s.processCmdHeader(clientId) + err = s.processCmdHeader(client) } case CmdEntry: if cli.status != csStopped { log.Error("Entry command not allowed, stream started!") err = ErrEntryCommandNotAllowed - _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], clientId) + _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], client) } else { - err = s.processCmdEntry(clientId) + err = s.processCmdEntry(client) } case CmdBookmark: if cli.status != csStopped { log.Error("Bookmark command not allowed, stream started!") err = ErrBookmarkCommandNotAllowed - _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], clientId) + _ = s.sendResultEntry(uint32(CmdErrAlreadyStarted), StrCommandErrors[CmdErrAlreadyStarted], client) } else { - err = s.processCmdBookmark(clientId) + err = s.processCmdBookmark(client) } default: log.Error("Invalid command!") err = ErrInvalidCommand - _ = s.sendResultEntry(uint32(CmdErrInvalidCommand), StrCommandErrors[CmdErrInvalidCommand], clientId) + _ = s.sendResultEntry(uint32(CmdErrInvalidCommand), StrCommandErrors[CmdErrInvalidCommand], client) } return err } // processCmdStart processes the TCP Start command from the clients -func (s *StreamServer) processCmdStart(clientId string) error { +func (s *StreamServer) processCmdStart(client *client) error { // Read from entry number parameter - conn := s.clients[clientId].conn - fromEntry, err := readFullUint64(conn) + fromEntry, err := readFullUint64(client.conn) if err != nil { return err } - s.clients[clientId].fromEntry = fromEntry + client.fromEntry = fromEntry // Log - log.Infof("Client %s command Start from %d", clientId, fromEntry) + log.Infof("Client %s command Start from %d", client.clientId, fromEntry) // Check received param if fromEntry > s.nextEntry && fromEntry > s.initEntry { - log.Infof("Start command invalid from entry %d for client %s", fromEntry, clientId) + log.Infof("Start command invalid from entry %d for client %s", fromEntry, client.clientId) err = ErrStartCommandInvalidParamFromEntry - _ = s.sendResultEntry(uint32(CmdErrBadFromEntry), StrCommandErrors[CmdErrBadFromEntry], clientId) + _ = s.sendResultEntry(uint32(CmdErrBadFromEntry), StrCommandErrors[CmdErrBadFromEntry], client) return err } // Send a command result entry OK - err = s.sendResultEntry(0, "OK", clientId) + err = s.sendResultEntry(0, "OK", client) if err != nil { return err } // Stream entries data from the requested entry number if fromEntry < s.nextEntry { - err = s.streamingFromEntry(clientId, fromEntry) + err = s.streamingFromEntry(client, fromEntry) } return err } // processCmdStartBookmark processes the TCP Start Bookmark command from the clients -func (s *StreamServer) processCmdStartBookmark(clientId string) error { +func (s *StreamServer) processCmdStartBookmark(client *client) error { // Read bookmark length parameter - conn := s.clients[clientId].conn - length, err := readFullUint32(conn) + length, err := readFullUint32(client.conn) if err != nil { return err } // Check maximum length allowed if length > maxBookmarkLength { - log.Infof("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.", clientId, length, maxBookmarkLength) + log.Infof("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.", client.clientId, length, maxBookmarkLength) return ErrBookmarkMaxLength } // Read bookmark parameter - bookmark, err := readFullBytes(length, conn) + bookmark, err := readFullBytes(length, client.conn) if err != nil { return err } // Log - log.Infof("Client %s command StartBookmark [%v]", clientId, bookmark) + log.Infof("Client %s command StartBookmark [%v]", client.clientId, bookmark) // Get bookmark entryNum, err := s.bookmark.GetBookmark(bookmark) if err != nil { - log.Infof("StartBookmark command invalid from bookmark %v for client %s: %v", bookmark, clientId, err) + log.Infof("StartBookmark command invalid from bookmark %v for client %s: %v", bookmark, client.clientId, err) err = ErrStartBookmarkInvalidParamFromBookmark - _ = s.sendResultEntry(uint32(CmdErrBadFromBookmark), StrCommandErrors[CmdErrBadFromBookmark], clientId) + _ = s.sendResultEntry(uint32(CmdErrBadFromBookmark), StrCommandErrors[CmdErrBadFromBookmark], client) return err } // Send a command result entry OK - err = s.sendResultEntry(0, "OK", clientId) + err = s.sendResultEntry(0, "OK", client) if err != nil { return err } // Stream entries data from the entry number marked by the bookmark - log.Infof("Client %s Bookmark [%v] is the entry number [%d]", clientId, bookmark, entryNum) + log.Infof("Client %s Bookmark [%v] is the entry number [%d]", client.clientId, bookmark, entryNum) if entryNum < s.nextEntry { - err = s.streamingFromEntry(clientId, entryNum) + err = s.streamingFromEntry(client, entryNum) } return err } // processCmdStop processes the TCP Stop command from the clients -func (s *StreamServer) processCmdStop(clientId string) error { +func (s *StreamServer) processCmdStop(client *client) error { // Log - log.Infof("Client %s command Stop", clientId) + log.Infof("Client %s command Stop", client.clientId) // Send a command result entry OK - err := s.sendResultEntry(0, "OK", clientId) + err := s.sendResultEntry(0, "OK", client) return err } // processCmdHeader processes the TCP Header command from the clients -func (s *StreamServer) processCmdHeader(clientId string) error { +func (s *StreamServer) processCmdHeader(client *client) error { // Log - log.Infof("Client %s command Header", clientId) + log.Infof("Client %s command Header", client.clientId) // Send a command result entry OK - err := s.sendResultEntry(0, "OK", clientId) + err := s.sendResultEntry(0, "OK", client) if err != nil { return err } @@ -824,33 +831,31 @@ func (s *StreamServer) processCmdHeader(clientId string) error { binaryHeader := encodeHeaderEntryToBinary(header) // Send header entry to the client - conn := s.clients[clientId].conn - if conn != nil { - _, err = conn.Write(binaryHeader) + if client.conn != nil { + _, err = client.conn.Write(binaryHeader) } else { err = ErrNilConnection } if err != nil { - log.Warnf("Error sending header entry to %s: %v", clientId, err) + log.Warnf("Error sending header entry to %s: %v", client.clientId, err) return err } return nil } // processCmdEntry processes the TCP Entry command from the clients -func (s *StreamServer) processCmdEntry(clientId string) error { +func (s *StreamServer) processCmdEntry(client *client) error { // Read from entry number parameter - conn := s.clients[clientId].conn - entryNumber, err := readFullUint64(conn) + entryNumber, err := readFullUint64(client.conn) if err != nil { return err } // Log - log.Infof("Client %s command Entry %d", clientId, entryNumber) + log.Infof("Client %s command Entry %d", client.clientId, entryNumber) // Send a command result entry OK - err = s.sendResultEntry(0, "OK", clientId) + err = s.sendResultEntry(0, "OK", client) if err != nil { return err } @@ -867,14 +872,13 @@ func (s *StreamServer) processCmdEntry(clientId string) error { binaryEntry := encodeFileEntryToBinary(entry) // Send entry to the client - conn = s.clients[clientId].conn - if conn != nil { - _, err = conn.Write(binaryEntry) + if client.conn != nil { + _, err = client.conn.Write(binaryEntry) } else { err = ErrNilConnection } if err != nil { - log.Warnf("Error sending entry to %s: %v", clientId, err) + log.Warnf("Error sending entry to %s: %v", client.clientId, err) return err } @@ -882,31 +886,30 @@ func (s *StreamServer) processCmdEntry(clientId string) error { } // processCmdBookmark processes the TCP Bookmark command from the clients -func (s *StreamServer) processCmdBookmark(clientId string) error { +func (s *StreamServer) processCmdBookmark(client *client) error { // Read bookmark length parameter - conn := s.clients[clientId].conn - length, err := readFullUint32(conn) + length, err := readFullUint32(client.conn) if err != nil { return err } // Check maximum length allowed if length > maxBookmarkLength { - log.Infof("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.", clientId, length, maxBookmarkLength) + log.Infof("Client %s exceeded [%d] maximum allowed length [%d] for a bookmark.", client.clientId, length, maxBookmarkLength) return ErrBookmarkMaxLength } // Read bookmark parameter - bookmark, err := readFullBytes(length, conn) + bookmark, err := readFullBytes(length, client.conn) if err != nil { return err } // Log - log.Infof("Client %s command Bookmark %v", clientId, bookmark) + log.Infof("Client %s command Bookmark %v", client.clientId, bookmark) // Send a command result entry OK - err = s.sendResultEntry(0, "OK", clientId) + err = s.sendResultEntry(0, "OK", client) if err != nil { return err } @@ -923,14 +926,13 @@ func (s *StreamServer) processCmdBookmark(clientId string) error { binaryEntry := encodeFileEntryToBinary(entry) // Send entry to the client - conn = s.clients[clientId].conn - if conn != nil { - _, err = conn.Write(binaryEntry) + if client.conn != nil { + _, err = client.conn.Write(binaryEntry) } else { err = ErrNilConnection } if err != nil { - log.Warnf("Error sending entry to %s: %v", clientId, err) + log.Warnf("Error sending entry to %s: %v", client.clientId, err) return err } @@ -938,10 +940,9 @@ func (s *StreamServer) processCmdBookmark(clientId string) error { } // streamingFromEntry sends to the client the stream data starting from the requested entry number -func (s *StreamServer) streamingFromEntry(clientId string, fromEntry uint64) error { +func (s *StreamServer) streamingFromEntry(client *client, fromEntry uint64) error { // Log - conn := s.clients[clientId].conn - log.Infof("SYNCING %s from entry %d...", clientId, fromEntry) + log.Infof("SYNCING %s from entry %d...", client.clientId, fromEntry) // Start file stream iterator iterator, err := s.streamFile.iteratorFrom(fromEntry, true) @@ -963,18 +964,18 @@ func (s *StreamServer) streamingFromEntry(clientId string, fromEntry uint64) err // Send the file data entry binaryEntry := encodeFileEntryToBinary(iterator.Entry) - log.Debugf("Sending data entry %d (type %d) to %s", iterator.Entry.Number, iterator.Entry.Type, clientId) - if conn != nil { - _, err = conn.Write(binaryEntry) + log.Debugf("Sending data entry %d (type %d) to %s", iterator.Entry.Number, iterator.Entry.Type, client.clientId) + if client.conn != nil { + _, err = client.conn.Write(binaryEntry) } else { err = ErrNilConnection } if err != nil { - log.Warnf("Error sending entry %d to %s: %v", iterator.Entry.Number, clientId, err) + log.Warnf("Error sending entry %d to %s: %v", iterator.Entry.Number, client.clientId, err) return err } } - log.Infof("Synced %s until %d!", clientId, iterator.Entry.Number) + log.Infof("Synced %s until %d!", client.clientId, iterator.Entry.Number) // Close iterator s.streamFile.iteratorEnd(iterator) @@ -983,7 +984,7 @@ func (s *StreamServer) streamingFromEntry(clientId string, fromEntry uint64) err } // sendResultEntry sends the response to a TCP command for the clients -func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientId string) error { +func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, client *client) error { // Prepare the result entry byteSlice := []byte(errorStr) @@ -1001,19 +1002,32 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, clientI // Send the result entry to the client var err error - conn := s.clients[clientId].conn - if conn != nil { - _, err = conn.Write(binaryEntry) + if client.conn != nil { + _, err = client.conn.Write(binaryEntry) } else { err = ErrNilConnection } if err != nil { - log.Warnf("Error sending result entry to %s: %v", clientId, err) + log.Warnf("Error sending result entry to %s: %v", client.clientId, err) return err } return nil } +func (s *StreamServer) getSafeClient(clientId string) *client { + s.mutexClients.Lock() + client := s.clients[clientId] + s.mutexClients.Unlock() + return client +} + +func (s *StreamServer) getSafeClientsLen() int { + s.mutexClients.Lock() + clientLen := len(s.clients) + s.mutexClients.Unlock() + return clientLen +} + // BookmarkPrintDump prints all bookmarks func (s *StreamServer) BookmarkPrintDump() { err := s.bookmark.PrintDump()