diff --git a/example/main.pony b/example/main.pony index 8db4527..a404c41 100644 --- a/example/main.pony +++ b/example/main.pony @@ -37,8 +37,14 @@ class BlogEntryRecordNotify is FetchNotify let view: BlogEntriesView tag new iso create(v: BlogEntriesView tag) => view = v fun ref descirption(desc: RowDescription) => None - fun size(): USize => 10000 + fun size(): USize => 10 + fun ref batch(r: Array[Record val] val, next: FetchNotifyNext val) => + Debug.out("Batch") + if r.size() == size() then + next(None) + end fun ref record(r: Record val) => + Debug.out(".") try let e = recover val BlogEntry( r(0) as I32, @@ -46,7 +52,7 @@ class BlogEntryRecordNotify is FetchNotify /*r(1) as I32,*/ /*r(2) as I32*/ ) end - Debug.out(e.string()) + // Debug.out(e.string()) /*(entries as Array[BlogEntry val] trn).push(e)*/ end @@ -62,6 +68,17 @@ class UserRecordNotify is FetchNotify let view: BlogEntriesView tag new create(v: BlogEntriesView tag) => view = v fun ref descirption(desc: RowDescription) => None + + fun ref batch(b: Array[Record val] val, next: FetchNotifyNext val) => + Debug(b.size()) + for r in b.values() do + try + view.user(recover User(r("id") as I32) end) + else + Debug.out("Error") + end + end + fun ref record(r: Record val) => try view.user(recover User(r("id") as I32) end) @@ -76,9 +93,10 @@ actor BlogEntriesView be fetch_entries() => try + Debug("fetch_entries") (_conn as Connection).fetch( /*"SELECT 1 as user_id, 2, 3 UNION ALL SELECT 4 as user_id, 5, 6 UNION ALL SELECT 7 as user_id, 8, 9",*/ - "SELECT generate_series(0,1000000)", + "SELECT generate_series(0,100)", recover BlogEntryRecordNotify(this) end) end @@ -91,6 +109,7 @@ actor BlogEntriesView be user(u: User iso) => _user = recover val consume u end + Debug.out("###") fetch_entries() be render(entries': Array[BlogEntry val] val) => @@ -121,14 +140,14 @@ actor Main password=EnvPasswordProvider(env), database="macflytest") let that = recover tag this end - session.execute("SELECT generate_series(0,10)", + """ + session.execute("SELECT generate_series(0,1)", recover val lambda(r: Rows val)(that) => that.raw_count(r) None end end) - """ session.execute("SELECT 42, 24 as foo;;", recover val @@ -145,12 +164,12 @@ actor Main end end, recover val [as PGValue: I32(70000), I32(-100000)] end) + """ let p = session.connect(recover val lambda(c: Connection tag) => BlogEntriesView(c) end end) - """ be raw_count(rows: Rows val) => _env.out.print(rows.size().string()) diff --git a/pg/codec/registry.pony b/pg/codec/registry.pony index 1af9713..61ea7fc 100644 --- a/pg/codec/registry.pony +++ b/pg/codec/registry.pony @@ -42,26 +42,37 @@ primitive TypeOids end primitive Decode - fun apply(type_oid: I32, value: Array[U8] val, 0): PGValue ? => - DecodeText(type_oid, value) - fun apply(type_oid: I32, value: Array[U8] val, 1): PGValue ? => - DecodeBinary(type_oid, value) - fun apply(type_oid: I32, value: Array[U8] val, format: I16): PGValue ? => error + fun apply(type_oid: I32, value: Array[U8] val, format: I16): PGValue ? => + if format == 0 then + DecodeText(type_oid, value) + else if format == 1 then + DecodeBinary(type_oid, value) + else + Debug.out("Unknown fromat" + format.string()) + error + end end primitive DecodeText - fun apply(23, value: Array[U8] val): I32 ? => - String.from_array(value).i32() - fun apply(type_oid: I32, value: Array[U8] val) ? => Debug.out("Unknown type OID: " + type_oid.string()); error + fun apply(type_oid: I32, value: Array[U8] val): PGValue ? => + match type_oid + | 23 => String.from_array(value).i32() + else + Debug.out("Unknown type OID: " + type_oid.string()); error + end primitive DecodeBinary - fun apply(23, value: Array[U8] val): I32 ? => - var result = I32(0) - for i in value.values() do - result = (result << 8) + i.i32() + fun apply(type_oid: I32, value: Array[U8] val): PGValue ? => + match type_oid + | 23 => + var result = I32(0) + for i in value.values() do + result = (result << 8) + i.i32() + end + result + else + Debug.out("Unknown type OID: " + type_oid.string()); error end - result - fun apply(type_oid: I32, value: Array[U8] val) ? => Debug.out("Unknown type OID: " + type_oid.string()); error primitive EncodeBinary fun apply(param: I32, writer: Writer) ? => diff --git a/pg/connection.pony b/pg/connection.pony index fc7bd07..ee53126 100644 --- a/pg/connection.pony +++ b/pg/connection.pony @@ -2,9 +2,13 @@ use "debug" use "pg/connection" use "pg/introspect" + +type FetchNotifyNext is {((FetchNotify iso | None))} + interface FetchNotify fun ref descirption(desc: RowDescription) => None fun ref record(r: Record val) => None + fun ref batch(r: Array[Record val] val, next: FetchNotifyNext val) => None fun ref stop() => None fun size(): USize => 30 diff --git a/pg/connection/conversation.pony b/pg/connection/conversation.pony index 8acb8d2..a5d2daf 100644 --- a/pg/connection/conversation.pony +++ b/pg/connection/conversation.pony @@ -84,42 +84,135 @@ type Sending is _Sending val type Paused is _Paused val type Suspended is _Suspended val -actor RecordIterator - let _notify: FetchNotify ref - let _conversation: FetchConversation tag +actor FetchConversation is Conversation + let query: String val + let params: Array[PGValue] val + let _conn: BEConnection tag + var _rows: (Rows val | Rows trn ) = recover trn Rows end + var _tuple_desc: (TupleDescription val | None) = None + var _buffer: (Array[Record val] trn | Array[Record val] val) + let _buffers: Array[Array[Record val] val] = Array[Array[Record val] val] + var _notify: (FetchNotify iso | None) + let _size: USize - new create(n: FetchNotify iso, c: FetchConversation) => + new create(c: BEConnection tag, q: String, + n: FetchNotify iso, p: Array[PGValue] val) => + query = q + params = p + _conn = c + _size = n.size() _notify = consume n - _conversation = c - _conversation.start() - - be batch(b: Array[Record val] val) => - Debug.out(b.size()) - var s = b.size() - for r in b.values() do - _notify.record(r) - s = s-1 - /*if s == 0 then _conversation.start() end*/ + _buffer = recover trn Array[Record val] end + + be _batch(b: BatchRowMessage val) => + Debug.out(query) + try + for m in b.rows.values() do + let record = recover val Record(_tuple_desc as TupleDescription val, m.fields) end + (_buffer as Array[Record val] trn).push(record) + if _buffer.size() == _size then + _do_send() + end + end + else + Debug.out("can't create and push record") end - _conversation.start() - be record(r: Record val) => - Debug.out("record") - _notify.record(r) - _conversation.start() + be _set_notifier(fn: (FetchNotify iso | None)) => + match consume fn + | let f: FetchNotify iso => _notify = consume f + end - be stop() => _notify.stop() + be _next() => + _execute() + be _stop() => + try (_notify as FetchNotify iso).stop() end + + be _send() => + _do_send() -actor FetchConversation is Conversation + fun ref _do_send() => + Debug.out("send") + _buffer = recover val _buffer end + let b = _buffer = recover trn Array[Record val] end + try + let that = recover tag this end + (_notify as FetchNotify iso).batch(b, recover val + lambda(fn: (FetchNotify iso | None)=None) (that) => + that._set_notifier(consume fn) + that._next() + end + end) + else + _buffers.push(b) + end + + be message(m: ServerMessage val)=> + match m + | let r: ParseCompleteMessage val => None //_bind() + | let r: CloseCompleteMessage val => _sync() + | let r: BindCompleteMessage val => None //_describe() + | let r: ReadyForQueryMessage val => _conn.next() + | let r: BatchRowMessage val => + None + //for row' in r.rows.values() do + // row(row') + //end + _batch(r) + | let r: RowDescriptionMessage val => + _tuple_desc = r.tuple_desc + _execute() + | let r: EmptyQueryResponse val => Debug.out("Empty Query") + | let r: CommandCompleteMessage val => + _send() + _stop() + _close() + | let r: PortalSuspendedMessage val => None + else + _conn.handle_message(m) + end + + be log(msg: String) => _conn.log(msg) + + fun _sync() => + _conn.writev(recover val SyncMessage.done() end) + + fun _flush() => + _conn.writev(recover val FlushMessage.done() end) + + be apply(c: BEConnection tag) => + c.writev(recover val ParseMessage(query, "", TypeOids(params)).done() end) + _bind() + _describe() + _flush() + + be _bind() => + _conn.writev(recover val BindMessage("", "", params).done() end) + _flush() + + be _execute() => + /*Debug.out("execute")*/ + _conn.writev(recover val ExecuteMessage("", _size).done() end) + _flush() + + be _describe() => + _conn.writev(recover val DescribeMessage('P', "").done() end) + _flush() + + be _close() => + _conn.writev(recover val CloseMessage('P', "").done() end) + _flush() + +actor FetchConversation2 is Conversation let query: String val let params: Array[PGValue] val let _conn: BEConnection tag var _rows: (Rows val | Rows trn ) = recover trn Rows end var _tuple_desc: (TupleDescription val | None) = None var _buffer: (Array[Record val] trn | Array[Record val] val) - var status: FetchStatus val= Paused - let _iterator: RecordIterator tag + var status: FetchStatus val= Suspended + let _notify: FetchNotify iso let _size: USize new create(c: BEConnection tag, q: String, @@ -127,10 +220,9 @@ actor FetchConversation is Conversation query = q params = p _conn = c - let notify = consume n - _size = notify.size() + _size = n.size() + _notify = consume n _buffer = recover trn Array[Record val] end - _iterator = RecordIterator(consume notify, this) be start() => if status is Suspended then _execute() end @@ -139,7 +231,7 @@ actor FetchConversation is Conversation _buffer = recover val _buffer end let b = _buffer = recover trn Array[Record val] end status = Paused - _iterator.batch(b) + //_notify.batch(b) end status = Sending @@ -155,11 +247,12 @@ actor FetchConversation is Conversation try let record = recover val Record(_tuple_desc as TupleDescription val, m.fields) end if status is Sending then - /*Debug.out("Sending")*/ + Debug.out("Sending") status = Paused - _iterator.record(record) + _notify.record(record) + start() else - /*Debug("Buffer")*/ + Debug("Buffer") (_buffer as Array[Record val] trn).push(record) end end @@ -177,15 +270,16 @@ actor FetchConversation is Conversation | let r: BindCompleteMessage val => None //_describe() | let r: ReadyForQueryMessage val => _conn.next() | let r: BatchRowMessage val => - for row' in r.rows.values() do - row(row') - end - //batch(r.rows) + None + //for row' in r.rows.values() do + // row(row') + //end + batch(r.rows) | let r: RowDescriptionMessage val => _tuple_desc = r.tuple_desc - _execute() + start() | let r: EmptyQueryResponse val => Debug.out("Empty Query") - | let r: CommandCompleteMessage val => _iterator.stop(); _close() + | let r: CommandCompleteMessage val => _close() | let r: PortalSuspendedMessage val => match status | Sending => Debug.out("continue");_execute() | Paused => Debug.out("suspend");status = Suspended @@ -318,6 +412,7 @@ actor QueryConversation is Conversation be call_back() => // TODO; don't fail silently + Debug.out("coucou") try _rows = recover val _rows as Rows trn end _handler(_rows as Rows val) @@ -330,6 +425,15 @@ actor QueryConversation is Conversation Debug.out(res(0)) end + be batch(r: BatchRowMessage val) => + for row' in r.rows.values() do + try + let res = recover val Record(_tuple_desc as TupleDescription val, row'.fields) end + (_rows as Rows trn).push(res) + Debug.out(res(0)) + end + end + be message(m: ServerMessage val)=> match m | let r: EmptyQueryResponse val => Debug.out("Empty Query") @@ -338,9 +442,7 @@ actor QueryConversation is Conversation | let r: RowDescriptionMessage val => _tuple_desc = r.tuple_desc | let r: BatchRowMessage val => Debug.out("Batch: " + r.rows.size().string() + " rows") - for row' in r.rows.values() do - row(row') - end + batch(r) else _conn.handle_message(m) end diff --git a/pg/connection/listener.pony b/pg/connection/listener.pony index d13965c..2ed240d 100644 --- a/pg/connection/listener.pony +++ b/pg/connection/listener.pony @@ -1,6 +1,7 @@ use "buffered" use "collections" use "debug" +use "net" use "pg/protocol" use "pg/introspect" @@ -15,21 +16,26 @@ class PGParseError is ParseEvent msg = msg' -actor Listener - let _conn: BEConnection tag +class PGNotify is TCPConnectionNotify + + let _conn: _Connection tag var r: Reader iso = Reader // current reader var _ctype: U8 = 0 // current type (keep it if the data is chuncked) var _clen: USize = 0 // current message len (as given by server) - var _batch_size: USize = 0 // group rows in batch of this size. If 0, the + var _batch_size: USize = 1000 // group rows in batch of this size. If 0, the // batch ends only when the DataRowMessages stop. var _rows: (Array[DataRowMessage val] trn | Array[DataRowMessage val] val) = recover trn Array[DataRowMessage val] end - new create(c: BEConnection tag) => - _conn = c + fun ref connected(conn: TCPConnection ref) => + _conn.connected() - be batch_size(s: USize) => - _batch_size = s + fun ref closed(conn: TCPConnection ref) => + terminate() + _conn.received(ConnectionClosedMessage) + + new iso create(c: _Connection tag) => + _conn = c fun ref _batch_send() => _rows = recover val _rows end @@ -37,18 +43,19 @@ actor Listener Debug.out("Send Batch: " + rows.size().string() + " rows") _conn.received(BatchRowMessage(rows)) - be received(data: Array[U8] iso) => + fun ref received(conn: TCPConnection ref, data: Array[U8] iso) => let data' = recover val (consume data).slice() end r.append(data') // don't use while r.size() <= _clen do, because the // continue is unconditionnal. while true do - Debug.out("connection buffer size: " + r.size().string()) + //Debug.out("connection buffer size: " + r.size().string()) match parse_response() | let result: PGParseError val => Debug.out(result.msg);_conn.log(result.msg) | let result: ParsePending val => return | let result: DataRowMessage val => + //Debug.out("Row") try (_rows as Array[DataRowMessage val] trn).push(result) end if _batch_size > 0 then if @@ -73,7 +80,7 @@ actor Listener if r.size() <= _clen then break end end - be terminate() => + fun ref terminate() => if _ctype == 'E' then r.append(recover Array[U8].init(0, _clen - r.size()) end) try _conn.received(recover val parse_response() end as ServerMessage val) end @@ -141,7 +148,7 @@ actor Listener for n in Range(0, n_fields.usize()) do let len = r.i32_be() let data = recover val r.block(len.usize()) end - fields.push(recover val FieldData(len, data) end) + fields.push(recover val FieldData(len, recover val Array[U8].append(consume data) end) end) end fields end @@ -213,8 +220,8 @@ actor Listener return PGParseError("Malformed parameter message") end ParameterStatusMessage( - recover val item.trim(0, end_idx) end, - recover val item.trim(end_idx + 1) end) + recover val Array[U8].append(item.trim(0, end_idx)) end, + recover val Array[U8].append(item.trim(end_idx + 1)) end) fun ref parse_auth_resp(): ServerMessage val => /*Debug.out("parse_auth_resp")*/ diff --git a/pg/connection/tcp.pony b/pg/connection/tcp.pony index ef868d4..dbeb539 100644 --- a/pg/connection/tcp.pony +++ b/pg/connection/tcp.pony @@ -22,40 +22,11 @@ interface BEConnection be fetch(query: String, notify: FetchNotify iso, params: (Array[PGValue] val | None) = None) -class PGNotify is TCPConnectionNotify - let _conn: _Connection - let _listener: Listener - - new iso create(c: _Connection, l: Listener) => - _conn = c - _listener = l - - fun ref connected(conn: TCPConnection ref) => - _conn.connected() - - fun ref received(conn: TCPConnection ref, data: Array[U8] iso) => - // Debug.out("received") - _listener.received(consume data) - - fun ref closed(conn: TCPConnection ref) => - /*_listener.received(recover [as U8: 0, 0, 0, 0, 0]end)*/ - _listener.terminate() - _conn.received(ConnectionClosedMessage) - - /*fun ref sent(conn: TCPConnection ref, data: (String val | Array[U8 val] val)): (String val | Array[U8 val] val) =>*/ - /*Debug.out("send")*/ - /*match data*/ - /*| let s: String val => for c in s.values() do Debug.out(c) end*/ - /*| let s: Array[U8 val] val => for c in s.values() do Debug.out(c) end*/ - /*end*/ - /*conn.write_final(data)*/ - /*""*/ actor _Connection is BEConnection let _conn: TCPConnection tag var _fe: ( Connection tag | None) = None // front-end connection let _pool: ConnectionManager tag - let _listener: Listener tag let _params: Array[(String, String)] val var _convs: List[Conversation tag] = List[Conversation tag] var _current: Conversation tag @@ -66,8 +37,7 @@ actor _Connection is BEConnection service: String, params: Array[(String, String)] val, pool: ConnectionManager) => - _listener = Listener(this) - _conn = TCPConnection(auth, PGNotify(this, _listener), host, service) + _conn = TCPConnection(auth, PGNotify(this), host, service) _pool = pool _params = params _current = AuthConversation(_pool, this, _params) @@ -132,7 +102,7 @@ actor _Connection is BEConnection None be received(s: ServerMessage val) => - // Debug.out("recieved " + s.string()) + Debug.out("recieved " + s.string()) _current.message(s) be _log_error(m: ErrorMessage val) => @@ -163,6 +133,3 @@ actor _Connection is BEConnection be set_frontend(c: Connection tag) => _fe = c - - -