Skip to content

Commit

Permalink
fix(rust/hyper/demo): Fix struct mem bug
Browse files Browse the repository at this point in the history
Signed-off-by: hackerchai <[email protected]>
  • Loading branch information
hackerchai committed Aug 12, 2024
1 parent 13340af commit 86856f1
Showing 1 changed file with 69 additions and 113 deletions.
182 changes: 69 additions & 113 deletions rust/hyper/_demo/hyper_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ func onSignal(handle *libuv.Signal, signum c.Int) {
}

func closeWalkCb(handle *libuv.Handle, arg c.Pointer) {
// if handle.IsClosing() == 0 {
// handle.Close(nil)
// }
handle.Close(nil)
c.Printf(c.Str("closeWalkCb called\n"))
if handle.IsClosing() == 0 {
c.Printf(c.Str("Closing handle\n"))
handle.Close(nil)
}
}

func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) {
Expand Down Expand Up @@ -91,7 +92,7 @@ func closeConn(handle *libuv.Handle) {
}

func onPoll(handle *libuv.Poll, status c.Int, events c.Int) {
conn := (*ConnData)(unsafe.Pointer((*libuv.Handle)(unsafe.Pointer(handle)).GetData()))
conn := (*ConnData)((*libuv.Handle)(unsafe.Pointer(handle)).GetData())

if status < 0 {
//fmt.Fprintf(os.Stderr, "Poll error: %s\n", libuv.Strerror(libuv.Errno(status)))
Expand All @@ -111,6 +112,7 @@ func onPoll(handle *libuv.Poll, status c.Int, events c.Int) {
}

func updateConnDataRegistrations(conn *ConnData, create bool) bool {
c.Printf(c.Str("updateConnDataRegistrations IsClosing: %d\n"), conn.IsClosing)
events := c.Int(0)
if conn.EventMask&c.Uint(libuv.READABLE) != 0 {
events |= c.Int(libuv.READABLE)
Expand Down Expand Up @@ -202,8 +204,10 @@ func createConnData(client *libuv.Tcp) *ConnData {
return nil
}

(*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).SetData(unsafe.Pointer(conn))
(*libuv.Handle)(unsafe.Pointer(&conn.Stream)).SetData(unsafe.Pointer(conn))
//(*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).Data(unsafe.Pointer(conn))
(*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).Data = unsafe.Pointer(conn)
//(*libuv.Handle)(unsafe.Pointer(&conn.Stream)).SetData(unsafe.Pointer(conn))
conn.Stream.Data = unsafe.Pointer(conn)

if !updateConnDataRegistrations(conn, true) {
(*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).Close(nil)
Expand All @@ -216,8 +220,10 @@ func createConnData(client *libuv.Tcp) *ConnData {

func freeConnData(userdata c.Pointer) {
conn := (*ConnData)(userdata)
c.Printf(c.Str("freeConnData IsClosing: %d\n"), conn.IsClosing)
if conn != nil && conn.IsClosing == 0 {
conn.IsClosing = 1
c.Printf(c.Str("freeConnData IsClosing: %d\n"), conn.IsClosing)
// We don't immediately close the connection here.
// Instead, we'll let the main loop handle the closure when appropriate.
}
Expand Down Expand Up @@ -292,7 +298,7 @@ func serverCallback(userdata c.Pointer, request *hyper.Request, channel *hyper.R
conn.RequestCount++
// fmt.Printf("Handling request %d on connection from %s:%s\n", conn.RequestCount,
// c.GoString((*c.Char)(&serviceData.Host[0])), c.GoString((*c.Char)(&serviceData.Port[0])))
c.Printf(c.Str("Handling request %d on connection from %s:%s\n"), c.Int(conn.RequestCount),
c.Printf(c.Str("Handling request %d on connection from %s:%s\n"), conn.RequestCount,
(*c.Char)(&serviceData.Host[0]), (*c.Char)(&serviceData.Port[0]))

// fmt.Printf("Received request from %s:%s\n", c.GoString((*c.Char)(&serviceData.Host[0])),
Expand Down Expand Up @@ -392,46 +398,47 @@ func serverCallback(userdata c.Pointer, request *hyper.Request, channel *hyper.R
}

response := hyper.NewResponse()
// if response != nil {
// response.SetStatus(200)
// rspHeaders := response.Headers()
// if rspHeaders != nil {
// rspHeaders.Set((*byte)(unsafe.Pointer(c.Str("Content-Type"))), uintptr(12), (*byte)(unsafe.Pointer(c.Str("text/plain"))), uintptr(10))
// rspHeaders.Set((*byte)(unsafe.Pointer(c.Str("Cache-Control"))), uintptr(13), (*byte)(unsafe.Pointer(c.Str("no-cache"))), uintptr(8))
// } else {
// //fmt.Fprintf(os.Stderr, "Error: Failed to get response headers\n")
// c.Printf(c.Str("Error: Failed to get response headers\n"))
// }

// if methodLen > 0 && c.Strncmp((*c.Char)(unsafe.Pointer(&method[0])), c.Str("GET"), methodLen) == 0 {
// c.Printf(c.Str("Sending GET response\n"))
// body := hyper.NewBody()
// if body != nil {
// body.SetDataFunc(sendEachBodyChunk)
// chunkCount := (*c.Int)(c.Malloc(unsafe.Sizeof(c.Int(0))))
// if chunkCount != nil {
// *chunkCount = 10
// body.SetUserdata(unsafe.Pointer(chunkCount), func(p c.Pointer) { c.Free(p) })
// response.SetBody(body)
// } else {
// //fmt.Fprintf(os.Stderr, "Error: Failed to allocate chunk_count\n")
// c.Printf(c.Str("Error: Failed to allocate chunk_count\n"))
// }
// } else {
// //fmt.Fprintf(os.Stderr, "Error: Failed to create response body\n")
// c.Printf(c.Str("Error: Failed to create response body\n"))
// }
// }

// channel.Send(response)
// } else {
// //fmt.Fprintf(os.Stderr, "Error: Failed to create response\n")
// c.Printf(c.Str("Error: Failed to create response\n"))
// }
if response != nil {
response.SetStatus(200)
rspHeaders := response.Headers()
if rspHeaders != nil {
c.Printf(c.Str("Setting response headers\n"))
hres := rspHeaders.Set((*byte)(unsafe.Pointer(c.Str("Content-Type"))), uintptr(12), (*byte)(unsafe.Pointer(c.Str("text/plain"))), uintptr(10))
if hres != hyper.OK {
c.Printf(c.Str("Error: Failed to set response headers\n"))
}
hres = rspHeaders.Set((*byte)(unsafe.Pointer(c.Str("Cache-Control"))), uintptr(13), (*byte)(unsafe.Pointer(c.Str("no-cache"))), uintptr(8))
if hres != hyper.OK {
c.Printf(c.Str("Error: Failed to set response headers\n"))
}
} else {
//fmt.Fprintf(os.Stderr, "Error: Failed to get response headers\n")
c.Printf(c.Str("Error: Failed to get response headers\n"))
}

if methodLen > 0 && c.Strncmp((*c.Char)(unsafe.Pointer(&method[0])), c.Str("GET"), methodLen) == 0 {
c.Printf(c.Str("Sending GET response\n"))
body := hyper.NewBody()
if body != nil {
body.SetDataFunc(sendEachBodyChunk)
chunkCount := (*c.Int)(c.Malloc(unsafe.Sizeof(c.Int(0))))
if chunkCount != nil {
*chunkCount = 10
body.SetUserdata(unsafe.Pointer(chunkCount), func(p c.Pointer) { c.Free(p) })
response.SetBody(body)
} else {
//fmt.Fprintf(os.Stderr, "Error: Failed to allocate chunk_count\n")
c.Printf(c.Str("Error: Failed to allocate chunk_count\n"))
}
} else {
//fmt.Fprintf(os.Stderr, "Error: Failed to create response body\n")
c.Printf(c.Str("Error: Failed to create response body\n"))
}
}

channel.Send(response)
} else {
//fmt.Fprintf(os.Stderr, "Error: Failed to create response\n")
c.Printf(c.Str("Error: Failed to create response\n"))
}

Expand Down Expand Up @@ -465,11 +472,11 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) {
if addr.Family == net.AF_INET {
s := (*net.SockaddrIn)(unsafe.Pointer(&addr))
libuv.Ip4Name(s, (*c.Char)(&userdata.Host[0]), unsafe.Sizeof(userdata.Host))
c.Snprintf((*c.Char)(&userdata.Port[0]), unsafe.Sizeof(userdata.Port), c.Str("%d"), Ntohs(s.Port))
c.Snprintf((*c.Char)(&userdata.Port[0]), unsafe.Sizeof(userdata.Port), c.Str("%d"), net.Ntohs(s.Port))
} else if addr.Family == net.AF_INET6 {
s := (*net.SockaddrIn6)(unsafe.Pointer(&addr))
libuv.Ip6Name(s, (*c.Char)(&userdata.Host[0]), unsafe.Sizeof(userdata.Host))
c.Snprintf((*c.Char)(&userdata.Port[0]), unsafe.Sizeof(userdata.Port), c.Str("%d"), Ntohs(s.Port))
c.Snprintf((*c.Char)(&userdata.Port[0]), unsafe.Sizeof(userdata.Port), c.Str("%d"), net.Ntohs(s.Port))
}

// fmt.Printf("New incoming connection from (%s:%s)\n", c.GoString((*c.Char)(&userdata.Host[0])),
Expand Down Expand Up @@ -585,68 +592,31 @@ func main() {
conn := (*ConnData)(taskUserdata)
//fmt.Printf("Connection task completed for request %d\n", conn.RequestCount)
c.Printf(c.Str("Connection task completed for request %d\n"), conn.RequestCount)
c.Printf(c.Str("IsClosing: %d\n"), c.Int(conn.IsClosing))
// if conn.IsClosing == 0 {
// c.Printf(c.Str("Closing connection\n"))
// conn.IsClosing = 1
// if (*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).IsClosing() != 0 {
// c.Printf(c.Str("Closing poll handle\n"))
// (*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).Close(onClose)
// }
// if (*libuv.Handle)(unsafe.Pointer(&conn.Stream)).IsClosing() != 0 {
// c.Printf(c.Str("Closing stream\n"))
// (*libuv.Handle)(unsafe.Pointer(&conn.Stream)).Close(onClose)
// }
// }
if (*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).IsClosing() == 0 {
c.Printf(c.Str("Closing poll handle\n"))
(*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).Close(onClose)
c.Printf(c.Str("hyper.TaskEmpty IsClosing: %d\n"), conn.IsClosing)
if conn.IsClosing == 0 {
c.Printf(c.Str("Closing connection\n"))
conn.IsClosing = 1
if (*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).IsClosing() == 0 {
c.Printf(c.Str("Closing poll handle\n"))
(*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).Close(onClose)
}
if (*libuv.Handle)(unsafe.Pointer(&conn.Stream)).IsClosing() == 0 {
c.Printf(c.Str("Closing stream\n"))
(*libuv.Handle)(unsafe.Pointer(&conn.Stream)).Close(onClose)
}
}
if (*libuv.Handle)(unsafe.Pointer(&conn.Stream)).IsClosing() == 0 {
c.Printf(c.Str("Closing stream\n"))
(*libuv.Handle)(unsafe.Pointer(&conn.Stream)).Close(onClose)
}
}
conn := (*ConnData)(taskUserdata)
if (*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).IsClosing() == 0 {
c.Printf(c.Str("Closing poll handle\n"))
(*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).Close(onClose)
}
if (*libuv.Handle)(unsafe.Pointer(&conn.Stream)).IsClosing() == 0 {
c.Printf(c.Str("Closing stream\n"))
(*libuv.Handle)(unsafe.Pointer(&conn.Stream)).Close(onClose)
}
break

case hyper.TaskError:
err := (*hyper.Error)(task.Value())
var errbuf [256]byte
errlen := err.Print(&errbuf[0], unsafe.Sizeof(errbuf))
//fmt.Fprintf(os.Stderr, "Task error: %.*s\n", int(errlen), c.GoString((*c.Char)(unsafe.Pointer(&errbuf[0]))))
//fmt.Fprintf(os.Stderr, c.Str("Task error: %.*s\n"), errlen, (*c.Char)(unsafe.Pointer(&errbuf[0])))
c.Printf(c.Str("Task error: %.*s\n"), errlen, (*c.Char)(unsafe.Pointer(&errbuf[0])))
err.Free()
conn := (*ConnData)(taskUserdata)
c.Printf(c.Str("IsClosing: %d\n"), c.Int(conn.IsClosing))
// if conn.IsClosing == 0 {
// c.Printf(c.Str("Closing connection\n"))
// conn.IsClosing = 1
// if (*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).IsClosing() != 0 {
// c.Printf(c.Str("Closing poll handle\n"))
// (*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).Close(onClose)
// }
// if (*libuv.Handle)(unsafe.Pointer(&conn.Stream)).IsClosing() != 0 {
// c.Printf(c.Str("Closing stream\n"))
// (*libuv.Handle)(unsafe.Pointer(&conn.Stream)).Close(onClose)
// }
// }
if (*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).IsClosing() == 0 {
c.Printf(c.Str("Closing poll handle\n"))
(*libuv.Handle)(unsafe.Pointer(&conn.PollHandle)).Close(onClose)
}
if (*libuv.Handle)(unsafe.Pointer(&conn.Stream)).IsClosing() == 0 {
c.Printf(c.Str("Closing stream\n"))
(*libuv.Handle)(unsafe.Pointer(&conn.Stream)).Close(onClose)
}
//TODO: Debug temporarily

Check warning on line 618 in rust/hyper/_demo/hyper_server/server.go

View check run for this annotation

qiniu-x / note-check

rust/hyper/_demo/hyper_server/server.go#L618

A Note is recommended to use "MARKER(uid): note body" format.
shouldExit = true
break

case hyper.TaskClientConn:
Expand Down Expand Up @@ -708,18 +678,4 @@ func main() {

//fmt.Println("Shutdown complete.")
c.Printf(c.Str("Shutdown complete.\n"))
}

// Ntohs converts a 16-bit integer from network byte order to host byte order.
func Ntohs(x uint16) uint16 {
if isLittleEndian() {
return ((x & 0xFF00) >> 8) | ((x & 0x00FF) << 8)
}
return x
}

// isLittleEndian checks if the host machine is little-endian.
func isLittleEndian() bool {
var i int32 = 0x01020304
return *(*byte)(unsafe.Pointer(&i)) == 0x04
}
}

0 comments on commit 86856f1

Please sign in to comment.