Skip to content

Commit 523259f

Browse files
hn50927mming7
authored and committed
apache#11 SQLConf support thread local prop
1 parent 7e9dd25 commit 523259f

File tree

2 files changed

+30
-3
lines changed

2 files changed

+30
-3
lines changed

core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import java.util.{Map => JMap}
2222
import scala.collection.mutable.HashMap
2323
import scala.util.matching.Regex
2424

25+
import org.apache.commons.lang3.SerializationUtils
26+
2527
private object ConfigReader {
2628

2729
private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
@@ -56,6 +58,17 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
5658
bindEnv(new EnvProvider())
5759
bindSystem(new SystemProvider())
5860

61+
/**
 * Thread-local map of properties that override values from the default config
 * provider for the current thread. Implemented as an [[InheritableThreadLocal]]
 * so that a child thread starts with a snapshot of its parent's properties.
 */
protected[spark] val localProperties =
  new InheritableThreadLocal[java.util.HashMap[String, String]] {
    override protected def childValue(parent: java.util.HashMap[String, String]):
        java.util.HashMap[String, String] = {
      // Note: make a clone such that changes in the parent properties aren't reflected in
      // those of the children threads, which has confusing semantics (SPARK-10563).
      // Keys and values are immutable Strings, so a shallow copy via the HashMap copy
      // constructor is equivalent to a deep clone and avoids the much more expensive
      // serialization round-trip of SerializationUtils.clone.
      new java.util.HashMap[String, String](parent)
    }

    override protected def initialValue(): java.util.HashMap[String, String] =
      new java.util.HashMap[String, String]()
  }
5972
/**
6073
* Binds a prefix to a provider. This method is not thread-safe and should be called
6174
* before the instance is used to expand values.
@@ -76,7 +89,9 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
7689
/**
7790
* Reads a configuration key from the default provider, and apply variable substitution.
7891
*/
79-
/**
 * Reads a configuration key and applies variable substitution. A value set in
 * the current thread's local properties takes precedence over the default
 * provider.
 */
def get(key: String): Option[String] = {
  val threadLocalValue = Option(localProperties.get().get(key))
  threadLocalValue.orElse(conf.get(key)).map(substitute)
}
8095

8196
/**
8297
* Perform variable substitution on the given input string.

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3720,6 +3720,13 @@ class SQLConf extends Serializable with Logging {
37203720
getOrElse(throw new NoSuchElementException(key))
37213721
}
37223722

3723+
/**
 * Sets a SQL configuration property local to the current thread. Passing a
 * `null` value removes the key from the thread-local properties instead of
 * storing it.
 */
def setLocalProperty(key: String, value: String): Unit = {
  val props = reader.localProperties.get()
  if (value == null) {
    props.remove(key)
  } else {
    props.put(key, value)
  }
}
37233730
/**
37243731
* Return the value of Spark SQL configuration property for the given key. If the key is not set
37253732
* yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
@@ -3795,8 +3802,13 @@ class SQLConf extends Serializable with Logging {
37953802
* Return all the configuration properties that have been set (i.e. not the default).
37963803
* This creates a new copy of the config properties in the form of a Map.
37973804
*/
3798-
/**
 * Return all the configuration properties that have been set (i.e. not the default).
 * This creates a new copy of the config properties in the form of a Map, with the
 * current thread's local properties overriding globally set values for the same key.
 */
def getAllConfs: immutable.Map[String, String] = {
  settings.synchronized {
    // `++` is right-biased, so thread-local entries win over global ones,
    // matching the precedence used by `get`.
    settings.asScala.toMap ++ reader.localProperties.get().asScala
  }
}
38003812

38013813
/**
38023814
* Return all the configuration definitions that have been defined in [[SQLConf]]. Each

0 commit comments

Comments
 (0)