@@ -21,28 +21,31 @@ import java.io.IOException
 import java.text.NumberFormat
 import java.util.Date
 
+import scala.collection.mutable
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc
-import org.apache.hadoop.mapred._
 import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred._
 
+import org.apache.spark.sql.Row
 import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
  * It is based on [[SparkHadoopWriter]].
  */
-private[hive] class SparkHiveHadoopWriter(
+private[hive] class SparkHiveWriterContainer(
     @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc)
   extends Logging
   with SparkHadoopMapRedUtil
   with Serializable {
 
   private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
+  protected val conf = new SerializableWritable(jobConf)
 
   private var jobID = 0
   private var splitID = 0
@@ -51,152 +54,75 @@ private[hive] class SparkHiveHadoopWriter(
   private var taID: SerializableWritable[TaskAttemptID] = null
 
   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private var format: HiveOutputFormat[AnyRef, Writable] = null
-  @transient private var committer: OutputCommitter = null
-  @transient private var jobContext: JobContext = null
-  @transient private var taskContext: TaskAttemptContext = null
+  @transient private lazy val committer = conf.value.getOutputCommitter
+  @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+  @transient private lazy val outputFormat =
+    conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
 
-  def preSetup() {
+  def driverSideSetup() {
     setIDs(0, 0, 0)
     setConfParams()
-
-    val jCtxt = getJobContext()
-    getOutputCommitter().setupJob(jCtxt)
+    committer.setupJob(jobContext)
   }
 
-
-  def setup(jobid: Int, splitid: Int, attemptid: Int) {
-    setIDs(jobid, splitid, attemptid)
+  def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
+    setIDs(jobId, splitId, attemptId)
     setConfParams()
-  }
-
-  def open() {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val extension = Utilities.getFileExtension(
-      conf.value,
-      fileSinkConf.getCompressed,
-      getOutputFormat())
-
-    val outputName = "part-" + numfmt.format(splitID) + extension
-    val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
-
-    getOutputCommitter().setupTask(getTaskContext())
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      path,
-      null)
+    committer.setupTask(taskContext)
   }
 
   /**
-   * create an HiveRecordWriter. imitate the above function open()
-   * @param dynamicPartPath the relative path for dynamic partition
-   *
-   * since this function is used to create different writer for
-   * different dynamic partition.So we need a parameter dynamicPartPath
-   * and use it we can calculate a new path and pass the new path to
-   * the function HiveFileFormatUtils.getHiveRecordWriter
+   * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer
+   * for writing data to a dynamic partition.
    */
-  def open(dynamicPartPath: String) {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val extension = Utilities.getFileExtension(
-      conf.value,
-      fileSinkConf.getCompressed,
-      getOutputFormat())
-
-    val outputName = "part-" + numfmt.format(splitID) + extension
-    val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
-    if (outputPath == null) {
-      throw new IOException("Undefined job output-path")
-    }
-    val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) // remove "/"
-    val path = new Path(workPath, outputName)
-    getOutputCommitter().setupTask(getTaskContext())
+  def open() {
     writer = HiveFileFormatUtils.getHiveRecordWriter(
       conf.value,
       fileSinkConf.getTableInfo,
       conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
       fileSinkConf,
-      path,
+      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
       Reporter.NULL)
   }
 
-  def write(value: Writable) {
-    if (writer != null) {
-      writer.write(value)
-    } else {
-      throw new IOException("Writer is null, open() has not been called")
-    }
+  protected def getOutputName: String = {
+    val numberFormat = NumberFormat.getInstance()
+    numberFormat.setMinimumIntegerDigits(5)
+    numberFormat.setGroupingUsed(false)
+    val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
+    "part-" + numberFormat.format(splitID) + extension
   }
 
+  def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
+
   def close() {
     // Seems the boolean value passed into close does not matter.
     writer.close(false)
   }
 
   def commit() {
-    val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter()
-    if (cmtr.needsTaskCommit(taCtxt)) {
+    if (committer.needsTaskCommit(taskContext)) {
       try {
-        cmtr.commitTask(taCtxt)
+        committer.commitTask(taskContext)
         logInfo(taID + ": Committed")
       } catch {
         case e: IOException =>
          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
+          committer.abortTask(taskContext)
          throw e
      }
    } else {
-      logWarning("No need to commit output of task: " + taID.value)
+      logInfo("No need to commit output of task: " + taID.value)
    }
  }
 
   def commitJob() {
-    // always ? Or if cmtr.needsTaskCommit ?
-    val cmtr = getOutputCommitter()
-    cmtr.commitJob(getJobContext())
+    committer.commitJob(jobContext)
   }
 
   // ********* Private Functions *********
 
-  private def getOutputFormat(): HiveOutputFormat[AnyRef, Writable] = {
-    if (format == null) {
-      format = conf.value.getOutputFormat()
-        .asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
-    }
-    format
-  }
-
-  private def getOutputCommitter(): OutputCommitter = {
-    if (committer == null) {
-      committer = conf.value.getOutputCommitter
-    }
-    committer
-  }
-
-  private def getJobContext(): JobContext = {
-    if (jobContext == null) {
-      jobContext = newJobContext(conf.value, jID.value)
-    }
-    jobContext
-  }
-
-  private def getTaskContext(): TaskAttemptContext = {
-    if (taskContext == null) {
-      taskContext = newTaskAttemptContext(conf.value, taID.value)
-    }
-    taskContext
-  }
-
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
     jobID = jobId
     splitID = splitId
@@ -216,7 +142,7 @@ private[hive] class SparkHiveHadoopWriter(
   }
 }
 
-private[hive] object SparkHiveHadoopWriter {
+private[hive] object SparkHiveWriterContainer {
   def createPathFromString(path: String, conf: JobConf): Path = {
     if (path == null) {
       throw new IllegalArgumentException("Output path is null")
@@ -226,6 +152,59 @@ private[hive] object SparkHiveHadoopWriter {
     if (outputPath == null || fs == null) {
       throw new IllegalArgumentException("Incorrectly formatted output path")
     }
-    outputPath.makeQualified(fs)
+    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  }
+}
+
+private[spark] class SparkHiveDynamicPartitionWriterContainer(
+    @transient jobConf: JobConf,
+    fileSinkConf: FileSinkDesc,
+    dynamicPartColNames: Array[String],
+    defaultPartName: String)
+  extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
+
+  @transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
+
+  override def open(): Unit = {
+    writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
+  }
+
+  override def close(): Unit = {
+    writers.values.foreach(_.close(false))
+  }
+
+  override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
+    val dynamicPartPath = dynamicPartColNames
+      .zip(row.takeRight(dynamicPartColNames.length))
+      .map { case (col, rawVal) =>
+        val string = String.valueOf(rawVal)
+        s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
+      }
+      .mkString
+
+    val path = {
+      val outputPath = FileOutputFormat.getOutputPath(conf.value)
+      assert(outputPath != null, "Undefined job output-path")
+      val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+      new Path(workPath, getOutputName)
+    }
+
+    def newWriter = {
+      val newFileSinkDesc = new FileSinkDesc(
+        fileSinkConf.getDirName + dynamicPartPath,
+        fileSinkConf.getTableInfo,
+        fileSinkConf.getCompressed)
+      newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
+      newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
+      HiveFileFormatUtils.getHiveRecordWriter(
+        conf.value,
+        fileSinkConf.getTableInfo,
+        conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+        newFileSinkDesc,
+        path,
+        Reporter.NULL)
+    }
+
+    writers.getOrElseUpdate(dynamicPartPath, newWriter)
   }
 }
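
Usage note (not part of the diff): the hunks above replace the single-purpose SparkHiveHadoopWriter with a SparkHiveWriterContainer that exposes an explicit driver/executor lifecycle, plus a SparkHiveDynamicPartitionWriterContainer that caches one RecordWriter per dynamic-partition path. The Scala sketch below is a minimal illustration of how a caller might drive that lifecycle, based only on the API visible in this diff; the object and method names (WriterContainerUsageSketch, writePartition) and the row-to-Writable serializer are hypothetical placeholders, not code from this change.

import org.apache.hadoop.io.Writable
import org.apache.spark.sql.Row

// Sketch only: assumes it lives in the org.apache.spark.sql.hive package,
// since the container classes are package-private.
object WriterContainerUsageSketch {
  def writePartition(
      container: SparkHiveWriterContainer,
      jobId: Int,
      splitId: Int,
      attemptId: Int,
      rows: Iterator[Row],
      serialize: Row => Writable): Unit = {   // `serialize` is a hypothetical Row-to-Writable step
    // On the driver, once per job before any task runs:
    //   container.driverSideSetup()

    // On each executor task:
    container.executorSideSetup(jobId, splitId, attemptId) // bind job/split/attempt IDs, set up the task
    container.open()                                       // create the underlying Hive RecordWriter(s)
    rows.foreach { row =>
      // The base container always returns its single writer; the dynamic-partition
      // container returns (and caches) a writer keyed by the row's partition-column values.
      container.getLocalFileWriter(row).write(serialize(row))
    }
    container.close()
    container.commit()                                     // commit this task attempt's output

    // Back on the driver, after all tasks succeed:
    //   container.commitJob()
  }
}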