Commit 0985111

[SC-5835][DIRECTORYCOMMIT] Move DirectoryCommitProtocol to its own package
## What changes were proposed in this pull request?

As part of the effort to clearly separate out Edge functionality, this change moves all code related to the Directory Commit protocol into its own package. A few things about this change are not ideal; it was necessary to:

- import `com.databricks` in `PartitioningAwareFileIndex`
- open up `DirectoryAtomicReadProtocol.testingFs` (for a testing hack)
- write ugly code for getting configs from `SparkEnv.conf`, because of not having access to `ConfigEntry`
- duplicate a bunch of utility classes: `Clock`, `ManualClock`, `SystemClock`, `ThreadUtils`

... but most of these (except the last) should hopefully be resolved by [SC-5838](https://databricks.atlassian.net/browse/SC-5838).

## How was this patch tested?

spark-sql tests

Author: Adrian Ionescu <[email protected]>

Closes apache#247 from adrian-ionescu/SC-5835.
1 parent 0a21de2 commit 0985111
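The "ugly code for getting configs from `SparkEnv.conf`" mentioned in the commit message boils down to fetching a config by its string key with the entry's default rendered as a string, then parsing it back to the expected type, since the package cannot use the typed `ConfigEntry` accessor. A minimal sketch of the pattern (the entry name is taken from the hunks below; the surrounding code is illustrative only):

```scala
// Sketch of the SparkEnv.conf workaround: read by key with a stringified default, then parse.
// DIRECTORY_COMMIT_FILTER_UNCOMMITTED comes from com.databricks.sql.DatabricksSQLConf._.
import org.apache.spark.SparkEnv

val filterUncommitted: Boolean = SparkEnv.get.conf.get(
  DIRECTORY_COMMIT_FILTER_UNCOMMITTED.key,
  DIRECTORY_COMMIT_FILTER_UNCOMMITTED.defaultValueString).toBoolean
```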

15 files changed: +405 additions, −83 deletions


project/SparkBuild.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -853,7 +853,7 @@ object TestSettings {
     javaOptions in Test += "-Dspark.ui.showConsoleProgress=false",
     javaOptions in Test += "-Dspark.unsafe.exceptionOnMemoryLeak=true",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=false",
-    javaOptions in Test += "-Dspark.sql.sources.commitProtocolClass=com.databricks.DatabricksAtomicCommitProtocol",
+    javaOptions in Test += "-Dspark.sql.sources.commitProtocolClass=com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol",
     javaOptions in Test += "-Dderby.system.durability=test",
     javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark"))
       .map { case (k,v) => s"-D$k=$v" }.toSeq,
```
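The only functional change in this file is the fully qualified class name passed to the test JVMs. As a hedged illustration (not part of this commit), an application could point a session at the renamed protocol through the same key; whether the key can be set this way depends on how `commitProtocolClass` is consumed, which this diff does not show:

```scala
// Illustrative only: reuse the key from the SparkBuild.scala change above to select the
// renamed protocol class. Setting it at session-build time is an assumption.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("directory-commit-example")
  .config("spark.sql.sources.commitProtocolClass",
    "com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol")
  .getOrCreate()
```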

sql/core/src/main/scala/com/databricks/sql/acl/CheckPermissions.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -9,7 +9,7 @@
 package com.databricks.sql.acl
 
 import com.databricks.sql.acl.Action._
-import com.databricks.sql.transaction.VacuumTableCommand
+import com.databricks.sql.transaction.directory.VacuumTableCommand
 
 import org.apache.spark.sql.catalog.{Catalog => PublicCatalog}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
```

sql/core/src/main/scala/com/databricks/sql/parser/DatabricksSqlCommandBuilder.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -11,7 +11,7 @@ package com.databricks.sql.parser
 import scala.collection.JavaConverters._
 
 import com.databricks.sql.parser.DatabricksSqlBaseParser._
-import com.databricks.sql.transaction.VacuumTableCommand
+import com.databricks.sql.transaction.directory.VacuumTableCommand
 
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
```

sql/core/src/main/scala/com/databricks/sql/transaction/ProtocolAlias.scala

Lines changed: 0 additions & 16 deletions
This file was deleted.

sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala renamed to sql/core/src/main/scala/com/databricks/sql/transaction/directory/DirectoryAtomicCommitProtocol.scala

Lines changed: 8 additions & 10 deletions
```diff
@@ -6,25 +6,22 @@
  * http://www.apache.org/licenses/LICENSE-2.0
  */
 
-package org.apache.spark.sql.transaction
+package com.databricks.sql.transaction.directory
 
 import java.io._
-import java.nio.charset.StandardCharsets
 
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import com.databricks.sql.DatabricksSQLConf._
+import com.databricks.util.ThreadUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem => HadoopFileSystem, _}
 import org.apache.hadoop.mapreduce._
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.ThreadUtils
 
 /**
  * File commit protocol optimized for cloud storage. Files are written directly to their final
```

```diff
@@ -41,12 +38,13 @@ import org.apache.spark.util.ThreadUtils
  * Note that this is only atomic per-directory, and that we only provide snapshot isolation and
  * not serializability.
  */
-class DatabricksAtomicCommitProtocol(jobId: String, path: String)
+class DirectoryAtomicCommitProtocol(jobId: String, path: String)
   extends FileCommitProtocol with Serializable with Logging {
 
+  import DirectoryAtomicCommitProtocol._
+  import DirectoryAtomicReadProtocol._
+
   import FileCommitProtocol._
-  import DatabricksAtomicReadProtocol._
-  import DatabricksAtomicCommitProtocol._
 
   // Globally unique alphanumeric string. We decouple this from jobId for possible future use.
   private val txnId: TxnId = newTxnId()
```

```diff
@@ -202,8 +200,8 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
   }
 }
 
-object DatabricksAtomicCommitProtocol extends Logging {
-  import DatabricksAtomicReadProtocol._
+object DirectoryAtomicCommitProtocol extends Logging {
+  import DirectoryAtomicReadProtocol._
 
   import scala.collection.parallel.ThreadPoolTaskSupport
 
```

sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala renamed to sql/core/src/main/scala/com/databricks/sql/transaction/directory/DirectoryAtomicReadProtocol.scala

Lines changed: 19 additions & 12 deletions
```diff
@@ -6,28 +6,28 @@
  * http://www.apache.org/licenses/LICENSE-2.0
  */
 
-package org.apache.spark.sql.transaction
+package com.databricks.sql.transaction.directory
 
-import java.io.{File, FileNotFoundException, InputStream, InputStreamReader, IOException, OutputStream}
+import java.io.{FileNotFoundException, InputStream, InputStreamReader, IOException, OutputStream}
 import java.nio.charset.StandardCharsets
 
 import scala.collection.mutable
 import scala.util.Try
 import scala.util.control.NonFatal
 
 import com.databricks.sql.DatabricksSQLConf._
+import com.databricks.util.{Clock, SystemClock, ThreadUtils}
 import org.apache.hadoop.fs._
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
- * Read-side support for DatabricksAtomicCommitProtocol.
+ * Read-side support for DirectoryAtomicCommitProtocol.
  */
-object DatabricksAtomicReadProtocol extends Logging {
+object DirectoryAtomicReadProtocol extends Logging {
   type TxnId = String
 
   import scala.collection.parallel.ThreadPoolTaskSupport
```

```diff
@@ -41,11 +41,12 @@ object DatabricksAtomicReadProtocol extends Logging {
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
-  // Visible for testing.
-  private[spark] var testingFs: Option[FileSystem] = None
+  // Visible because it's used as a hack in PartitioningAwareFileIndex.
+  var testingFs: Option[FileSystem] = None
 
   // Visible for testing.
-  private[spark] var clock: Clock = new SystemClock
+  private[directory] var clock: Clock = new SystemClock
+
 
   /**
    * Given a directory listing, filters out files that are uncommitted. A file is considered
```

```diff
@@ -57,7 +58,9 @@
   def filterDirectoryListing(
       fs: FileSystem, dir: Path, initialFiles: Seq[FileStatus]): Seq[FileStatus] = {
     // we use SparkEnv for this escape-hatch flag since this may be called on executors
-    if (!SparkEnv.get.conf.get(DIRECTORY_COMMIT_FILTER_UNCOMMITTED)) {
+    if (!SparkEnv.get.conf.get(
+        DIRECTORY_COMMIT_FILTER_UNCOMMITTED.key,
+        DIRECTORY_COMMIT_FILTER_UNCOMMITTED.defaultValueString).toBoolean) {
       return initialFiles
     }
 
```

```diff
@@ -173,14 +176,16 @@
    * The same issue can occur with data file writes re-ordered after commit marker creation. In
    * this situation we also must re-list if data files are suspected to be missing.
    */
-  private[transaction] def resolveCommitState(
+  def resolveCommitState(
       fs: FileSystem,
       dir: Path,
       initialFiles: Seq[FileStatus]): (CommitState, Seq[FileStatus]) = {
     val state = resolveCommitState0(fs, dir, initialFiles)
 
     // Optimization: can assume the list request was atomic if the files have not changed recently.
-    val horizonMillis = SparkEnv.get.conf.get(DIRECTORY_COMMIT_WRITE_REORDERING_HORIZON_MS)
+    val horizonMillis = SparkEnv.get.conf.get(
+      DIRECTORY_COMMIT_WRITE_REORDERING_HORIZON_MS.key,
+      DIRECTORY_COMMIT_WRITE_REORDERING_HORIZON_MS.defaultValueString).toLong
 
     if ((state.missingMarkers.nonEmpty || state.missingDataFiles.nonEmpty) &&
         state.lastModified > clock.getTimeMillis - horizonMillis) {
```

```diff
@@ -287,7 +292,9 @@
 
       case NonFatal(e) =>
         // we use SparkEnv for this escape-hatch flag since this may be called on executors
-        if (SparkEnv.get.conf.get(DIRECTORY_COMMIT_IGNORE_CORRUPT_MARKERS)) {
+        if (SparkEnv.get.conf.get(
+            DIRECTORY_COMMIT_IGNORE_CORRUPT_MARKERS.key,
+            DIRECTORY_COMMIT_IGNORE_CORRUPT_MARKERS.defaultValueString).toBoolean) {
           logWarning("Failed to read job commit marker: " + stat, e)
           corruptCommitMarkers.add(txnId)
         } else {
```
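For orientation, `filterDirectoryListing` above is the read-side entry point that a file index (such as `PartitioningAwareFileIndex`) ends up calling: given a raw directory listing, it drops files whose transactions have not committed. A hypothetical sketch of invoking it directly; the helper and path are made up, only the method signature comes from the diff:

```scala
// Illustrative helper: list a directory and keep only files visible under the
// Directory Commit protocol's snapshot-isolation rules.
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import com.databricks.sql.transaction.directory.DirectoryAtomicReadProtocol

def listCommitted(fs: FileSystem, dir: Path): Seq[FileStatus] = {
  val rawListing: Seq[FileStatus] = fs.listStatus(dir).toSeq
  DirectoryAtomicReadProtocol.filterDirectoryListing(fs, dir, rawListing)
}
```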

sql/core/src/main/scala/com/databricks/sql/transaction/VacuumTableCommand.scala renamed to sql/core/src/main/scala/com/databricks/sql/transaction/directory/VacuumTableCommand.scala

Lines changed: 2 additions & 3 deletions
```diff
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.databricks.sql.transaction
+package com.databricks.sql.transaction.directory
 
 import java.net.URI
 
```

```diff
@@ -25,7 +25,6 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.transaction.DatabricksAtomicCommitProtocol
 import org.apache.spark.sql.types._
 
 case class VacuumTableCommand(
```

```diff
@@ -42,7 +41,7 @@ case class VacuumTableCommand(
     } else {
       getCoveringPaths(sparkSession, table.get)
     }
-    DatabricksAtomicCommitProtocol.vacuum(sparkSession, pathsToVacuum, horizonHours)
+    DirectoryAtomicCommitProtocol.vacuum(sparkSession, pathsToVacuum, horizonHours)
       .map(p => Row(p.toString))
   }
 
```

New file in package `com.databricks.util` (`Clock`, `SystemClock`, `ManualClock`)

Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.databricks.util

/**
 * An interface to represent clocks, so that they can be mocked out in unit tests.
 */
trait Clock {
  def getTimeMillis(): Long
  def waitTillTime(targetTime: Long): Long
}

/**
 * A clock backed by the actual time from the OS as reported by the `System` API.
 */
class SystemClock extends Clock {

  val minPollTime = 25L

  /**
   * @return the same time (milliseconds since the epoch)
   *         as is reported by `System.currentTimeMillis()`
   */
  def getTimeMillis(): Long = System.currentTimeMillis()

  /**
   * @param targetTime block until the current time is at least this value
   * @return current system time when wait has completed
   */
  def waitTillTime(targetTime: Long): Long = {
    var currentTime = 0L
    currentTime = System.currentTimeMillis()

    var waitTime = targetTime - currentTime
    if (waitTime <= 0) {
      return currentTime
    }

    val pollTime = math.max(waitTime / 10.0, minPollTime).toLong

    while (true) {
      currentTime = System.currentTimeMillis()
      waitTime = targetTime - currentTime
      if (waitTime <= 0) {
        return currentTime
      }
      val sleepTime = math.min(waitTime, pollTime)
      Thread.sleep(sleepTime)
    }
    -1
  }
}

/**
 * A `Clock` whose time can be manually set and modified. Its reported time does not change
 * as time elapses, but only as its time is modified by callers. This is mainly useful for
 * testing.
 *
 * @param time initial time (in milliseconds since the epoch)
 */
class ManualClock(private var time: Long) extends Clock {

  /**
   * @return `ManualClock` with initial time 0
   */
  def this() = this(0L)

  def getTimeMillis(): Long =
    synchronized {
      time
    }

  /**
   * @param timeToSet new time (in milliseconds) that the clock should represent
   */
  def setTime(timeToSet: Long): Unit = synchronized {
    time = timeToSet
    notifyAll()
  }

  /**
   * @param timeToAdd time (in milliseconds) to add to the clock's time
   */
  def advance(timeToAdd: Long): Unit = synchronized {
    time += timeToAdd
    notifyAll()
  }

  /**
   * @param targetTime block until the clock time is set or advanced to at least this time
   * @return current time reported by the clock when waiting finishes
   */
  def waitTillTime(targetTime: Long): Long = synchronized {
    while (time < targetTime) {
      wait(10)
    }
    getTimeMillis()
  }
}
```
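`ManualClock` is duplicated here so that clock-dependent logic (such as the write-reordering horizon check in `resolveCommitState`, whose `clock` field is left overridable within the `directory` package) can be tested without real sleeps. A small hypothetical usage sketch, not taken from this commit's tests:

```scala
// Illustrative only: ManualClock's time moves only when the caller advances it.
import com.databricks.util.ManualClock

val clock = new ManualClock(1000L)   // start the clock at t = 1000 ms
assert(clock.getTimeMillis() == 1000L)

clock.advance(500L)                  // move time forward explicitly
assert(clock.getTimeMillis() == 1500L)

// waitTillTime blocks until another thread advances the clock past the target.
val waiter = new Thread(new Runnable {
  override def run(): Unit = clock.waitTillTime(2000L)
})
waiter.start()
clock.advance(600L)                  // 1500 + 600 >= 2000, so the waiter is released
waiter.join()
```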
