
Commit 5204dfd

hn5092 authored and Wayne1c committed

apache#11 SQLConf support thread local prop

The change adds a thread-local property map to ConfigReader and exposes it through SQLConf, so per-thread overrides take precedence over the shared configuration.

1 parent 529ae3e · commit 5204dfd

2 files changed: 30 additions, 3 deletions

core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala

Lines changed: 16 additions & 1 deletion
@@ -22,6 +22,8 @@ import java.util.{Map => JMap}
 import scala.collection.mutable.HashMap
 import scala.util.matching.Regex
 
+import org.apache.commons.lang3.SerializationUtils
+
 private object ConfigReader {
 
   private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r
@@ -56,6 +58,17 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
   bindEnv(new EnvProvider())
   bindSystem(new SystemProvider())
 
+  protected[spark] val localProperties =
+    new InheritableThreadLocal[java.util.HashMap[String, String]] {
+      override protected def childValue(parent: java.util.HashMap[String, String]):
+          java.util.HashMap[String, String] = {
+        // Note: make a clone such that changes in the parent properties aren't reflected in
+        // those of the children threads, which has confusing semantics (SPARK-10563).
+        SerializationUtils.clone(parent)
+      }
+      override protected def initialValue(): java.util.HashMap[String, String] =
+        new java.util.HashMap[String, String]()
+    }
   /**
    * Binds a prefix to a provider. This method is not thread-safe and should be called
    * before the instance is used to expand values.
@@ -76,7 +89,9 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
   /**
    * Reads a configuration key from the default provider, and apply variable substitution.
    */
-  def get(key: String): Option[String] = conf.get(key).map(substitute)
+  def get(key: String): Option[String] = {
+    Option(localProperties.get().get(key)).orElse(conf.get(key)).map(substitute)
+  }
 
   /**
    * Perform variable substitution on the given input string.
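
The additions combine two ideas worth seeing in isolation: an InheritableThreadLocal whose childValue hands every new thread a copy of its parent's map (the SPARK-10563 note above), and a get that consults the thread-local map before the backing provider. Below is a minimal, self-contained sketch of that pattern; ThreadLocalDemo and its backing map are hypothetical stand-ins for ConfigReader and its ConfigProvider, and a HashMap copy constructor stands in for SerializationUtils.clone (which makes a deep copy).

import java.util.{HashMap => JHashMap}

object ThreadLocalDemo {
  // Stand-in for the backing ConfigProvider.
  private val backing = Map("k" -> "global")

  private val local = new InheritableThreadLocal[JHashMap[String, String]] {
    // Child threads receive a copy, so later parent-side changes stay invisible
    // to them (the patch uses SerializationUtils.clone for a deep copy instead).
    override protected def childValue(parent: JHashMap[String, String]): JHashMap[String, String] =
      new JHashMap[String, String](parent)

    override protected def initialValue(): JHashMap[String, String] =
      new JHashMap[String, String]()
  }

  // Same lookup order as the patched ConfigReader.get: thread-local first,
  // then the backing configuration.
  def get(key: String): Option[String] =
    Option(local.get().get(key)).orElse(backing.get(key))

  def main(args: Array[String]): Unit = {
    println(get("k"))                // Some(global): no override yet
    local.get().put("k", "local")
    println(get("k"))                // Some(local): thread-local wins

    val t = new Thread(new Runnable {
      override def run(): Unit = println(get("k")) // Some(local): inherited copy
    })
    t.start()
    t.join()
  }
}

Run on its own, the program prints Some(global), Some(local), Some(local): the child thread inherits the override at construction time, but any later change in the parent's map would stay invisible to it.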

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 14 additions & 2 deletions
@@ -1121,6 +1121,13 @@ class SQLConf extends Serializable with Logging {
       getOrElse(throw new NoSuchElementException(key))
   }
 
+  def setLocalProperty(key: String, value: String): Unit = {
+    if (value == null) {
+      reader.localProperties.get().remove(key)
+    } else {
+      reader.localProperties.get().put(key, value)
+    }
+  }
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
    * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
@@ -1168,8 +1175,13 @@
    * Return all the configuration properties that have been set (i.e. not the default).
    * This creates a new copy of the config properties in the form of a Map.
    */
-  def getAllConfs: immutable.Map[String, String] =
-    settings.synchronized { settings.asScala.toMap }
+  def getAllConfs: immutable.Map[String, String] = {
+    settings.synchronized {
+      var map = settings.asScala.toMap
+      reader.localProperties.get().asScala.foreach(entry => map += (entry._1 -> entry._2))
+      map
+    }
+  }
 
   /**
    * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
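
A plausible end-to-end use of the new API, as a sketch rather than a test from the patch: setLocalProperty and the getAllConfs merge come from this commit, setConfString is SQLConf's existing setter, and the key is only an example.

import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf
conf.setConfString("spark.sql.shuffle.partitions", "200")

// Override for the current thread only; other threads still see "200".
conf.setLocalProperty("spark.sql.shuffle.partitions", "10")
assert(conf.getAllConfs("spark.sql.shuffle.partitions") == "10")

// A null value removes the thread-local override, per setLocalProperty above.
conf.setLocalProperty("spark.sql.shuffle.partitions", null)
assert(conf.getAllConfs("spark.sql.shuffle.partitions") == "200")

Because getAllConfs folds the thread-local entries in after the shared settings, a thread-local value shadows the shared one in the returned map without mutating settings itself.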
