From 2cd16259ac23dba71cbd7222ff3be31dfd745b51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Tue, 10 Sep 2024 12:33:19 +0200 Subject: [PATCH] Revert "unit test (#140)" (#141) This reverts commit ae23d0948a3d3315a5e79c318a10f7d0a0ee04d5. --- cmd/main.go | 10 ++++-- datastreamer/datastreamer_test.go | 24 ++++++------- datastreamer/streamclient.go | 32 +++++++++-------- datastreamer/streamfile_test.go | 22 ------------ datastreamer/streamrelay.go | 12 +++++-- datastreamer/streamrelay_test.go | 58 ------------------------------- datastreamer/streamserver.go | 2 +- 7 files changed, 46 insertions(+), 114 deletions(-) delete mode 100644 datastreamer/streamfile_test.go delete mode 100644 datastreamer/streamrelay_test.go diff --git a/cmd/main.go b/cmd/main.go index f83ffdf..bd81fc1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -452,7 +452,10 @@ func runClient(ctx *cli.Context) error { paramDumpBatch := ctx.String("dumpbatch") // Create client - c := datastreamer.NewClient(server, StSequencer) + c, err := datastreamer.NewClient(server, StSequencer) + if err != nil { + return err + } // Set process entry callback function if !sanityCheck { @@ -475,7 +478,10 @@ func runClient(ctx *cli.Context) error { } // Start client (connect to the server) - c.Start() + err = c.Start() + if err != nil { + return err + } // Query file header information if queryHeader { diff --git a/datastreamer/datastreamer_test.go b/datastreamer/datastreamer_test.go index 6760e5c..9239896 100644 --- a/datastreamer/datastreamer_test.go +++ b/datastreamer/datastreamer_test.go @@ -104,6 +104,7 @@ var ( }, WriteTimeout: 3 * time.Second, } + leveldb = config.Filename[0:strings.IndexRune(config.Filename, '.')] + ".db" streamServer *datastreamer.StreamServer streamType = datastreamer.StreamType(1) entryType1 = datastreamer.EntryType(1) @@ -179,15 +180,15 @@ var ( } ) -func deleteFiles(fileName string) error { +func deleteFiles() error { // Delete test file from filesystem - err := os.Remove(fileName) + err := os.Remove(config.Filename) if err != nil && !os.IsNotExist(err) { return err } // Delete leveldb folder from filesystem - err = os.RemoveAll(fileName[0:strings.IndexRune(fileName, '.')] + ".db") + err = os.RemoveAll(leveldb) if err != nil && !os.IsNotExist(err) { return err } @@ -196,7 +197,7 @@ func deleteFiles(fileName string) error { } func TestServer(t *testing.T) { - err := deleteFiles(config.Filename) + err := deleteFiles() if err != nil { panic(err) } @@ -447,8 +448,6 @@ func TestServer(t *testing.T) { // Log final file header datastreamer.PrintHeaderEntry(streamServer.GetHeader(), "final tests") - - streamServer.BookmarkPrintDump() } func TestClient(t *testing.T) { @@ -457,13 +456,15 @@ func TestClient(t *testing.T) { var entry datastreamer.FileEntry var header datastreamer.HeaderEntry - client := datastreamer.NewClient(fmt.Sprintf("localhost:%d", config.Port), streamType) + client, err := datastreamer.NewClient(fmt.Sprintf("localhost:%d", config.Port), streamType) + require.NoError(t, err) - client.Start() + err = client.Start() + require.NoError(t, err) // Case: Query data from not existing bookmark -> FAIL fromBookmark = nonAddedBookmark.Encode() - _, err := client.ExecCommandGetBookmark(fromBookmark) + _, err = client.ExecCommandGetBookmark(fromBookmark) require.EqualError(t, datastreamer.ErrBookmarkNotFound, err.Error()) // Case: Query data from existing bookmark -> OK @@ -533,9 +534,4 @@ func TestClient(t *testing.T) { entry, err = client.ExecCommandGetEntry(fromEntry) require.NoError(t, err) require.Equal(t, testEntries[2], TestEntry{}.Decode(entry.Data)) - require.Equal(t, uint64(0), client.GetFromStream()) - require.Equal(t, uint64(1304), client.GetTotalEntries()) - require.True(t, client.IsStarted()) - log.Debug("closing connection from the test") - client.CloseConnection() } diff --git a/datastreamer/streamclient.go b/datastreamer/streamclient.go index b52aafa..0a0e51e 100644 --- a/datastreamer/streamclient.go +++ b/datastreamer/streamclient.go @@ -45,7 +45,7 @@ type StreamClient struct { } // NewClient creates a new data stream client -func NewClient(server string, streamType StreamType) *StreamClient { +func NewClient(server string, streamType StreamType) (*StreamClient, error) { // Create the client data stream c := StreamClient{ server: server, @@ -69,17 +69,17 @@ func NewClient(server string, streamType StreamType) *StreamClient { // Set default callback function to process entry c.setProcessEntryFunc(PrintReceivedEntry, c.relayServer) - return &c + return &c, nil } // NewClientWithLogsConfig creates a new data stream client with logs configuration -func NewClientWithLogsConfig(server string, streamType StreamType, logsConfig log.Config) *StreamClient { +func NewClientWithLogsConfig(server string, streamType StreamType, logsConfig log.Config) (*StreamClient, error) { log.Init(logsConfig) return NewClient(server, streamType) } // Start connects to the data stream server and starts getting data from the server -func (c *StreamClient) Start() { +func (c *StreamClient) Start() error { // Connect to server c.connectServer() @@ -91,6 +91,8 @@ func (c *StreamClient) Start() { // Flag stared c.started = true + + return nil } // connectServer waits until the server connection is established and returns if a command result is pending @@ -114,7 +116,7 @@ func (c *StreamClient) connectServer() bool { if c.streaming { _, _, err = c.execCommand(CmdStart, true, c.nextEntry, nil) if err != nil { - c.CloseConnection() + c.closeConnection() time.Sleep(defaultTimeout) continue } @@ -127,8 +129,8 @@ func (c *StreamClient) connectServer() bool { return false } -// CloseConnection closes connection to the server -func (c *StreamClient) CloseConnection() { +// closeConnection closes connection to the server +func (c *StreamClient) closeConnection() { if c.conn != nil { log.Infof("%s Close connection", c.ID) c.conn.Close() @@ -430,7 +432,7 @@ func (c *StreamClient) readResultEntry() (ResultEntry, error) { if err != nil { return e, err } - // e.PrintResultEntry() + // PrintResultEntry(e) return e, nil } @@ -451,7 +453,7 @@ func (c *StreamClient) readContent(buffer []byte) error { // readEntries reads from the server all type of packets func (c *StreamClient) readEntries() { - defer c.CloseConnection() + defer c.closeConnection() for { // Wait for connection @@ -461,7 +463,7 @@ func (c *StreamClient) readEntries() { packet := make([]byte, 1) err := c.readContent(packet) if err != nil { - c.CloseConnection() + c.closeConnection() continue } @@ -471,7 +473,7 @@ func (c *StreamClient) readEntries() { // Read result entry data r, err := c.readResultEntry() if err != nil { - c.CloseConnection() + c.closeConnection() continue } // Send data to results channel @@ -480,7 +482,7 @@ func (c *StreamClient) readEntries() { if deferredResult { r := c.getResult(CmdStart) if r.errorNum != uint32(CmdErrOK) { - c.CloseConnection() + c.closeConnection() time.Sleep(defaultTimeout) continue } @@ -490,7 +492,7 @@ func (c *StreamClient) readEntries() { // Read result entry data r, err := c.readDataEntry() if err != nil { - c.CloseConnection() + c.closeConnection() continue } c.entryRsp <- r @@ -499,7 +501,7 @@ func (c *StreamClient) readEntries() { // Read header entry data h, err := c.readHeaderEntry() if err != nil { - c.CloseConnection() + c.closeConnection() continue } // Send data to headers channel @@ -509,7 +511,7 @@ func (c *StreamClient) readEntries() { // Read file/stream entry data e, err := c.readDataEntry() if err != nil { - c.CloseConnection() + c.closeConnection() continue } // Send data to stream entries channel diff --git a/datastreamer/streamfile_test.go b/datastreamer/streamfile_test.go deleted file mode 100644 index cd8ac20..0000000 --- a/datastreamer/streamfile_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package datastreamer_test - -import ( - "testing" - - "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" - "github.com/stretchr/testify/require" -) - -func TestOpenFile(t *testing.T) { - fileName := "/tmp/datastreamer_test_3.bin" - err := deleteFiles(fileName) - require.NoError(t, err) - _, err = datastreamer.NewStreamFile(fileName, 1, 137, streamType) - require.NoError(t, err) - - _, err = datastreamer.NewStreamFile(fileName, 1, 137, streamType) - require.NoError(t, err) - - err = deleteFiles(fileName) - require.NoError(t, err) -} diff --git a/datastreamer/streamrelay.go b/datastreamer/streamrelay.go index 369e9c1..253bc0b 100644 --- a/datastreamer/streamrelay.go +++ b/datastreamer/streamrelay.go @@ -20,7 +20,11 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64, var err error // Create client side - r.client = NewClient(server, streamType) + r.client, err = NewClient(server, streamType) + if err != nil { + log.Errorf("Error creating relay client side: %v", err) + return nil, err + } // Create server side r.server, err = NewServer(port, version, systemID, streamType, fileName, writeTimeout, @@ -39,7 +43,11 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64, // Start connects and syncs with master server then opens access to relay clients func (r *StreamRelay) Start() error { // Start client side - r.client.Start() + err := r.client.Start() + if err != nil { + log.Errorf("Error starting relay client: %v", err) + return err + } // Get total entries from the master server header, err := r.client.ExecCommandGetHeader() diff --git a/datastreamer/streamrelay_test.go b/datastreamer/streamrelay_test.go deleted file mode 100644 index 4848000..0000000 --- a/datastreamer/streamrelay_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package datastreamer_test - -import ( - "fmt" - "testing" - "time" - - "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" - "github.com/stretchr/testify/require" -) - -func TestRelay(t *testing.T) { - fileName1 := "/tmp/datastreamer_test_0.bin" - fileName2 := "/tmp/datastreamer_test_1.bin" - - err := deleteFiles(fileName1) - require.NoError(t, err) - err = deleteFiles(fileName2) - require.NoError(t, err) - - streamServer, err := datastreamer.NewServer(6901, 1, 137, streamType, - fileName1, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, &config.Log) - require.NoError(t, err) - - err = streamServer.Start() - require.NoError(t, err) - err = streamServer.StartAtomicOp() - require.NoError(t, err) - - entryNumber, err := streamServer.AddStreamBookmark(testBookmark.Encode()) - require.NoError(t, err) - require.Equal(t, uint64(0), entryNumber) - - entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[1].Encode()) - require.NoError(t, err) - require.Equal(t, uint64(1), entryNumber) - - entryNumber, err = streamServer.AddStreamBookmark(testBookmark2.Encode()) - require.NoError(t, err) - require.Equal(t, uint64(2), entryNumber) - - entryNumber, err = streamServer.AddStreamEntry(entryType1, testEntries[2].Encode()) - require.NoError(t, err) - require.Equal(t, uint64(3), entryNumber) - - err = streamServer.CommitAtomicOp() - require.NoError(t, err) - - var relayPort uint16 = 6902 - sr, err := datastreamer.NewRelay(fmt.Sprintf("localhost:%d", 6901), relayPort, 1, 137, datastreamer.StreamType(1), - fileName2, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, nil) - require.NoError(t, err) - err = sr.Start() - require.NoError(t, err) - - client := datastreamer.NewClient(fmt.Sprintf("localhost:%d", relayPort), streamType) - client.Start() -} diff --git a/datastreamer/streamserver.go b/datastreamer/streamserver.go index d574c7d..63b0f60 100644 --- a/datastreamer/streamserver.go +++ b/datastreamer/streamserver.go @@ -1263,7 +1263,7 @@ func DecodeBinaryToResultEntry(b []byte) (ResultEntry, error) { } // PrintResultEntry prints result entry type -func (e ResultEntry) PrintResultEntry() { +func PrintResultEntry(e ResultEntry) { log.Debug("--- RESULT ENTRY -------------------------") log.Debugf("packetType: [%d]", e.packetType) log.Debugf("length: [%d]", e.length)