From e726ebdf9d466702561a2997749618613594c617 Mon Sep 17 00:00:00 2001 From: Ulf Bjorkengren Date: Fri, 17 Jan 2025 14:15:30 +0100 Subject: [PATCH] Filetransfer refactoring Signed-off-by: Ulf Bjorkengren --- .../filetransfer_client.go | 16 ++- server/vissv2server/vissv2server.go | 6 +- server/vissv2server/wsMgrFT/wsMgrFT.go | 132 ++++++++++-------- 3 files changed, 85 insertions(+), 69 deletions(-) diff --git a/client/client-1.0/filetransfer_client/filetransfer_client.go b/client/client-1.0/filetransfer_client/filetransfer_client.go index 971354e2..198f5e8b 100644 --- a/client/client-1.0/filetransfer_client/filetransfer_client.go +++ b/client/client-1.0/filetransfer_client/filetransfer_client.go @@ -223,8 +223,7 @@ func downloadFile(dlFile string, numOfChunks int, ctrlChannel chan string, dataC fileSize := getFileSize(file) readBuffer := make([]byte, fileSize/numOfChunks+1) uid := "1d878212" //TODO: random generation - ctrlChannel <- `{"action": "set","path": "Vehicle.DownloadFile","value":{"name": "` + dlFile + `", "hash":"` + hash + `","uid":"` + uid + `"},` + - ` "ts": "` + utils.GetRfcTime() + `"}` + ctrlChannel <- `{"action": "set","path": "Vehicle.DownloadFile","value":{"name": "` + dlFile + `", "hash":"` + hash + `","uid":"` + uid + `"}}` ctrlResp := <-ctrlChannel if strings.Contains(ctrlResp, "error") { utils.Error.Printf("Server responded with error message=%s", ctrlResp) @@ -243,6 +242,10 @@ func downloadFile(dlFile string, numOfChunks int, ctrlChannel chan string, dataC mNo, status := decodeDlResponse(dataResp) if status == byte(0x00) && mNo == messageNo { messageNo += 1 + } else if status == byte(0xFF) { + utils.Error.Printf("Session terminated due to server status response=0xFF") + sessionDone = true // terminate session + continue } else { messageNo = mNo // rewind file @@ -315,7 +318,14 @@ func equalByteArray(array1 []byte, array2 []byte) bool { func getFileDescriptorData(value interface{}) (string, string, string) { // {"name": "xxx","hash": "yyy","uid": "zzz"} var name, hash, uid string - for k, v := range value.(map[string]interface{}) { + var valueMap map[string]interface{} + switch value.(type) { + case string: + utils.MapRequest(value.(string), &valueMap) + case map[string]interface{}: + valueMap = value.(map[string]interface{}) + } + for k, v := range valueMap { switch vv := v.(type) { case string: // utils.Info.Println(k, "is string", vv) diff --git a/server/vissv2server/vissv2server.go b/server/vissv2server/vissv2server.go index f40679ac..7b117df3 100644 --- a/server/vissv2server/vissv2server.go +++ b/server/vissv2server/vissv2server.go @@ -567,9 +567,9 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC } func initiateFileTransfer(requestMap map[string]interface{}, nodeType utils.NodeTypes_t, path string) map[string]interface{} { -utils.Info.Printf("initiateFileTransfer: requestMap[action]=%s, nodeType=%d", requestMap["action"], nodeType) +//utils.Info.Printf("initiateFileTransfer: requestMap[action]=%s, nodeType=%d", requestMap["action"], nodeType) var ftInitData utils.FileTransferCache - var responseMap map[string]interface{} + var responseMap = map[string]interface{}{} if requestMap["action"] == "set" && nodeType == utils.ACTUATOR { // download var uidString string ftInitData.UploadTransfer = false @@ -584,7 +584,6 @@ utils.Info.Printf("initiateFileTransfer: requestMap[action]=%s, nodeType=%d", re responseMap["action"] = "set" responseMap["ts"] = utils.GetRfcTime() return responseMap -// return `{"RouterId": "` + requestMap["RouterId"].(string) + `"action": "set", "ts": "` + utils.GetRfcTime() + `"}` } else { utils.SetErrorResponse(requestMap, errorResponseMap, 7, "") //service_unavailable return errorResponseMap @@ -592,7 +591,6 @@ utils.Info.Printf("initiateFileTransfer: requestMap[action]=%s, nodeType=%d", re } else if requestMap["action"] == "get" && nodeType == utils.SENSOR { //upload if requestMap["path"] != nil { -utils.Info.Printf("initiateFileTransfer: upload!!!") path := requestMap["path"].(string) ftInitData.UploadTransfer = true ftInitData.Path, ftInitData.Name = getInternalFileName(path) diff --git a/server/vissv2server/wsMgrFT/wsMgrFT.go b/server/vissv2server/wsMgrFT/wsMgrFT.go index 96b44057..368c644c 100644 --- a/server/vissv2server/wsMgrFT/wsMgrFT.go +++ b/server/vissv2server/wsMgrFT/wsMgrFT.go @@ -37,6 +37,14 @@ var clientIndex int var fileTransferCache []utils.FileTransferCache const FILETRANSFERCACHESIZE = 10 +type ChunkDataCache struct { + MessageNo byte + LastMessage byte + ChunkSize []byte + Chunk []byte +} +var chunkDataCache = ChunkDataCache{} + func WsMgrFTInit(ftChannel chan utils.FileTransferCache) { var clientRequest utils.FileTransferCache var dataMessage, dataResponse []byte @@ -48,61 +56,37 @@ func WsMgrFTInit(ftChannel chan utils.FileTransferCache) { } go initDataSessions(clientChan) + var clientId int for { select { case clientRequest = <-ftChannel: clientRequest.Status = initFtSession(clientRequest) ftChannel <-clientRequest + continue case dataMessage = <-clientChan[0]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[0] <- dataResponse - } + clientId = 0 case dataMessage = <-clientChan[1]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[1] <- dataResponse - } + clientId = 1 case dataMessage = <-clientChan[2]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[2] <- dataResponse - } + clientId = 2 case dataMessage = <-clientChan[3]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[3] <- dataResponse - } + clientId = 3 case dataMessage = <-clientChan[4]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[4] <- dataResponse - } + clientId = 4 case dataMessage = <-clientChan[5]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[5] <- dataResponse - } + clientId = 5 case dataMessage = <-clientChan[6]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[6] <- dataResponse - } + clientId = 6 case dataMessage = <-clientChan[7]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[7] <- dataResponse - } + clientId = 7 case dataMessage = <-clientChan[8]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[8] <- dataResponse - } + clientId = 8 case dataMessage = <-clientChan[9]: - dataResponse = getDataResponse(dataMessage) - if len(dataResponse) > 0 { - clientChan[9] <- dataResponse - } + clientId = 9 + } + dataResponse = getDataResponse(dataMessage) + if len(dataResponse) > 0 { + clientChan[clientId] <- dataResponse } } } @@ -117,47 +101,50 @@ func getDataResponse(req []byte) []byte { func getDataResponseDl(req []byte) []byte { // request: uid(4)|messageNo(1)|chunkSize(4)| lastMessage(1)|chunk(N) resp := make([]byte, 4+1+1) // response: uid(4)|messageNo(1)|status(1) - uid := [utils.UIDLEN]byte(req[:4]) + uid := []byte(req[:4]) var messageNo uint8 buf := bytes.NewReader(req[4:5]) err := binary.Read(buf, binary.BigEndian, &messageNo) if err != nil { utils.Error.Println("binary.Read failed for messageNo:", err) + return createDlResponse(uid, byte(0x00), byte(0xFF)) // terminate session error response } var chunkSize uint32 buf = bytes.NewReader(req[5:9]) err = binary.Read(buf, binary.BigEndian, &chunkSize) if err != nil { utils.Error.Println("binary.Read failed for chunkSize:", err) + return createDlResponse(uid, byte(0x00), byte(0xFF)) // terminate session error response } var lastMessage uint8 buf = bytes.NewReader(req[9:10]) err = binary.Read(buf, binary.BigEndian, &lastMessage) if err != nil { - utils.Error.Println("binary.Read failed for chunkSize:", err) + utils.Error.Println("binary.Read failed for lastMessage:", err) + return createDlResponse(uid, byte(0x00), byte(0xFF)) // terminate session error response } chunk := req[10:] cacheIndex := findFileTransferCacheIndex(uid) if cacheIndex != -1 { if uint32(len(chunk)) != chunkSize { - return createDlResponse(req, req[4], byte(0x01)) + return createDlResponse(uid, req[4], byte(0x01)) } n, err := fileTransferCache[cacheIndex].FileDescriptor.Write(chunk) if err != nil { - return createDlResponse(req, req[4], byte(0x01)) + return createDlResponse(uid, req[4], byte(0x01)) } fileTransferCache[cacheIndex].FileOffset += n if lastMessage != 0 { fileTransferCache[cacheIndex].FileDescriptor.Close() if calculateHash(fileTransferCache[cacheIndex].Path + fileTransferCache[cacheIndex].Name) != fileTransferCache[cacheIndex].Hash { - return createDlResponse(req, byte(0x00), byte(0x01)) + return createDlResponse(uid, byte(0x00), byte(0x01)) } fileTransferCache[cacheIndex].Uid = clearUid() // delete cache entry - return createDlResponse(req, req[4], byte(0x00)) + return createDlResponse(uid, req[4], byte(0x00)) } - return createDlResponse(req, req[4], byte(0x00)) - } else { //error response - return createDlResponse(req, byte(0x00), byte(0x01)) + return createDlResponse(uid, req[4], byte(0x00)) + } else { + return createDlResponse(uid, byte(0x00), byte(0xFF)) // terminate session error response } return resp } @@ -169,7 +156,7 @@ func getDataResponseUl(req []byte) []byte { // request: uid(4)|messageNo(1)|sta lastMessage := byte(0x00) chunkSize := make([]byte,4) var chunk []byte - cacheIndex := findFileTransferCacheIndex([utils.UIDLEN]byte(uid)) + cacheIndex := findFileTransferCacheIndex([]byte(uid)) if cacheIndex != -1 { if status == byte(0x00) { var n int @@ -191,31 +178,52 @@ func getDataResponseUl(req []byte) []byte { // request: uid(4)|messageNo(1)|sta err = binary.Write(buf, binary.BigEndian, uint32(n)) if err != nil { utils.Error.Printf("binary.Write failed:%s", err) - return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, chunk) + return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, nil) } for i := 0; i < 4; i++ { chunkSize[i] = buf.Bytes()[i] } + } else if status == byte(0xFF) { + return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, nil) // error } else { - // resend. Not what is done below - return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, chunk) + lastMessage, chunkSize, chunk = readChunkData(messageNo) + if len(chunk) > 0 { + return createUlResponse(uid, messageNo, lastMessage, chunkSize, chunk) // resend previous message + } else { + return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, nil) //error + } } - } else { //error response - return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, chunk) + } else { + return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, nil) //error } + writeChunkData(messageNo, lastMessage, chunkSize, chunk) return createUlResponse(uid, messageNo, lastMessage, chunkSize, chunk) } +func readChunkData(messageNo byte) (byte, []byte, []byte) { + if messageNo != chunkDataCache.MessageNo { + return byte(0), nil, nil + } + return chunkDataCache.LastMessage, chunkDataCache.ChunkSize, chunkDataCache.Chunk +} + +func writeChunkData(messageNo byte, lastMessage byte, chunkSize []byte, chunk []byte) { + chunkDataCache.MessageNo = messageNo + chunkDataCache.LastMessage = lastMessage + chunkDataCache.ChunkSize = chunkSize + chunkDataCache.Chunk = chunk +} + func clearUid() [utils.UIDLEN]byte { return [utils.UIDLEN]byte{0} } -func createDlResponse(req []byte, messNo byte, status byte) []byte { // response: uid(4)|messageNo(1)|status(1) +func createDlResponse(uid []byte, messNo byte, status byte) []byte { // response: uid(4)|messageNo(1)|status(1) resp := make([]byte,6) - resp[0] = req[0] - resp[1] = req[1] - resp[2] = req[2] - resp[3] = req[3] + resp[0] = uid[0] + resp[1] = uid[1] + resp[2] = uid[2] + resp[3] = uid[3] resp[4] = messNo resp[5] = status return resp @@ -273,9 +281,9 @@ func getFileSize(fp *os.File) int { return int(fi.Size()) } -func findFileTransferCacheIndex(uid [utils.UIDLEN]byte) int { +func findFileTransferCacheIndex(uid []byte) int { for i := 0; i < FILETRANSFERCACHESIZE; i++ { - if fileTransferCache[i].Uid == uid { + if fileTransferCache[i].Uid == [utils.UIDLEN]byte(uid) { return i } }