17
17
18
18
package org .apache .spark .rdd
19
19
20
- import java .sql .{PreparedStatement , Connection , ResultSet }
20
+ import java .sql .{Connection , ResultSet }
21
21
22
22
import scala .reflect .ClassTag
23
23
@@ -70,31 +70,21 @@ class JdbcRDD[T: ClassTag](
70
70
override def compute (thePart : Partition , context : TaskContext ) = new NextIterator [T ] {
71
71
context.addOnCompleteCallback{ () => closeIfNeeded() }
72
72
val part = thePart.asInstanceOf [JdbcPartition ]
73
- var conn : Connection = _
74
- var stmt : PreparedStatement = _
75
- try {
76
- conn = getConnection()
77
- stmt = conn.prepareStatement(sql, ResultSet .TYPE_FORWARD_ONLY , ResultSet .CONCUR_READ_ONLY )
73
+ val conn = getConnection()
74
+ val stmt = conn.prepareStatement(sql, ResultSet .TYPE_FORWARD_ONLY , ResultSet .CONCUR_READ_ONLY )
78
75
79
- // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
80
- // rather than pulling entire resultset into memory.
81
- // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
82
- if (conn.getMetaData.getURL.matches(" jdbc:mysql:.*" )) {
83
- stmt.setFetchSize(Integer .MIN_VALUE )
84
- logInfo(" statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming " )
85
- }
86
-
87
- stmt.setLong(1 , part.lower)
88
- stmt.setLong(2 , part.upper)
89
- val rs = stmt.executeQuery()
90
-
91
- } catch {
92
- case e : Exception =>
93
- close()
94
- logError(" Exception occurred on creating connection/preparedStatement" , e)
95
- throw e // Is it correct to throw Exception, or what is preferred cleanup here?
76
+ // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
77
+ // rather than pulling entire resultset into memory.
78
+ // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
79
+ if (conn.getMetaData.getURL.matches(" jdbc:mysql:.*" )) {
80
+ stmt.setFetchSize(Integer .MIN_VALUE )
81
+ logInfo(" statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming " )
96
82
}
97
83
84
+ stmt.setLong(1 , part.lower)
85
+ stmt.setLong(2 , part.upper)
86
+ val rs = stmt.executeQuery()
87
+
98
88
override def getNext : T = {
99
89
if (rs.next()) {
100
90
mapRow(rs)
@@ -116,7 +106,7 @@ class JdbcRDD[T: ClassTag](
116
106
case e : Exception => logWarning(" Exception closing statement" , e)
117
107
}
118
108
try {
119
- if (null != conn && ! stmt .isClosed()) conn.close()
109
+ if (null != conn && ! conn .isClosed()) conn.close()
120
110
logInfo(" closed connection" )
121
111
} catch {
122
112
case e : Exception => logWarning(" Exception closing connection" , e)
0 commit comments