-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathQuery.scala
169 lines (141 loc) · 4.03 KB
/
Query.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package com.komanov.mysql.streaming
import java.sql._
import java.util
import scala.collection.mutable
/**
 * JDBC helpers that create, fill and read the test table, either by buffering the
 * whole result set or by streaming it.
 *
 * One connection per [[MysqlDriver]] instance is opened on first use and cached for
 * later calls. The cache is a plain `IdentityHashMap` with no synchronization.
 */
object Query {

  /** Name of the table every statement in this object operates on. */
  val TableName: String = "test_table"

  /** DDL statement that creates [[TableName]]. */
  val CreateSql: String =
    s"""
       |CREATE TABLE $TableName (
       |id INT,
       |name VARCHAR(1000),
       |PRIMARY KEY (id)
       |) ENGINE=InnoDB DEFAULT CHARSET=utf8
       |""".stripMargin

  // Joins the table with itself on an always-true condition, so a table holding
  // TableSize rows produces TableSize * TableSize result rows.
  private val MillionSql: String =
    s"""
       |SELECT
       |t1.id AS id1, t1.name AS name1, t2.id AS id2, t2.name AS name2,
       |t2.name AS more1, t2.name AS more2, t2.name AS more3, t2.name AS more4
       |FROM
       |$TableName t1
       |LEFT JOIN
       |$TableName t2
       |ON TRUE
       |""".stripMargin

  /** Number of rows written by [[prepareTable]]. */
  val TableSize: Int = 1000

  /** The rows [[prepareTable]] writes when called with its default `length`. */
  val TestData: List[TestTableRow] = generateTestData(200)

  // Keyed by driver identity: each MysqlDriver instance gets its own connection.
  private val connections = new util.IdentityHashMap[MysqlDriver, Connection]()

  /** Builds rows 1..TableSize whose name is the id right-padded with '0' to `length` chars. */
  private def generateTestData(length: Int): List[TestTableRow] =
    (1 to TableSize)
      .map(id => TestTableRow(id, id.toString.padTo(length, '0')))
      .toList

  /**
   * Inserts [[TableSize]] generated rows into the table, one INSERT per row.
   *
   * @param driver driver whose cached connection is used
   * @param length minimum length of each generated `name` value
   * @throws IllegalArgumentException if an INSERT does not report exactly one affected row
   */
  def prepareTable(driver: MysqlDriver, length: Int = 200): Unit = {
    val insert = getConnection(driver)
      .prepareStatement(s"INSERT INTO $TableName (id, name) VALUES(?, ?)")
    // try/finally so the statement is released even when an insert fails.
    try {
      generateTestData(length).foreach { row =>
        insert.setInt(1, row.id)
        insert.setString(2, row.name)
        require(insert.executeUpdate() == 1, s"INSERT of row ${row.id} did not affect exactly one row")
      }
    } finally {
      insert.close()
    }
  }

  /** Removes every row from the table with TRUNCATE. */
  def clearTable(driver: MysqlDriver): Unit =
    withStatement(driver) { st =>
      st.execute(s"TRUNCATE TABLE $TableName")
      ()
    }

  /** Reads all rows with the driver's default fetch size. */
  def selectAtOnce(driver: MysqlDriver): List[TestTableRow] =
    runQuery(driver, selectSql(None), streaming = false)(_.toList)

  /** Reads at most `limit` rows with the driver's default fetch size. */
  def selectAtOnce(driver: MysqlDriver, limit: Int): List[TestTableRow] =
    runQuery(driver, selectSql(Some(limit)), streaming = false)(_.toList)

  /** Reads all rows through the streaming [[forEach]] and collects them into a list. */
  def selectViaStreaming(driver: MysqlDriver): List[TestTableRow] = {
    val collected = mutable.ListBuffer.empty[TestTableRow]
    forEach(driver, row => collected += row)
    collected.toList
  }

