File tree Expand file tree Collapse file tree 5 files changed +105
-72
lines changed
main/scala/org/apache/spark/streaming/util
test/scala/org/apache/spark/streaming/util Expand file tree Collapse file tree 5 files changed +105
-72
lines changed Original file line number Diff line number Diff line change 406
406
<artifactId >akka-slf4j_${scala.binary.version}</artifactId >
407
407
<version >${akka.version} </version >
408
408
</dependency >
409
+ <dependency >
410
+ <groupId >org.apache.hadoop</groupId >
411
+ <artifactId >hadoop-minicluster</artifactId >
412
+ <version >${hadoop.version} </version >
413
+ <scope >test</scope >
414
+ </dependency >
409
415
<dependency >
410
416
<groupId >${akka.group} </groupId >
411
417
<artifactId >akka-testkit_${scala.binary.version}</artifactId >
Original file line number Diff line number Diff line change 68
68
<artifactId >junit-interface</artifactId >
69
69
<scope >test</scope >
70
70
</dependency >
71
+ <dependency >
72
+ <groupId >org.apache.hadoop</groupId >
73
+ <artifactId >hadoop-minicluster</artifactId >
74
+ <scope >test</scope >
75
+ </dependency >
71
76
</dependencies >
72
77
<build >
73
78
<outputDirectory >target/scala-${scala.binary.version} /classes</outputDirectory >
Original file line number Diff line number Diff line change @@ -25,10 +25,9 @@ private[streaming] object HdfsUtils {
25
25
// HDFS is not thread-safe when getFileSystem is called, so synchronize on that
26
26
27
27
val dfsPath = new Path (path)
28
- val dfs =
29
- this .synchronized {
30
- dfsPath.getFileSystem(conf)
31
- }
28
+ val dfs = this .synchronized {
29
+ dfsPath.getFileSystem(conf)
30
+ }
32
31
// If the file exists and we have append support, append instead of creating a new file
33
32
val stream : FSDataOutputStream = {
34
33
if (dfs.isFile(dfsPath)) {
@@ -54,17 +53,16 @@ private[streaming] object HdfsUtils {
54
53
}
55
54
56
55
def checkState (state : Boolean , errorMsg : => String ) {
57
- if (! state) {
56
+ if (! state) {
58
57
throw new IllegalStateException (errorMsg)
59
58
}
60
59
}
61
60
62
61
def getBlockLocations (path : String , conf : Configuration ): Option [Array [String ]] = {
63
62
val dfsPath = new Path (path)
64
- val dfs =
65
- this .synchronized {
66
- dfsPath.getFileSystem(conf)
67
- }
63
+ val dfs = this .synchronized {
64
+ dfsPath.getFileSystem(conf)
65
+ }
68
66
val fileStatus = dfs.getFileStatus(dfsPath)
69
67
val blockLocs = Option (dfs.getFileBlockLocations(fileStatus, 0 , fileStatus.getLen))
70
68
blockLocs.map(_.flatMap(_.getHosts))
Original file line number Diff line number Diff line change @@ -183,10 +183,6 @@ private[streaming] class WriteAheadLogManager(
183
183
pastLogs ++= logFileInfo
184
184
logInfo(s " Recovered ${logFileInfo.size} write ahead log files from $logDirectory" )
185
185
logDebug(s " Recovered files are: \n ${logFileInfo.map(_.path).mkString(" \n " )}" )
186
- } else {
187
- fileSystem.mkdirs(logDirectoryPath,
188
- FsPermission .createImmutable(Integer .parseInt(" 770" , 8 ).toShort))
189
- logInfo(s " Created ${logDirectory} for write ahead log files " )
190
186
}
191
187
}
192
188
You can’t perform that action at this time.
0 commit comments