-
Notifications
You must be signed in to change notification settings - Fork 1
/
scbtcp.lua
242 lines (206 loc) · 5.53 KB
/
scbtcp.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#!/usr/bin/env lua
-- -*-lua-*-
--
-- $Id: scbtcp.lua $
--
-- Author: Markus Stenberg <markus [email protected]>
--
-- Copyright (c) 2013 cisco Systems, Inc.
--
-- Created: Sun Jan 27 11:15:32 2013 mstenber
-- Last modified: Sat Oct 19 01:48:06 2013 mstenber
-- Edit time: 27 min
--
-- (Code has been moved here from scb.lua)
-- This module contains TCP code related to simple callback stuff
-- (scb)
local scb = require 'scb'
local mst = require 'mst'
local socket = require 'socket'
local ERR_CONNECTION_REFUSED = "connection refused"
local ERR_TIMEOUT = 'timeout'
local ERR_ALREADY_CONNECTED = 'already connected'
module(...)
local _base = scb.Scb
--- ScbIO (used for connecting + listen->accepted)
ScbIO = _base:new_subclass{listen_read=true, listen_write=true,
class="ScbIO"}
function ScbIO:init()
-- write queue
self.wq = mst.array:new{}
-- the rest is up to baseclass..
_base.init(self)
end
function ScbIO:handle_io_read()
self:d('handle_io_read')
local r, error, partial = self.s:receive(2^10)
local s = r or partial
if not s or #s == 0
then
self:d('got read', s, #s, error)
if error == 'closed'
then
-- ?
else
-- ??
end
self:call_callback_once('close_callback')
self:done()
else
self.callback(s)
end
end
function ScbIO:handle_io_write()
while true
do
-- it's writable.. nothing in the queue?
if #self.wq == 0
then
self:d("handle_io_write - queue empty")
self.s_w:stop()
self:d('done write-stop')
return
end
-- we have _something_. let's try to write the first one..
self:d("handle_io_write - sending")
local s, i = self.wq[1]
i = i or 1
local r, err = self.s:send(s, i)
self:d("handle_io_write - send done", r, err)
-- todo - handle writing errors here
if not r -- and err == 'closed'
then
self:call_callback_once('close_callback')
self:done()
return
end
-- update the queue
if r == #s
then
self:d("handle_io_write removing from queue")
self.wq:remove_index(1)
else
-- r can't be >#s, or we have oddity on our hands
self:a(r<#s, "too many bytes written")
-- just wait for the next writable callback
self.wq[1] = {s, r+1}
return
end
end
end
function ScbIO:write(s)
self:d('write', #s)
self.wq:insert(s)
self.s_w:start()
end
--- ScbListen
local ScbListen = _base:new_subclass{listen_read=true, listen_write=false, class="ScbListen"}
function ScbListen:handle_io_read()
self:d(' --accept--')
local c = self.s:accept()
self:d(' --accept--', c, c:getfd())
self:a(self.callback, "no callback in handle_io_read")
if c
then
c:settimeout(0)
local evio = ScbIO:new{s=c, p=self}
self:a(evio.listen_read, 'listen_read disappeared')
self:a(evio.listen_write, 'listen_write disappeared')
evio:start()
self.callback(evio)
end
end
--- ScbConnect
local ScbConnect = _base:new_subclass{listen_write=true, class="ScbConnect"}
function ScbConnect:handle_io_write()
self:d('handle_io_write')
local r, err = self.s:connect(self.ip, self.port)
self:d('connect result', r, err)
self:a(self.callback, 'missing callback from ScbConnect')
if err == ERR_CONNECTION_REFUSED
then
self.callback(nil, err)
return
end
if err == ERR_TIMEOUT
then
return
end
-- first off, we're done! so get rid of the filehandle + mark us done
self:stop()
-- then, forward the freshly wrapped IO socket onward
-- (someone needs to set up callback(s))
local evio = wrap_socket(self.evio_d)
self.callback(evio)
-- we're ready to be cleaned (as if we were anywhere anyway, after :stop())
self.s = nil
self:done()
end
function wrap_socket(d)
mst.check_parameters("scb:wrap_socket", d, {"s"}, 3)
local s = d.s
s:settimeout(0)
local evio = ScbIO:new(d)
mst.a(evio.listen_read and evio.listen_write)
evio:start()
return evio
end
function create_socket(d)
-- no mandatory parameters really, can have ip if necessary
--mst.check_parameters("scb:create_listener", d, {"host"}, 3)
local s
if scb.parameters_ipv6ish(d)
then
mst.d('creating tcp6 socket for', d)
s = socket.tcp6()
else
mst.d('creating tcp socket for', d)
s = socket.tcp()
end
s:settimeout(0)
s:setoption('reuseaddr', true)
s:setoption('tcp-nodelay', true)
return s
end
function create_listener(d)
mst.check_parameters("scb:create_listener", d, {"ip", "port"}, 3)
local s = create_socket(d)
local r, err = s:bind(d.ip, d.port)
if r
then
s:listen(10)
return s
end
return nil, err
end
function new_listener(d)
mst.check_parameters("scb:new_listener", d, {"ip", "port", "callback"}, 3)
local s, err = create_listener(d)
if s
then
d.s = s
l = ScbListen:new(d)
l:start()
return l
end
return nil, err
end
function new_connect(d)
-- ip, port, connected_callback, callback
mst.check_parameters("scb:new_connect", d, {"ip", "port", "callback"}, 3)
local s = create_socket(d)
local r, e = s:connect(d.ip, d.port)
--print('new_connect', r, e)
d.s = s
if r == 1
then
local evio = wrap_socket(d)
connected_callback(evio)
return evio
end
-- apparently connect is still pending. create connect
d.evio_d = mst.table_copy(d)
c = ScbConnect:new(d)
c:start()
return c
end