Skip to content

Commit

Permalink
Merge pull request #80 from UlfBj/ftupdate
Browse files Browse the repository at this point in the history
Filetransfer refactoring
  • Loading branch information
UlfBj authored Jan 17, 2025
2 parents fbb02de + e726ebd commit 317ccfa
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 69 deletions.
16 changes: 13 additions & 3 deletions client/client-1.0/filetransfer_client/filetransfer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ func downloadFile(dlFile string, numOfChunks int, ctrlChannel chan string, dataC
fileSize := getFileSize(file)
readBuffer := make([]byte, fileSize/numOfChunks+1)
uid := "1d878212" //TODO: random generation
ctrlChannel <- `{"action": "set","path": "Vehicle.DownloadFile","value":{"name": "` + dlFile + `", "hash":"` + hash + `","uid":"` + uid + `"},` +
` "ts": "` + utils.GetRfcTime() + `"}`
ctrlChannel <- `{"action": "set","path": "Vehicle.DownloadFile","value":{"name": "` + dlFile + `", "hash":"` + hash + `","uid":"` + uid + `"}}`
ctrlResp := <-ctrlChannel
if strings.Contains(ctrlResp, "error") {
utils.Error.Printf("Server responded with error message=%s", ctrlResp)
Expand All @@ -243,6 +242,10 @@ func downloadFile(dlFile string, numOfChunks int, ctrlChannel chan string, dataC
mNo, status := decodeDlResponse(dataResp)
if status == byte(0x00) && mNo == messageNo {
messageNo += 1
} else if status == byte(0xFF) {
utils.Error.Printf("Session terminated due to server status response=0xFF")
sessionDone = true // terminate session
continue
} else {
messageNo = mNo
// rewind file
Expand Down Expand Up @@ -315,7 +318,14 @@ func equalByteArray(array1 []byte, array2 []byte) bool {

func getFileDescriptorData(value interface{}) (string, string, string) { // {"name": "xxx","hash": "yyy","uid": "zzz"}
var name, hash, uid string
for k, v := range value.(map[string]interface{}) {
var valueMap map[string]interface{}
switch value.(type) {
case string:
utils.MapRequest(value.(string), &valueMap)
case map[string]interface{}:
valueMap = value.(map[string]interface{})
}
for k, v := range valueMap {
switch vv := v.(type) {
case string:
// utils.Info.Println(k, "is string", vv)
Expand Down
6 changes: 2 additions & 4 deletions server/vissv2server/vissv2server.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,9 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC
}

func initiateFileTransfer(requestMap map[string]interface{}, nodeType utils.NodeTypes_t, path string) map[string]interface{} {
utils.Info.Printf("initiateFileTransfer: requestMap[action]=%s, nodeType=%d", requestMap["action"], nodeType)
//utils.Info.Printf("initiateFileTransfer: requestMap[action]=%s, nodeType=%d", requestMap["action"], nodeType)
var ftInitData utils.FileTransferCache
var responseMap map[string]interface{}
var responseMap = map[string]interface{}{}
if requestMap["action"] == "set" && nodeType == utils.ACTUATOR { // download
var uidString string
ftInitData.UploadTransfer = false
Expand All @@ -584,15 +584,13 @@ utils.Info.Printf("initiateFileTransfer: requestMap[action]=%s, nodeType=%d", re
responseMap["action"] = "set"
responseMap["ts"] = utils.GetRfcTime()
return responseMap
// return `{"RouterId": "` + requestMap["RouterId"].(string) + `"action": "set", "ts": "` + utils.GetRfcTime() + `"}`
} else {
utils.SetErrorResponse(requestMap, errorResponseMap, 7, "") //service_unavailable
return errorResponseMap
}

} else if requestMap["action"] == "get" && nodeType == utils.SENSOR { //upload
if requestMap["path"] != nil {
utils.Info.Printf("initiateFileTransfer: upload!!!")
path := requestMap["path"].(string)
ftInitData.UploadTransfer = true
ftInitData.Path, ftInitData.Name = getInternalFileName(path)
Expand Down
132 changes: 70 additions & 62 deletions server/vissv2server/wsMgrFT/wsMgrFT.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ var clientIndex int
var fileTransferCache []utils.FileTransferCache
const FILETRANSFERCACHESIZE = 10

type ChunkDataCache struct {
MessageNo byte
LastMessage byte
ChunkSize []byte
Chunk []byte
}
var chunkDataCache = ChunkDataCache{}

func WsMgrFTInit(ftChannel chan utils.FileTransferCache) {
var clientRequest utils.FileTransferCache
var dataMessage, dataResponse []byte
Expand All @@ -48,61 +56,37 @@ func WsMgrFTInit(ftChannel chan utils.FileTransferCache) {
}
go initDataSessions(clientChan)

var clientId int
for {
select {
case clientRequest = <-ftChannel:
clientRequest.Status = initFtSession(clientRequest)
ftChannel <-clientRequest
continue
case dataMessage = <-clientChan[0]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[0] <- dataResponse
}
clientId = 0
case dataMessage = <-clientChan[1]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[1] <- dataResponse
}
clientId = 1
case dataMessage = <-clientChan[2]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[2] <- dataResponse
}
clientId = 2
case dataMessage = <-clientChan[3]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[3] <- dataResponse
}
clientId = 3
case dataMessage = <-clientChan[4]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[4] <- dataResponse
}
clientId = 4
case dataMessage = <-clientChan[5]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[5] <- dataResponse
}
clientId = 5
case dataMessage = <-clientChan[6]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[6] <- dataResponse
}
clientId = 6
case dataMessage = <-clientChan[7]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[7] <- dataResponse
}
clientId = 7
case dataMessage = <-clientChan[8]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[8] <- dataResponse
}
clientId = 8
case dataMessage = <-clientChan[9]:
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[9] <- dataResponse
}
clientId = 9
}
dataResponse = getDataResponse(dataMessage)
if len(dataResponse) > 0 {
clientChan[clientId] <- dataResponse
}
}
}
Expand All @@ -117,47 +101,50 @@ func getDataResponse(req []byte) []byte {

func getDataResponseDl(req []byte) []byte { // request: uid(4)|messageNo(1)|chunkSize(4)| lastMessage(1)|chunk(N)
resp := make([]byte, 4+1+1) // response: uid(4)|messageNo(1)|status(1)
uid := [utils.UIDLEN]byte(req[:4])
uid := []byte(req[:4])
var messageNo uint8
buf := bytes.NewReader(req[4:5])
err := binary.Read(buf, binary.BigEndian, &messageNo)
if err != nil {
utils.Error.Println("binary.Read failed for messageNo:", err)
return createDlResponse(uid, byte(0x00), byte(0xFF)) // terminate session error response
}
var chunkSize uint32
buf = bytes.NewReader(req[5:9])
err = binary.Read(buf, binary.BigEndian, &chunkSize)
if err != nil {
utils.Error.Println("binary.Read failed for chunkSize:", err)
return createDlResponse(uid, byte(0x00), byte(0xFF)) // terminate session error response
}
var lastMessage uint8
buf = bytes.NewReader(req[9:10])
err = binary.Read(buf, binary.BigEndian, &lastMessage)
if err != nil {
utils.Error.Println("binary.Read failed for chunkSize:", err)
utils.Error.Println("binary.Read failed for lastMessage:", err)
return createDlResponse(uid, byte(0x00), byte(0xFF)) // terminate session error response
}
chunk := req[10:]
cacheIndex := findFileTransferCacheIndex(uid)
if cacheIndex != -1 {
if uint32(len(chunk)) != chunkSize {
return createDlResponse(req, req[4], byte(0x01))
return createDlResponse(uid, req[4], byte(0x01))
}
n, err := fileTransferCache[cacheIndex].FileDescriptor.Write(chunk)
if err != nil {
return createDlResponse(req, req[4], byte(0x01))
return createDlResponse(uid, req[4], byte(0x01))
}
fileTransferCache[cacheIndex].FileOffset += n
if lastMessage != 0 {
fileTransferCache[cacheIndex].FileDescriptor.Close()
if calculateHash(fileTransferCache[cacheIndex].Path + fileTransferCache[cacheIndex].Name) != fileTransferCache[cacheIndex].Hash {
return createDlResponse(req, byte(0x00), byte(0x01))
return createDlResponse(uid, byte(0x00), byte(0x01))
}
fileTransferCache[cacheIndex].Uid = clearUid() // delete cache entry
return createDlResponse(req, req[4], byte(0x00))
return createDlResponse(uid, req[4], byte(0x00))
}
return createDlResponse(req, req[4], byte(0x00))
} else { //error response
return createDlResponse(req, byte(0x00), byte(0x01))
return createDlResponse(uid, req[4], byte(0x00))
} else {
return createDlResponse(uid, byte(0x00), byte(0xFF)) // terminate session error response
}
return resp
}
Expand All @@ -169,7 +156,7 @@ func getDataResponseUl(req []byte) []byte { // request: uid(4)|messageNo(1)|sta
lastMessage := byte(0x00)
chunkSize := make([]byte,4)
var chunk []byte
cacheIndex := findFileTransferCacheIndex([utils.UIDLEN]byte(uid))
cacheIndex := findFileTransferCacheIndex([]byte(uid))
if cacheIndex != -1 {
if status == byte(0x00) {
var n int
Expand All @@ -191,31 +178,52 @@ func getDataResponseUl(req []byte) []byte { // request: uid(4)|messageNo(1)|sta
err = binary.Write(buf, binary.BigEndian, uint32(n))
if err != nil {
utils.Error.Printf("binary.Write failed:%s", err)
return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, chunk)
return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, nil)
}
for i := 0; i < 4; i++ {
chunkSize[i] = buf.Bytes()[i]
}
} else if status == byte(0xFF) {
return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, nil) // error
} else {
// resend. Not what is done below
return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, chunk)
lastMessage, chunkSize, chunk = readChunkData(messageNo)
if len(chunk) > 0 {
return createUlResponse(uid, messageNo, lastMessage, chunkSize, chunk) // resend previous message
} else {
return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, nil) //error
}
}
} else { //error response
return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, chunk)
} else {
return createUlResponse(uid, messageNo, lastMessage, []byte{0,0,0,0}, nil) //error
}
writeChunkData(messageNo, lastMessage, chunkSize, chunk)
return createUlResponse(uid, messageNo, lastMessage, chunkSize, chunk)
}

