@@ -102,8 +102,19 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
102
102
* this does not necessarily need to be the same version of Hive that is used internally by
103
103
* Spark SQL for execution.
104
104
*/
105
- protected[hive] def hiveVersion: String =
106
-   getConf("spark.sql.hive.version", "0.13.1")
105
+ protected[hive] def hiveMetastoreVersion: String =
106
+   getConf("spark.sql.hive.metastore.version", "0.13.1")
107
+
108
+ /**
109
+ * The location of the jars that should be used to instantiate the HiveMetastoreClient. This
110
+ * property can be one of three options:
111
+ * - a comma-separated list of jar files that could be passed to a URLClassLoader
112
+ * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
113
+ * option is only valid when using the execution version of Hive.
114
+ * - maven - download the correct version of hive on demand from maven.
115
+ */
116
+ protected[hive] def hiveMetastoreJars: String =
117
+   getConf("spark.sql.hive.metastore.jars", "builtin")
107
118
108
119
@ transient
109
120
protected [sql] lazy val substitutor = new VariableSubstitution ()
@@ -121,6 +132,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
121
132
executionConf.set(
122
133
" javax.jdo.option.ConnectionURL" , s " jdbc:derby:;databaseName= $localMetastore;create=true " )
123
134
135
+ /** The version of hive used internally by Spark SQL. */
136
+ lazy val hiveExecutionVersion: String = "0.13.1"
137
+
124
138
/**
125
139
* The copy of the hive client that is used for execution. Currently this must always be
126
140
* Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the
@@ -129,31 +143,71 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
129
143
* for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
130
144
*/
131
145
@ transient
132
- protected [hive] lazy val executionHive : ClientWrapper =
133
- new IsolatedClientLoader (
134
- version = IsolatedClientLoader .hiveVersion( " 13 " ),
135
- isolationOn = false ,
146
+ protected [hive] lazy val executionHive : ClientWrapper = {
147
+   logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
148
+ new ClientWrapper (
149
+ version = IsolatedClientLoader .hiveVersion(hiveExecutionVersion) ,
136
150
config = Map (
137
151
" javax.jdo.option.ConnectionURL" ->
138
- s " jdbc:derby:;databaseName= $localMetastore;create=true " ),
139
- rootClassLoader = Utils .getContextOrSparkClassLoader).client. asInstanceOf [ ClientWrapper ]
152
+ s " jdbc:derby:;databaseName= $localMetastore;create=true " ))
153
+ }
140
154
SessionState .setCurrentSessionState(executionHive.state)
141
155
142
156
/**
143
- * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore. This
157
+ * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore.
144
158
* The version of the Hive client that is used here must match the metastore that is configured
145
159
* in the hive-site.xml file.
146
160
*/
147
161
@ transient
148
162
protected [hive] lazy val metadataHive : ClientInterface = {
163
+ val metaVersion = IsolatedClientLoader .hiveVersion(hiveMetastoreVersion)
164
+
149
165
// We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
150
166
// into the isolated client loader
151
167
val metadataConf = new HiveConf ()
152
- val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap
168
+ // `configure` goes second to override other settings.
169
+ val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap ++ configure
170
+
171
+   val isolatedLoader = if (hiveMetastoreJars == "builtin") {
172
+ if (hiveExecutionVersion != hiveMetastoreVersion) {
173
+ throw new IllegalArgumentException (
174
+ " Builtin jars can only be used when hive execution version == hive metastore version. " +
175
+ s " Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " +
176
+       "Specify a valid path to the correct hive jars using spark.sql.hive.metastore.jars " +
177
+ s " or change spark.sql.hive.metastore.version to ${hiveExecutionVersion}. " )
178
+ }
179
+ val jars = getClass.getClassLoader match {
180
+ case urlClassLoader : java.net.URLClassLoader => urlClassLoader.getURLs
181
+ case other =>
182
+ throw new IllegalArgumentException (
183
+ " Unable to locate hive jars to connect to metastore " +
184
+ s " using classloader ${other.getClass.getName}. " +
185
+ " Please set spark.sql.hive.metastore.jars" )
186
+ }
153
187
154
- // Config goes second to override other settings.
155
- // TODO: Support for loading the jars from an already downloaded location.
156
- IsolatedClientLoader .forVersion(hiveVersion, allConfig ++ configure).client
188
+ logInfo(
189
+ s " Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes. " )
190
+ new IsolatedClientLoader (
191
+ version = metaVersion,
192
+ execJars = jars.toSeq,
193
+ config = allConfig,
194
+ isolationOn = true )
195
+   } else if (hiveMetastoreJars == "maven") {
196
+ // TODO: Support for loading the jars from an already downloaded location.
197
+ logInfo(
198
+ s " Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven. " )
199
+ IsolatedClientLoader .forVersion(hiveMetastoreVersion, allConfig )
200
+ } else {
201
+     val jars = hiveMetastoreJars.split(",").map(new java.net.URL(_))
202
+ logInfo(
203
+ s " Initializing HiveMetastoreConnection version $hiveMetastoreVersion using $jars" )
204
+ new IsolatedClientLoader (
205
+ version = metaVersion,
206
+ execJars = jars.toSeq,
207
+ config = allConfig,
208
+ isolationOn = true )
209
+ }
210
+ isolatedLoader.client
157
211
}
158
212
159
213
protected [sql] override def parseSql (sql : String ): LogicalPlan = {
0 commit comments