  /** Reads at most `limit` rows through the streaming [[forEach]] and collects them into a list. */
  def selectViaStreaming(driver: MysqlDriver, limit: Int): List[TestTableRow] = {
    val collected = mutable.ListBuffer.empty[TestTableRow]
    forEach(driver, limit, row => collected += row)
    collected.toList
  }

  /** Invokes `f` for every row of the table, reading the result in streaming mode. */
  def forEach(driver: MysqlDriver, f: TestTableRow => Unit): Unit =
    runQuery(driver, selectSql(None), streaming = true)(_.foreach(f))

  /** Invokes `f` for at most `limit` rows of the table, reading the result in streaming mode. */
  def forEach(driver: MysqlDriver, limit: Int, f: TestTableRow => Unit): Unit =
    runQuery(driver, selectSql(Some(limit)), streaming = true)(_.foreach(f))

  /**
   * Runs the self-join query without touching the fetch size and maps every row.
   *
   * @throws IllegalArgumentException if the row count is not `TableSize * TableSize`
   */
  def forEachMillionAtOnce(driver: MysqlDriver): Unit =
    readMillion(driver, streaming = false)

  /**
   * Runs the self-join query in streaming mode and maps every row.
   *
   * @throws IllegalArgumentException if the row count is not `TableSize * TableSize`
   */
  def forEachMillionViaStreaming(driver: MysqlDriver): Unit =
    readMillion(driver, streaming = true)

  /** Shared body of the two `forEachMillion*` entry points. */
  private def readMillion(driver: MysqlDriver, streaming: Boolean): Unit = {
    val expected = TableSize * TableSize
    // foldLeft pulls every element, so each row is still mapped before being counted.
    val seen = runQuery(driver, MillionSql, streaming)(_.foldLeft(0)((n, _) => n + 1))
    require(seen == expected, s"expected $expected rows but read $seen")
  }

  /** SELECT over the table's two columns, with an optional LIMIT clause. */
  private def selectSql(limit: Option[Int]): String = {
    val base = s"SELECT id, name FROM $TableName"
    limit.fold(base)(n => s"$base LIMIT $n")
  }

  /**
   * Executes `sql` on a fresh statement and hands the mapped rows to `consume`.
   *
   * `consume` must finish reading inside the call: the statement is closed as soon as
   * it returns, which per the JDBC contract also closes the result set.
   *
   * @param streaming when true the fetch size is set to `Int.MinValue` before executing,
   *                  the value MySQL Connector/J documents as enabling row-by-row streaming
   */
  private def runQuery[T](driver: MysqlDriver, sql: String, streaming: Boolean)(
      consume: Iterator[TestTableRow] => T
  ): T =
    withStatement(driver) { st =>
      if (streaming) st.setFetchSize(Int.MinValue)
      consume(rowIterator(st.executeQuery(sql)))
    }

  /** Single-pass iterator that advances `rs` and maps each row it lands on. */
  private def rowIterator(rs: ResultSet): Iterator[TestTableRow] =
    Iterator.continually(rs).takeWhile(_.next()).map(mapRow)

  /** Maps the first two columns of the current row (an int and a string) to a row object. */
  private def mapRow(rs: ResultSet): TestTableRow =
    TestTableRow(rs.getInt(1), rs.getString(2))

  /** Creates a statement on the driver's connection, applies `f`, and always closes the statement. */
  private def withStatement[T](driver: MysqlDriver)(f: Statement => T): T = {
    val st = getConnection(driver).createStatement()
    try {
      f(st)
    } finally {
      st.close()
    }
  }

  /**
   * Returns the cached connection for `d`, opening (and caching) a new one when there is
   * none yet or the cached one has been closed.
   */
  private def getConnection(d: MysqlDriver): Connection =
    Option(connections.get(d)).filterNot(_.isClosed).getOrElse {
      val fresh = DriverManager.getConnection(d.url)
      connections.put(d, fresh)
      fresh
    }
}
/**
 * One row of the test table.
 *
 * @param id   value of the `id` column
 * @param name value of the `name` column
 */
case class TestTableRow(
  id: Int,
  name: String
)