Skip to content

Commit

Permalink
Revert "unit test (#140)" (#141)
Browse files Browse the repository at this point in the history
This reverts commit ae23d09.
  • Loading branch information
ToniRamirezM authored Sep 10, 2024
1 parent ae23d09 commit 2cd1625
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 114 deletions.
10 changes: 8 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,10 @@ func runClient(ctx *cli.Context) error {
paramDumpBatch := ctx.String("dumpbatch")

// Create client
c := datastreamer.NewClient(server, StSequencer)
c, err := datastreamer.NewClient(server, StSequencer)
if err != nil {
return err
}

// Set process entry callback function
if !sanityCheck {
Expand All @@ -475,7 +478,10 @@ func runClient(ctx *cli.Context) error {
}

// Start client (connect to the server)
c.Start()
err = c.Start()
if err != nil {
return err
}

// Query file header information
if queryHeader {
Expand Down
24 changes: 10 additions & 14 deletions datastreamer/datastreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ var (
},
WriteTimeout: 3 * time.Second,
}
leveldb = config.Filename[0:strings.IndexRune(config.Filename, '.')] + ".db"
streamServer *datastreamer.StreamServer
streamType = datastreamer.StreamType(1)
entryType1 = datastreamer.EntryType(1)
Expand Down Expand Up @@ -179,15 +180,15 @@ var (
}
)

func deleteFiles(fileName string) error {
func deleteFiles() error {
// Delete test file from filesystem
err := os.Remove(fileName)
err := os.Remove(config.Filename)
if err != nil && !os.IsNotExist(err) {
return err
}

// Delete leveldb folder from filesystem
err = os.RemoveAll(fileName[0:strings.IndexRune(fileName, '.')] + ".db")
err = os.RemoveAll(leveldb)
if err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -196,7 +197,7 @@ func deleteFiles(fileName string) error {
}

func TestServer(t *testing.T) {
err := deleteFiles(config.Filename)
err := deleteFiles()
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -447,8 +448,6 @@ func TestServer(t *testing.T) {

// Log final file header
datastreamer.PrintHeaderEntry(streamServer.GetHeader(), "final tests")

streamServer.BookmarkPrintDump()
}

func TestClient(t *testing.T) {
Expand All @@ -457,13 +456,15 @@ func TestClient(t *testing.T) {
var entry datastreamer.FileEntry
var header datastreamer.HeaderEntry

client := datastreamer.NewClient(fmt.Sprintf("localhost:%d", config.Port), streamType)
client, err := datastreamer.NewClient(fmt.Sprintf("localhost:%d", config.Port), streamType)
require.NoError(t, err)

client.Start()
err = client.Start()
require.NoError(t, err)

// Case: Query data from not existing bookmark -> FAIL
fromBookmark = nonAddedBookmark.Encode()
_, err := client.ExecCommandGetBookmark(fromBookmark)
_, err = client.ExecCommandGetBookmark(fromBookmark)
require.EqualError(t, datastreamer.ErrBookmarkNotFound, err.Error())

// Case: Query data from existing bookmark -> OK
Expand Down Expand Up @@ -533,9 +534,4 @@ func TestClient(t *testing.T) {
entry, err = client.ExecCommandGetEntry(fromEntry)
require.NoError(t, err)
require.Equal(t, testEntries[2], TestEntry{}.Decode(entry.Data))
require.Equal(t, uint64(0), client.GetFromStream())
require.Equal(t, uint64(1304), client.GetTotalEntries())
require.True(t, client.IsStarted())
log.Debug("closing connection from the test")
client.CloseConnection()
}
32 changes: 17 additions & 15 deletions datastreamer/streamclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type StreamClient struct {
}

// NewClient creates a new data stream client
func NewClient(server string, streamType StreamType) *StreamClient {
func NewClient(server string, streamType StreamType) (*StreamClient, error) {
// Create the client data stream
c := StreamClient{
server: server,
Expand All @@ -69,17 +69,17 @@ func NewClient(server string, streamType StreamType) *StreamClient {
// Set default callback function to process entry
c.setProcessEntryFunc(PrintReceivedEntry, c.relayServer)

return &c
return &c, nil
}

// NewClientWithLogsConfig creates a new data stream client with logs configuration
func NewClientWithLogsConfig(server string, streamType StreamType, logsConfig log.Config) *StreamClient {
func NewClientWithLogsConfig(server string, streamType StreamType, logsConfig log.Config) (*StreamClient, error) {
log.Init(logsConfig)
return NewClient(server, streamType)
}

// Start connects to the data stream server and starts getting data from the server
func (c *StreamClient) Start() {
func (c *StreamClient) Start() error {
// Connect to server
c.connectServer()

Expand All @@ -91,6 +91,8 @@ func (c *StreamClient) Start() {

// Flag stared
c.started = true

return nil
}

// connectServer waits until the server connection is established and returns if a command result is pending
Expand All @@ -114,7 +116,7 @@ func (c *StreamClient) connectServer() bool {
if c.streaming {
_, _, err = c.execCommand(CmdStart, true, c.nextEntry, nil)
if err != nil {
c.CloseConnection()
c.closeConnection()
time.Sleep(defaultTimeout)
continue
}
Expand All @@ -127,8 +129,8 @@ func (c *StreamClient) connectServer() bool {
return false
}

// CloseConnection closes connection to the server
func (c *StreamClient) CloseConnection() {
// closeConnection closes connection to the server
func (c *StreamClient) closeConnection() {
if c.conn != nil {
log.Infof("%s Close connection", c.ID)
c.conn.Close()
Expand Down Expand Up @@ -430,7 +432,7 @@ func (c *StreamClient) readResultEntry() (ResultEntry, error) {
if err != nil {
return e, err
}
// e.PrintResultEntry()
// PrintResultEntry(e)
return e, nil
}

Expand All @@ -451,7 +453,7 @@ func (c *StreamClient) readContent(buffer []byte) error {

// readEntries reads from the server all type of packets
func (c *StreamClient) readEntries() {
defer c.CloseConnection()
defer c.closeConnection()

for {
// Wait for connection
Expand All @@ -461,7 +463,7 @@ func (c *StreamClient) readEntries() {
packet := make([]byte, 1)
err := c.readContent(packet)
if err != nil {
c.CloseConnection()
c.closeConnection()
continue
}

Expand All @@ -471,7 +473,7 @@ func (c *StreamClient) readEntries() {
// Read result entry data
r, err := c.readResultEntry()
if err != nil {
c.CloseConnection()
c.closeConnection()
continue
}
// Send data to results channel
Expand All @@ -480,7 +482,7 @@ func (c *StreamClient) readEntries() {
if deferredResult {
r := c.getResult(CmdStart)
if r.errorNum != uint32(CmdErrOK) {
c.CloseConnection()
c.closeConnection()
time.Sleep(defaultTimeout)
continue
}
Expand All @@ -490,7 +492,7 @@ func (c *StreamClient) readEntries() {
// Read result entry data
r, err := c.readDataEntry()
if err != nil {
c.CloseConnection()
c.closeConnection()
continue
}
c.entryRsp <- r
Expand All @@ -499,7 +501,7 @@ func (c *StreamClient) readEntries() {
// Read header entry data
h, err := c.readHeaderEntry()
if err != nil {
c.CloseConnection()
c.closeConnection()
continue
}
// Send data to headers channel
Expand All @@ -509,7 +511,7 @@ func (c *StreamClient) readEntries() {
// Read file/stream entry data
e, err := c.readDataEntry()
if err != nil {
c.CloseConnection()
c.closeConnection()
continue
}
// Send data to stream entries channel
Expand Down
22 changes: 0 additions & 22 deletions datastreamer/streamfile_test.go

This file was deleted.

12 changes: 10 additions & 2 deletions datastreamer/streamrelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64,
var err error

// Create client side
r.client = NewClient(server, streamType)
r.client, err = NewClient(server, streamType)
if err != nil {
log.Errorf("Error creating relay client side: %v", err)
return nil, err
}

// Create server side
r.server, err = NewServer(port, version, systemID, streamType, fileName, writeTimeout,
Expand All @@ -39,7 +43,11 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64,
// Start connects and syncs with master server then opens access to relay clients
func (r *StreamRelay) Start() error {
// Start client side
r.client.Start()
err := r.client.Start()
if err != nil {
log.Errorf("Error starting relay client: %v", err)
return err
}

// Get total entries from the master server
header, err := r.client.ExecCommandGetHeader()
Expand Down
58 changes: 0 additions & 58 deletions datastreamer/streamrelay_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ func DecodeBinaryToResultEntry(b []byte) (ResultEntry, error) {
}

// PrintResultEntry prints result entry type
func (e ResultEntry) PrintResultEntry() {
func PrintResultEntry(e ResultEntry) {
log.Debug("--- RESULT ENTRY -------------------------")
log.Debugf("packetType: [%d]", e.packetType)
log.Debugf("length: [%d]", e.length)
Expand Down

0 comments on commit 2cd1625

Please sign in to comment.