Skip to content

Commit

Permalink
cleanups and fetch connection info from the env
Browse files Browse the repository at this point in the history
  • Loading branch information
lisael committed Oct 15, 2016
1 parent 26875c2 commit 644f588
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 65 deletions.
22 changes: 18 additions & 4 deletions example/main.pony
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BlogEntry
field3 = f3

fun string(): String =>
"BlogEntry " + field1.string() + " "+ field2.string() + " "+ field3.string() + " "
"BlogEntry " + field1.string() + " " + field2.string() + " " + field3.string()

class User
let id: I32
Expand All @@ -37,7 +37,7 @@ class BlogEntryRecordNotify is FetchNotify
let view: BlogEntriesView tag
new iso create(v: BlogEntriesView tag) => view = v
fun ref descirption(desc: RowDescription) => None
fun size(): USize => 100000
fun size(): USize => 10000
fun ref record(r: Record val) =>
try
let e = recover val BlogEntry(
Expand Down Expand Up @@ -113,33 +113,47 @@ actor BlogEntriesView

actor Main
let session: Session
let _env: Env

new create(env: Env) =>
_env = env
session = Session(env where user="macflytest",
password=EnvPasswordProvider(env),
database="macflytest")

let that = recover tag this end
session.execute("SELECT generate_series(0,10)",
recover val
lambda(r: Rows val)(that) =>
that.raw_count(r)
None
end
end)
"""
session.execute("SELECT 42, 24 as foo;;",
recover val
lambda(r: Rows val)(that) =>
that.raw_handler(r)
end
end)
session.execute("SELECT $1, $2 as foo",
recover val
lambda(r: Rows val)(that) =>
that.execute_handler(r)
end
end,
recover val [as PGValue: I32(70000), I32(-100000)] end)

let p = session.connect(recover val
lambda(c: Connection tag) =>
BlogEntriesView(c)
end
end)
"""

be raw_count(rows: Rows val) =>
_env.out.print(rows.size().string())

be raw_handler(rows: Rows val) =>
for row in rows.values() do
Expand Down
90 changes: 51 additions & 39 deletions pg/connection/conversation.pony
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,17 @@ actor RecordIterator
_conversation.start()

be batch(b: Array[Record val] val) =>
Debug.out(b.size())
var s = b.size()
for r in b.values() do
_notify.record(r)
s = s-1
/*if s == 0 then _conversation.start() end*/
end
_conversation.start()
_conversation.start()

be record(r: Record val) =>
Debug.out("record")
_notify.record(r)
_conversation.start()

Expand Down Expand Up @@ -142,42 +147,11 @@ actor FetchConversation is Conversation
/*Debug.out("paused")*/
if status is Sending then status = Paused end

be log(msg: String) => _conn.log(msg)

fun _sync() =>
_conn.writev(recover val SyncMessage.done() end)

fun _flush() =>
_conn.writev(recover val FlushMessage.done() end)

be apply(c: BEConnection tag) =>
c.writev(recover val ParseMessage(query, "", TypeOids(params)).done() end)
_bind()
_describe()
_flush()

be _bind() =>
_conn.writev(recover val BindMessage("", "", params).done() end)
_flush()

be _execute() =>
/*Debug.out("execute")*/
_conn.writev(recover val ExecuteMessage("", _size).done() end)
_flush()

be _describe() =>
_conn.writev(recover val DescribeMessage('P', "").done() end)
_flush()

be _close() =>
_conn.writev(recover val CloseMessage('P', "").done() end)
_flush()

be row(m: DataRowMessage val) =>
Debug.out("row")
_handle_row(m)

fun ref _handle_row(m: DataRowMessage val) =>
/*Debug.out("row")*/
try
let record = recover val Record(_tuple_desc as TupleDescription val, m.fields) end
if status is Sending then
Expand All @@ -186,7 +160,7 @@ actor FetchConversation is Conversation
_iterator.record(record)
else
/*Debug("Buffer")*/
try (_buffer as Array[Record val] trn).push(record) end
(_buffer as Array[Record val] trn).push(record)
end
end

Expand All @@ -210,17 +184,47 @@ actor FetchConversation is Conversation
| let r: RowDescriptionMessage val =>
_tuple_desc = r.tuple_desc
_execute()
| let r: DataRowMessage val => row(r)
| let r: EmptyQueryResponse val => Debug.out("Empty Query")
| let r: CommandCompleteMessage val => _iterator.stop(); _close()
| let r: PortalSuspendedMessage val => match status
| Sending => _execute()
| Paused => status = Suspended
| Sending => Debug.out("continue");_execute()
| Paused => Debug.out("suspend");status = Suspended
end
else
_conn.handle_message(m)
end

be log(msg: String) => _conn.log(msg)

fun _sync() =>
_conn.writev(recover val SyncMessage.done() end)

fun _flush() =>
_conn.writev(recover val FlushMessage.done() end)

be apply(c: BEConnection tag) =>
c.writev(recover val ParseMessage(query, "", TypeOids(params)).done() end)
_bind()
_describe()
_flush()

be _bind() =>
_conn.writev(recover val BindMessage("", "", params).done() end)
_flush()

be _execute() =>
/*Debug.out("execute")*/
_conn.writev(recover val ExecuteMessage("", _size).done() end)
_flush()

be _describe() =>
_conn.writev(recover val DescribeMessage('P', "").done() end)
_flush()

be _close() =>
_conn.writev(recover val CloseMessage('P', "").done() end)
_flush()

actor ExecuteConversation is Conversation
let query: String val
let params: Array[PGValue] val
Expand Down Expand Up @@ -285,7 +289,10 @@ actor ExecuteConversation is Conversation
| let r: RowDescriptionMessage val =>
_tuple_desc = r.tuple_desc
_execute()
| let r: DataRowMessage val => row(r)
| let r: BatchRowMessage val =>
for row' in r.rows.values() do
row(row')
end
| let r: EmptyQueryResponse val => Debug.out("Empty Query")
| let r: CommandCompleteMessage val => call_back(); _close()
else
Expand Down Expand Up @@ -320,6 +327,7 @@ actor QueryConversation is Conversation
try
let res = recover val Record(_tuple_desc as TupleDescription val, m.fields) end
(_rows as Rows trn).push(res)
Debug.out(res(0))
end

be message(m: ServerMessage val)=>
Expand All @@ -328,7 +336,11 @@ actor QueryConversation is Conversation
| let r: CommandCompleteMessage val => call_back(); Debug.out(r.command)
| let r: ReadyForQueryMessage val => _conn.next()
| let r: RowDescriptionMessage val => _tuple_desc = r.tuple_desc
| let r: DataRowMessage val => row(r)
| let r: BatchRowMessage val =>
Debug.out("Batch: " + r.rows.size().string() + " rows")
for row' in r.rows.values() do
row(row')
end
else
_conn.handle_message(m)
end
Expand Down
48 changes: 35 additions & 13 deletions pg/connection/listener.pony
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,57 @@ actor Listener
var r: Reader iso = Reader // current reader
var _ctype: U8 = 0 // current type (keep it if the data is chuncked)
var _clen: USize = 0 // current message len (as given by server)
var _batch: Bool = false
var _batch_size: USize = 0 // group rows in batch of this size. If 0, the
// batch ends only when the DataRowMessages stop.
var _rows: (Array[DataRowMessage val] trn | Array[DataRowMessage val] val) =
recover trn Array[DataRowMessage val] end

new create(c: BEConnection tag, batch: Bool = true) =>
_batch = batch
new create(c: BEConnection tag) =>
_conn = c

be batch_size(s: USize) =>
_batch_size = s

fun ref _batch_send() =>
_rows = recover val _rows end
let rows = _rows = recover trn Array[DataRowMessage val] end
Debug.out("Send Batch: " + rows.size().string() + " rows")
_conn.received(BatchRowMessage(rows))

be received(data: Array[U8] iso) =>
let data' = recover val (consume data).slice() end
r.append(data')
while r.size() > _clen do

// don't use while r.size() <= _clen do, because the
// continue is unconditionnal.
while true do
Debug.out("connection buffer size: " + r.size().string())
match parse_response()
| let result: PGParseError val => Debug.out(result.msg);_conn.log(result.msg)
| let result: ParsePending val => return
| let result: DataRowMessage val =>
if _batch then
try (_rows as Array[DataRowMessage val] trn).push(result) end
else
_conn.received(result)
end
try (_rows as Array[DataRowMessage val] trn).push(result) end
if _batch_size > 0 then
if
(r.size() < 5) // not enough bytes remain in the buffer, let's
// send the batch while we're waiting for more
or
(_rows.size() >= _batch_size) // max_size reached, send the batch
then
_batch_send()
else
continue
end
end
| let result: ServerMessage val =>
if _batch and (_rows.size() > 0) then
_rows = recover val _rows end
let rows = _rows = recover trn Array[DataRowMessage val] end
_conn.received(BatchRowMessage(rows))
// if some messages are still in the batch, there's something new, here.
// we first empty the batch
if (_rows.size() > 0) then
_batch_send()
end
_conn.received(result)
end
if r.size() <= _clen then break end
end

be terminate() =>
Expand Down
52 changes: 43 additions & 9 deletions pg/pg.pony
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,51 @@ actor Session
let _mgr: ConnectionManager

new create(env: Env,
host: String="",
service: String="5432",
host: (String | None) = None,
service: (String| None) = None,
user: (String | None) = None,
password: (String | PasswordProvider tag) = "",
database: String = "") =>
database: (String | None) = None
) =>
_env = env

// retreive the user from the env if not provided
let user' = try user as String else
try EnvVars(env.vars())("USER") else "" end
end
// retreive the connection parameters from env if not provided
// TODO: we should implement all options of libpq as well :
// https://www.postgresql.org/docs/current/static/libpq-envars.html

let user' = try
user as String
else try
EnvVars(env.vars())("PGUSER")
else try
EnvVars(env.vars())("USER")
else
""
end end end

let host' = try
host as String
else try
EnvVars(env.vars())("PGHOST")
else
"localhost"
end end

let service' = try
service as String
else try
EnvVars(env.vars())("PGPORT")
else
"5432"
end end

let database' = try
database as String
else try
EnvVars(env.vars())("PGDATABASE")
else
user'
end end

// Define the password strategy
let provider = match password
Expand All @@ -44,8 +78,8 @@ actor Session
RawPasswordProvider("")
end

_mgr = ConnectionManager(host, service, user', provider,
recover val [("user", user'), ("database", database)] end)
_mgr = ConnectionManager(host', service', user', provider,
recover val [("user", user'), ("database", database')] end)

be log(msg: String) =>
_env.out.print(msg)
Expand Down

0 comments on commit 644f588

Please sign in to comment.