forked from zealic/go2node
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchannel.go
152 lines (134 loc) · 3.12 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package go2node
import (
"encoding/json"
"os"
"os/exec"
"github.com/zealic/go2node/ipc"
)
// NodeChannel is the message-level view of an IPC channel shared with a
// Node.js process.
type NodeChannel interface {
	// Read returns the next node message, or an error if one could not be
	// obtained.
	Read() (*NodeMessage, error)

	// Write sends msg to the peer and reports any failure.
	Write(msg *NodeMessage) error
}
// nodeChannel is the NodeChannel implementation backed by an ipc.Channel.
type nodeChannel struct {
	// reader and writer are allocated by newNodeChannel as channels with a
	// buffer of one; no method visible in this file sends or receives on them.
	reader, writer chan *NodeMessage

	// ipcChannel is the underlying transport used by Read and Write.
	ipcChannel ipc.Channel

	// queue collects handle-carrying messages that Write has sent as
	// NODE_HANDLE. It is emptied when an ACK or NACK arrives; on NACK the
	// collected messages are written again.
	queue []*NodeMessage
}
// internalNodeMessage is the JSON envelope inspected on every incoming
// message and emitted by Write when a handle is attached.
type internalNodeMessage struct {
	// Cmd selects the handling branch ("NODE_HANDLE", "NODE_HANDLE_ACK",
	// "NODE_HANDLE_NACK"); any other value is treated as a normal message.
	Cmd string `json:"cmd"`
	// Type is set to "net.Native" by Write when sending a handle.
	Type string `json:"type"`
	// Msg is the still-encoded user payload; it is left out of the JSON when
	// empty.
	Msg json.RawMessage `json:"msg,omitempty"`
}
const (
	// nodeChannelFD is the name handed to ipc.Exec when spawning the child.
	// NOTE(review): presumably the environment variable Node reads to locate
	// its IPC descriptor — confirm against ipc.Exec.
	nodeChannelFD = "NODE_CHANNEL_FD"

	// nodeChannelDelim is the delimiter passed to the ipc channel when
	// reading and writing messages.
	nodeChannelDelim = '\n'
)
// ExecNode runs cmd as a Node.js child process via ipc.Exec and wraps the
// resulting IPC channel in a NodeChannel.
//
// The error from ipc.Exec, if any, is returned unchanged together with a nil
// channel.
func ExecNode(cmd *exec.Cmd) (NodeChannel, error) {
	transport, err := ipc.Exec(cmd, nodeChannelFD)
	if err != nil {
		return nil, err
	}
	return newNodeChannel(transport)
}
// newNodeChannel builds a nodeChannel on top of the given ipc channel.
// The returned error is always nil.
//
// Note: the parameter is named ipc and therefore hides the ipc package inside
// the function body.
func newNodeChannel(ipc ipc.Channel) (NodeChannel, error) {
	nc := new(nodeChannel)
	nc.reader = make(chan *NodeMessage, 1)
	nc.writer = make(chan *NodeMessage, 1)
	nc.ipcChannel = ipc
	nc.queue = []*NodeMessage{}
	return nc, nil
}
// Read returns the next message destined for the caller.
//
// Each raw IPC message is decoded just far enough to inspect its "cmd" field
// and is then dispatched through handleInternalMsg. Messages that are consumed
// internally (handle ACK/NACK bookkeeping) produce no result, in which case
// Read moves on to the following message.
//
// An error is returned when the underlying channel fails, when the payload is
// not syntactically valid JSON, or when handling the message fails.
func (c *nodeChannel) Read() (*NodeMessage, error) {
	for {
		ipcMsg, err := c.ipcChannel.ReadMessage(nodeChannelDelim)
		if err != nil {
			return nil, err
		}

		// Probe the payload for the internal envelope fields.
		intMsg := new(internalNodeMessage)
		if uerr := json.Unmarshal(ipcMsg.Data, intMsg); uerr != nil {
			// Valid JSON that does not match the envelope's shape (a bare
			// string, number or array, or a non-string "cmd") is reported as
			// *json.UnmarshalTypeError. Such a payload is an ordinary user
			// message, not a failure; whatever fields did decode are kept and
			// the message is dispatched as usual. json.Unmarshal returns this
			// error unwrapped, so a plain type assertion is sufficient.
			if _, shapeOnly := uerr.(*json.UnmarshalTypeError); !shapeOnly {
				return nil, uerr
			}
		}

		msg, err := c.handleInternalMsg(ipcMsg, intMsg)
		// Nothing to deliver and nothing went wrong: the message was an
		// internal one, so read the next message.
		if msg == nil && err == nil {
			continue
		}
		return msg, err
	}
}
// handleInternalMsg dispatches one incoming message on its "cmd" field.
//
//   - "NODE_HANDLE": the peer sent a handle. When a file accompanies the
//     message, a NODE_HANDLE_ACK is written back and the payload plus the
//     first file are returned. When no file arrived, a NODE_HANDLE_NACK is
//     written back instead and nothing is returned.
//   - "NODE_HANDLE_NACK": the queued messages are written again, flagged as
//     nack, and the queue is emptied.
//   - "NODE_HANDLE_ACK": the queue is emptied.
//   - anything else: the message is converted with normNodeMessage.
//
// A (nil, nil) result means the message was consumed internally and the caller
// should read the next one.
func (c *nodeChannel) handleInternalMsg(
	ipcMsg *ipc.Message,
	intMsg *internalNodeMessage) (*NodeMessage, error) {
	switch intMsg.Cmd {
	case "NODE_HANDLE":
		// Without this guard Files[0] would panic on a handle message whose
		// descriptor was not received. Reply with a NACK — the retransmit
		// request this channel itself honors — rather than ACKing a handle
		// we do not have.
		if len(ipcMsg.Files) == 0 {
			return nil, c.ipcChannel.WriteMessage(&ipc.Message{
				Data: []byte(`{"cmd":"NODE_HANDLE_NACK"}`),
			}, nodeChannelDelim)
		}
		ackErr := c.ipcChannel.WriteMessage(&ipc.Message{
			Data: []byte(`{"cmd":"NODE_HANDLE_ACK"}`),
		}, nodeChannelDelim)
		if ackErr != nil {
			return nil, ackErr
		}
		return &NodeMessage{
			Message: intMsg.Msg,
			Handle:  ipcMsg.Files[0],
		}, nil

	case "NODE_HANDLE_NACK":
		// Detach the queue before resending so the resend loop works on a
		// stable snapshot.
		pending := c.queue
		c.queue = []*NodeMessage{}
		for _, m := range pending {
			resendErr := c.Write(&NodeMessage{
				Message: m.Message,
				Handle:  m.Handle,
				nack:    true,
			})
			if resendErr != nil {
				return nil, resendErr
			}
		}

	case "NODE_HANDLE_ACK":
		c.queue = []*NodeMessage{}

	default:
		return normNodeMessage(ipcMsg), nil
	}

	// ACK and NACK carry nothing for the caller.
	return nil, nil
}
// Write sends msg over the underlying ipc channel.
//
// Three shapes go on the wire:
//   - no handle: msg.Message is sent as-is with an empty file list;
//   - handle, first attempt: msg is recorded in the queue and its payload is
//     wrapped in a {"cmd":"NODE_HANDLE","type":"net.Native","msg":...}
//     envelope, sent together with the handle;
//   - handle, nack retransmission: msg.Message is sent unwrapped together with
//     the handle, and the queue is left untouched.
//
// It returns the JSON encoding error, if wrapping fails, or otherwise the
// result of the channel write.
func (c *nodeChannel) Write(msg *NodeMessage) error {
	// Start from the plain form; the handle cases below adjust it.
	out := &ipc.Message{
		Data:  []byte(msg.Message),
		Files: []*os.File{},
	}

	if msg.Handle != nil {
		out.Files = []*os.File{msg.Handle}

		// Retransmissions stay unwrapped; only first sends get the envelope.
		if !msg.nack {
			c.queue = append(c.queue, msg)
			envelope, err := json.Marshal(&internalNodeMessage{
				Cmd:  "NODE_HANDLE",
				Type: "net.Native",
				Msg:  json.RawMessage(msg.Message),
			})
			if err != nil {
				return err
			}
			out.Data = envelope
		}
	}

	return c.ipcChannel.WriteMessage(out, nodeChannelDelim)
}