func readChunkData(messageNo byte) (byte, []byte, []byte) {
if messageNo != chunkDataCache.MessageNo {
return byte(0), nil, nil
}
return chunkDataCache.LastMessage, chunkDataCache.ChunkSize, chunkDataCache.Chunk
}

func writeChunkData(messageNo byte, lastMessage byte, chunkSize []byte, chunk []byte) {
chunkDataCache.MessageNo = messageNo
chunkDataCache.LastMessage = lastMessage
chunkDataCache.ChunkSize = chunkSize
chunkDataCache.Chunk = chunk
}

func clearUid() [utils.UIDLEN]byte {
return [utils.UIDLEN]byte{0}
}

func createDlResponse(req []byte, messNo byte, status byte) []byte { // response: uid(4)|messageNo(1)|status(1)
func createDlResponse(uid []byte, messNo byte, status byte) []byte { // response: uid(4)|messageNo(1)|status(1)
resp := make([]byte,6)
resp[0] = req[0]
resp[1] = req[1]
resp[2] = req[2]
resp[3] = req[3]
resp[0] = uid[0]
resp[1] = uid[1]
resp[2] = uid[2]
resp[3] = uid[3]
resp[4] = messNo
resp[5] = status
return resp
Expand Down Expand Up @@ -273,9 +281,9 @@ func getFileSize(fp *os.File) int {
return int(fi.Size())
}

func findFileTransferCacheIndex(uid [utils.UIDLEN]byte) int {
func findFileTransferCacheIndex(uid []byte) int {
for i := 0; i < FILETRANSFERCACHESIZE; i++ {
if fileTransferCache[i].Uid == uid {
if fileTransferCache[i].Uid == [utils.UIDLEN]byte(uid) {
return i
}
}
Expand Down

0 comments on commit 317ccfa

Please sign in to comment.