diff --git a/example/main.pony b/example/main.pony index f364586..8db4527 100644 --- a/example/main.pony +++ b/example/main.pony @@ -24,7 +24,7 @@ class BlogEntry field3 = f3 fun string(): String => - "BlogEntry " + field1.string() + " "+ field2.string() + " "+ field3.string() + " " + "BlogEntry " + field1.string() + " " + field2.string() + " " + field3.string() class User let id: I32 @@ -37,7 +37,7 @@ class BlogEntryRecordNotify is FetchNotify let view: BlogEntriesView tag new iso create(v: BlogEntriesView tag) => view = v fun ref descirption(desc: RowDescription) => None - fun size(): USize => 100000 + fun size(): USize => 10000 fun ref record(r: Record val) => try let e = recover val BlogEntry( @@ -113,13 +113,23 @@ actor BlogEntriesView actor Main let session: Session + let _env: Env new create(env: Env) => + _env = env session = Session(env where user="macflytest", password=EnvPasswordProvider(env), database="macflytest") - let that = recover tag this end + session.execute("SELECT generate_series(0,10)", + recover val + lambda(r: Rows val)(that) => + that.raw_count(r) + None + end + end) + """ + session.execute("SELECT 42, 24 as foo;;", recover val lambda(r: Rows val)(that) => @@ -127,6 +137,7 @@ actor Main end end) + session.execute("SELECT $1, $2 as foo", recover val lambda(r: Rows val)(that) => @@ -134,12 +145,15 @@ actor Main end end, recover val [as PGValue: I32(70000), I32(-100000)] end) - let p = session.connect(recover val lambda(c: Connection tag) => BlogEntriesView(c) end end) + """ + + be raw_count(rows: Rows val) => + _env.out.print(rows.size().string()) be raw_handler(rows: Rows val) => for row in rows.values() do diff --git a/pg/connection/conversation.pony b/pg/connection/conversation.pony index 7caa3cf..8acb8d2 100644 --- a/pg/connection/conversation.pony +++ b/pg/connection/conversation.pony @@ -94,12 +94,17 @@ actor RecordIterator _conversation.start() be batch(b: Array[Record val] val) => + Debug.out(b.size()) + var s = b.size() for r in b.values() do _notify.record(r) + s = s-1 + /*if s == 0 then _conversation.start() end*/ end - _conversation.start() + _conversation.start() be record(r: Record val) => + Debug.out("record") _notify.record(r) _conversation.start() @@ -142,42 +147,11 @@ actor FetchConversation is Conversation /*Debug.out("paused")*/ if status is Sending then status = Paused end - be log(msg: String) => _conn.log(msg) - - fun _sync() => - _conn.writev(recover val SyncMessage.done() end) - - fun _flush() => - _conn.writev(recover val FlushMessage.done() end) - - be apply(c: BEConnection tag) => - c.writev(recover val ParseMessage(query, "", TypeOids(params)).done() end) - _bind() - _describe() - _flush() - - be _bind() => - _conn.writev(recover val BindMessage("", "", params).done() end) - _flush() - - be _execute() => - /*Debug.out("execute")*/ - _conn.writev(recover val ExecuteMessage("", _size).done() end) - _flush() - - be _describe() => - _conn.writev(recover val DescribeMessage('P', "").done() end) - _flush() - - be _close() => - _conn.writev(recover val CloseMessage('P', "").done() end) - _flush() - be row(m: DataRowMessage val) => + Debug.out("row") _handle_row(m) fun ref _handle_row(m: DataRowMessage val) => - /*Debug.out("row")*/ try let record = recover val Record(_tuple_desc as TupleDescription val, m.fields) end if status is Sending then @@ -186,7 +160,7 @@ actor FetchConversation is Conversation _iterator.record(record) else /*Debug("Buffer")*/ - try (_buffer as Array[Record val] trn).push(record) end + (_buffer as Array[Record val] trn).push(record) end end @@ -210,17 +184,47 @@ actor FetchConversation is Conversation | let r: RowDescriptionMessage val => _tuple_desc = r.tuple_desc _execute() - | let r: DataRowMessage val => row(r) | let r: EmptyQueryResponse val => Debug.out("Empty Query") | let r: CommandCompleteMessage val => _iterator.stop(); _close() | let r: PortalSuspendedMessage val => match status - | Sending => _execute() - | Paused => status = Suspended + | Sending => Debug.out("continue");_execute() + | Paused => Debug.out("suspend");status = Suspended end else _conn.handle_message(m) end + be log(msg: String) => _conn.log(msg) + + fun _sync() => + _conn.writev(recover val SyncMessage.done() end) + + fun _flush() => + _conn.writev(recover val FlushMessage.done() end) + + be apply(c: BEConnection tag) => + c.writev(recover val ParseMessage(query, "", TypeOids(params)).done() end) + _bind() + _describe() + _flush() + + be _bind() => + _conn.writev(recover val BindMessage("", "", params).done() end) + _flush() + + be _execute() => + /*Debug.out("execute")*/ + _conn.writev(recover val ExecuteMessage("", _size).done() end) + _flush() + + be _describe() => + _conn.writev(recover val DescribeMessage('P', "").done() end) + _flush() + + be _close() => + _conn.writev(recover val CloseMessage('P', "").done() end) + _flush() + actor ExecuteConversation is Conversation let query: String val let params: Array[PGValue] val @@ -285,7 +289,10 @@ actor ExecuteConversation is Conversation | let r: RowDescriptionMessage val => _tuple_desc = r.tuple_desc _execute() - | let r: DataRowMessage val => row(r) + | let r: BatchRowMessage val => + for row' in r.rows.values() do + row(row') + end | let r: EmptyQueryResponse val => Debug.out("Empty Query") | let r: CommandCompleteMessage val => call_back(); _close() else @@ -320,6 +327,7 @@ actor QueryConversation is Conversation try let res = recover val Record(_tuple_desc as TupleDescription val, m.fields) end (_rows as Rows trn).push(res) + Debug.out(res(0)) end be message(m: ServerMessage val)=> @@ -328,7 +336,11 @@ actor QueryConversation is Conversation | let r: CommandCompleteMessage val => call_back(); Debug.out(r.command) | let r: ReadyForQueryMessage val => _conn.next() | let r: RowDescriptionMessage val => _tuple_desc = r.tuple_desc - | let r: DataRowMessage val => row(r) + | let r: BatchRowMessage val => + Debug.out("Batch: " + r.rows.size().string() + " rows") + for row' in r.rows.values() do + row(row') + end else _conn.handle_message(m) end diff --git a/pg/connection/listener.pony b/pg/connection/listener.pony index 44947ff..d13965c 100644 --- a/pg/connection/listener.pony +++ b/pg/connection/listener.pony @@ -20,35 +20,57 @@ actor Listener var r: Reader iso = Reader // current reader var _ctype: U8 = 0 // current type (keep it if the data is chuncked) var _clen: USize = 0 // current message len (as given by server) - var _batch: Bool = false + var _batch_size: USize = 0 // group rows in batch of this size. If 0, the + // batch ends only when the DataRowMessages stop. var _rows: (Array[DataRowMessage val] trn | Array[DataRowMessage val] val) = recover trn Array[DataRowMessage val] end - new create(c: BEConnection tag, batch: Bool = true) => - _batch = batch + new create(c: BEConnection tag) => _conn = c + be batch_size(s: USize) => + _batch_size = s + + fun ref _batch_send() => + _rows = recover val _rows end + let rows = _rows = recover trn Array[DataRowMessage val] end + Debug.out("Send Batch: " + rows.size().string() + " rows") + _conn.received(BatchRowMessage(rows)) + be received(data: Array[U8] iso) => let data' = recover val (consume data).slice() end r.append(data') - while r.size() > _clen do + + // don't use while r.size() <= _clen do, because the + // continue is unconditionnal. + while true do + Debug.out("connection buffer size: " + r.size().string()) match parse_response() | let result: PGParseError val => Debug.out(result.msg);_conn.log(result.msg) | let result: ParsePending val => return | let result: DataRowMessage val => - if _batch then - try (_rows as Array[DataRowMessage val] trn).push(result) end - else - _conn.received(result) - end + try (_rows as Array[DataRowMessage val] trn).push(result) end + if _batch_size > 0 then + if + (r.size() < 5) // not enough bytes remain in the buffer, let's + // send the batch while we're waiting for more + or + (_rows.size() >= _batch_size) // max_size reached, send the batch + then + _batch_send() + else + continue + end + end | let result: ServerMessage val => - if _batch and (_rows.size() > 0) then - _rows = recover val _rows end - let rows = _rows = recover trn Array[DataRowMessage val] end - _conn.received(BatchRowMessage(rows)) + // if some messages are still in the batch, there's something new, here. + // we first empty the batch + if (_rows.size() > 0) then + _batch_send() end _conn.received(result) end + if r.size() <= _clen then break end end be terminate() => diff --git a/pg/pg.pony b/pg/pg.pony index 4ce3f7b..2ade007 100644 --- a/pg/pg.pony +++ b/pg/pg.pony @@ -24,17 +24,51 @@ actor Session let _mgr: ConnectionManager new create(env: Env, - host: String="", - service: String="5432", + host: (String | None) = None, + service: (String| None) = None, user: (String | None) = None, password: (String | PasswordProvider tag) = "", - database: String = "") => + database: (String | None) = None + ) => _env = env - // retreive the user from the env if not provided - let user' = try user as String else - try EnvVars(env.vars())("USER") else "" end - end + // retreive the connection parameters from env if not provided + // TODO: we should implement all options of libpq as well : + // https://www.postgresql.org/docs/current/static/libpq-envars.html + + let user' = try + user as String + else try + EnvVars(env.vars())("PGUSER") + else try + EnvVars(env.vars())("USER") + else + "" + end end end + + let host' = try + host as String + else try + EnvVars(env.vars())("PGHOST") + else + "localhost" + end end + + let service' = try + service as String + else try + EnvVars(env.vars())("PGPORT") + else + "5432" + end end + + let database' = try + database as String + else try + EnvVars(env.vars())("PGDATABASE") + else + user' + end end // Define the password strategy let provider = match password @@ -44,8 +78,8 @@ actor Session RawPasswordProvider("") end - _mgr = ConnectionManager(host, service, user', provider, - recover val [("user", user'), ("database", database)] end) + _mgr = ConnectionManager(host', service', user', provider, + recover val [("user", user'), ("database", database')] end) be log(msg: String) => _env.out.print(msg)