[SPARK-4205][SQL] Timestamp and Date classes which work in the catalyst DSL. #3066
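For context, a minimal sketch of the kind of expression this change aims to make possible in the catalyst DSL. The import path, column names, and values below are illustrative assumptions, not code taken from this PR:

```scala
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.catalyst.dsl.expressions._

// Hypothetical columns and values, for illustration only.
val cutoff = Timestamp.valueOf("2014-11-03 00:00:00")
val day = Date.valueOf("2014-11-03")

// With Timestamp and Date usable as literals, symbols (columns) can be
// compared against them directly in DSL expressions:
val recent = 'eventTime >= cutoff
val sameDay = 'eventDate === day
```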

Closed
wants to merge 80 commits into from
80 commits
495eacd
Adding Timestamp and Date classes which support the standard comparison
culler Nov 3, 2014
0c389a7
Correcting a typo in the documentation.
culler Nov 3, 2014
d6e4c59
Close #2971.
rxin Nov 3, 2014
001acc4
[SPARK-4177][Doc]update build doc since JDBC/CLI support hive 13 now
scwf Nov 3, 2014
76386e1
[SPARK-4163][Core][WebUI] Send the fetch failure message back to Web UI
zsxwing Nov 3, 2014
7c8b2c0
Correcting the bugs and issues pointed out in liancheng's very helpfu…
culler Nov 3, 2014
2aca97c
[EC2] Factor out Mesos spark-ec2 branch
nchammas Nov 3, 2014
3cca196
[SPARK-4148][PySpark] fix seed distribution and add some tests for rd…
mengxr Nov 3, 2014
75690de
Make implicit conversions for Literal op Symbol return a specific type,
culler Nov 3, 2014
df607da
[SPARK-4211][Build] Fixes hive.version in Maven profile hive-0.13.1
coderfi Nov 3, 2014
2b6e1ce
[SPARK-4207][SQL] Query which has syntax like 'not like' is not worki…
ravipesala Nov 3, 2014
24544fb
[SPARK-3594] [PySpark] [SQL] take more rows to infer schema or sampling
davies Nov 3, 2014
c238fb4
[SPARK-4202][SQL] Simple DSL support for Scala UDF
liancheng Nov 3, 2014
e83f13e
[SPARK-4152] [SQL] Avoid data change in CTAS while table already existed
chenghao-intel Nov 3, 2014
25bef7e
[SQL] More aggressive defaults
marmbrus Nov 3, 2014
2812815
SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader in…
sryza Nov 3, 2014
15b58a2
[SQL] Convert arguments to Scala UDFs
marmbrus Nov 4, 2014
97a466e
[SPARK-4168][WebUI] web statges number should show correctly when sta…
liyezhang556520 Nov 4, 2014
4f035dd
[SPARK-611] Display executor thread dumps in web UI
JoshRosen Nov 4, 2014
c5912ec
[FIX][MLLIB] fix seed in BaggedPointSuite
mengxr Nov 4, 2014
04450d1
[SPARK-4192][SQL] Internal API for Python UDT
mengxr Nov 4, 2014
39b8ad1
Reversed random line permutations. Eliminated all getters and setter…
culler Nov 4, 2014
3dd0da9
A couple more pointless changes undone.
culler Nov 4, 2014
1a9c6cd
[SPARK-3573][MLLIB] Make MLlib's Vector compatible with SQL's SchemaRDD
mengxr Nov 4, 2014
9bdc841
[SPARK-4163][Core] Add a backward compatibility test for FetchFailed
zsxwing Nov 4, 2014
b671ce0
[SPARK-4166][Core] Add a backward compatibility test for ExecutorLost…
zsxwing Nov 4, 2014
e4f4263
[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer…
Nov 4, 2014
bcecd73
fixed MLlib Naive-Bayes java example bug
dkobylarz Nov 4, 2014
f90ad5d
[Spark-4060] [MLlib] exposing special rdd functions to the public
Nov 4, 2014
5e73138
[SPARK-2938] Support SASL authentication in NettyBlockTransferService
aarondav Nov 5, 2014
515abb9
[SQL] Add String option for DSL AS
marmbrus Nov 5, 2014
c8abddc
[SPARK-3964] [MLlib] [PySpark] add Hypothesis test Python API
Nov 5, 2014
5f13759
[SPARK-4029][Streaming] Update streaming driver to reliably save and …
tdas Nov 5, 2014
73d8017
In tests where the scalatest assert conversion collides with the new DSL
culler Nov 5, 2014
a911240
Removed Date and Timestamp from NativeTypes as this would force changes
culler Nov 5, 2014
5b3b6f6
[SPARK-4197] [mllib] GradientBoosting API cleanup and examples in Sca…
jkbradley Nov 5, 2014
3a9e31b
Added tests for the features in this PR. Added Date and Timestamp as
culler Nov 5, 2014
4c42986
[SPARK-4242] [Core] Add SASL to external shuffle service
aarondav Nov 5, 2014
a46497e
[SPARK-3984] [SPARK-3983] Fix incorrect scheduler delay and display t…
kayousterhout Nov 5, 2014
f37817b
SPARK-4222 [CORE] use readFully in FixedLengthBinaryRecordReader
industrial-sloth Nov 5, 2014
61a5cce
[SPARK-3797] Run external shuffle service in Yarn NM
Nov 5, 2014
868cd4c
SPARK-4040. Update documentation to exemplify use of local (n) value,…
Nov 5, 2014
f7ac8c2
SPARK-3223 runAsSparkUser cannot change HDFS write permission properl…
jongyoul Nov 5, 2014
cb0eae3
[SPARK-4158] Fix for missing resources.
brndnmtthws Nov 6, 2014
c315d13
[SPARK-4254] [mllib] MovieLensALS bug fix
jkbradley Nov 6, 2014
3d2b5bc
[SPARK-4262][SQL] add .schemaRDD to JavaSchemaRDD
mengxr Nov 6, 2014
db45f5a
[SPARK-4137] [EC2] Don't change working dir on user
nchammas Nov 6, 2014
5f27ae1
[SPARK-4255] Fix incorrect table striping
kayousterhout Nov 6, 2014
b41a39e
[SPARK-4186] add binaryFiles and binaryRecords in Python
Nov 6, 2014
6e87d72
Removed accidentlay extraneous import from Row.scala.
culler Nov 6, 2014
a5205b5
... and removed another extraneous import.
culler Nov 6, 2014
76a18dc
Tiny style issue.
culler Nov 6, 2014
b6a4374
Cleaning up comments.
culler Nov 6, 2014
0dc0ff0
One last comment clarification.
culler Nov 6, 2014
23eaf0e
[SPARK-4264] Completion iterator should only invoke callback once
aarondav Nov 6, 2014
d15c6e9
[SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx
lianhuiwang Nov 6, 2014
0d2e389
Adding a test which appeared after the PR and uses assert(X === Y).
culler Nov 6, 2014
470881b
[HOT FIX] Make distribution fails
Nov 6, 2014
96136f2
[SPARK-3797] Minor addendum to Yarn shuffle service
Nov 7, 2014
6e9ef10
[SPARK-4277] Support external shuffle service on Standalone Worker
aarondav Nov 7, 2014
f165b2b
[SPARK-4188] [Core] Perform network-level retry of shuffle file fetches
aarondav Nov 7, 2014
48a19a6
[SPARK-4236] Cleanup removed applications' files in shuffle service
aarondav Nov 7, 2014
3abdb1b
[SPARK-4204][Core][WebUI] Change Utils.exceptionString to contain the…
zsxwing Nov 7, 2014
c3f9ce1
Removing new test until I rebase the repository.
culler Nov 7, 2014
908fc6a
Adding Timestamp and Date classes which support the standard comparison
culler Nov 3, 2014
dc2ed72
Correcting a typo in the documentation.
culler Nov 3, 2014
b003619
Correcting the bugs and issues pointed out in liancheng's very helpfu…
culler Nov 3, 2014
a006ddb
Make implicit conversions for Literal op Symbol return a specific type,
culler Nov 3, 2014
1935289
Reversed random line permutations. Eliminated all getters and setter…
culler Nov 4, 2014
d59e5d9
A couple more pointless changes undone.
culler Nov 4, 2014
43406fe
In tests where the scalatest assert conversion collides with the new DSL
culler Nov 5, 2014
c304b16
Removed Date and Timestamp from NativeTypes as this would force changes
culler Nov 5, 2014
32df474
Added tests for the features in this PR. Added Date and Timestamp as
culler Nov 5, 2014
d52e6d7
Removed accidentlay extraneous import from Row.scala.
culler Nov 6, 2014
45f9478
... and removed another extraneous import.
culler Nov 6, 2014
f126042
Tiny style issue.
culler Nov 6, 2014
2ec6f6b
Cleaning up comments.
culler Nov 6, 2014
3a14915
One last comment clarification.
culler Nov 6, 2014
d0a27ab
One more test to fix after rebasing.
culler Nov 7, 2014
098bb5d
Rebasing PR to current master.
culler Nov 7, 2014
21 changes: 20 additions & 1 deletion LICENSE
@@ -754,7 +754,7 @@ SUCH DAMAGE.


========================================================================
For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
========================================================================
Copyright (C) 2008 The Android Open Source Project

@@ -771,6 +771,25 @@ See the License for the specific language governing permissions and
limitations under the License.


========================================================================
For LimitedInputStream
(network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
========================================================================
Copyright (C) 2007 The Guava Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


========================================================================
BSD-style licenses
========================================================================
@@ -39,6 +39,8 @@ $(function() {
var column = "table ." + $(this).attr("name");
$(column).hide();
});
// Stripe table rows after rows have been hidden to ensure correct striping.
stripeTables();

$("input:checkbox").click(function() {
var column = "table ." + $(this).attr("name");
5 changes: 0 additions & 5 deletions core/src/main/resources/org/apache/spark/ui/static/table.js
@@ -28,8 +28,3 @@ function stripeTables() {
});
});
}

/* Stripe all tables after pages finish loading. */
$(function() {
stripeTables();
});
14 changes: 14 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -120,6 +120,20 @@ pre {
border: none;
}

.stacktrace-details {
max-height: 300px;
overflow-y: auto;
margin: 0;
transition: max-height 0.5s ease-out, padding 0.5s ease-out;
}

.stacktrace-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
}

span.expand-additional-metrics {
cursor: pointer;
}
@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
verifyBounds()

// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -77,9 +76,14 @@
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)

// How long an executor must be idle for before it is removed
private val removeThresholdSeconds = conf.getLong(
private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

validateSettings()

// Number of executors to add in the next round
private var numExecutorsToAdd = 1

@@ -103,17 +107,14 @@
// Polling loop interval (ms)
private val intervalMillis: Long = 100

// Whether we are testing this class. This should only be used internally.
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock

/**
* Verify that the lower and upper bounds on the number of executors are valid.
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
*/
private def verifyBounds(): Unit = {
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
}
@@ -124,6 +125,22 @@
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
if (schedulerBacklogTimeout <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
if (sustainedSchedulerBacklogTimeout <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
if (executorIdleTimeout <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
}

/**
@@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
@@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
}
}

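For reference, a hedged sketch of a SparkConf that would satisfy the validateSettings() checks added above; the concrete values are illustrative assumptions, not recommendations from this change:

```scala
import org.apache.spark.SparkConf

// Illustrative values only; any settings that pass the checks above would do.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.minExecutors", "2")    // must be >= 0 and <= maxExecutors
  .set("spark.dynamicAllocation.maxExecutors", "20")   // must be >= 0
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "60")           // seconds, must be > 0
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "60")  // seconds, must be > 0
  .set("spark.dynamicAllocation.executorIdleTimeout", "600")              // seconds, must be > 0
  .set("spark.shuffle.service.enabled", "true")        // required by the new external-shuffle check
```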
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -22,6 +22,7 @@ import java.net.{Authenticator, PasswordAuthentication}
import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.network.sasl.SecretKeyHolder

/**
* Spark class responsible for security.
@@ -84,7 +85,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* Authenticator installed in the SecurityManager to how it does the authentication
* and in this case gets the user name and password from the request.
*
* - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
* - BlockTransferService -> The Spark BlockTransferServices uses java nio to asynchronously
* exchange messages. For this we use the Java SASL
* (Simple Authentication and Security Layer) API and again use DIGEST-MD5
* as the authentication mechanism. This means the shared secret is not passed
@@ -98,7 +99,7 @@
* of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
*
* Since the connectionManager does asynchronous messages passing, the SASL
* Since the NioBlockTransferService does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
* and a Server, so for a particular connection is has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
@@ -107,6 +108,10 @@
* and waits for the response from the server and does the handshake before sending
* the real message.
*
* The NettyBlockTransferService ensures that SASL authentication is performed
* synchronously prior to any other communication on a connection. This is done in
* SaslClientBootstrap on the client side and SaslRpcHandler on the server side.
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
* properly. For non-Yarn deployments, users can write a filter to go through a
@@ -139,7 +144,7 @@
* can take place.
*/

private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {

// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"
Expand Down Expand Up @@ -337,4 +342,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
* @return the secret key as a String if authentication is enabled, otherwise returns null
*/
def getSecretKey(): String = secretKey

// Default SecurityManager only has a single secret key, so ignore appId.
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()
}
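The SecretKeyHolder methods added above only matter when shared-secret authentication is turned on. A minimal sketch of the usual configuration, with a placeholder secret; SecurityManager itself is private[spark], so user code normally only sets the conf:

```scala
import org.apache.spark.SparkConf

// Placeholder secret for illustration only.
val conf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "not-a-real-secret")
```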
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -217,6 +217,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
*/
getAll.filter { case (k, _) => isAkkaConf(k) }

/**
* Returns the Spark application id, valid in the Driver after TaskScheduler registration and
* from the start in the Executor.
*/
def getAppId: String = get("spark.app.id")

/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)

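A small usage sketch of the new getAppId accessor, assuming a SparkContext `sc` whose TaskScheduler has already registered (as the added doc comment requires):

```scala
// Hypothetical usage; `sc` is an existing SparkContext.
val appId: String = sc.getConf.getAppId
println(s"Running as application $appId")
```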
35 changes: 32 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,9 +21,8 @@ import scala.language.implicitConversions

import java.io._
import java.net.URI
import java.util.Arrays
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
@@ -41,6 +40,7 @@ import akka.actor.Props
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.TriggerThreadDump
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -51,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
import org.apache.spark.util._

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -313,6 +313,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
val applicationId: String = taskScheduler.applicationId()
conf.set("spark.app.id", applicationId)

env.blockManager.initialize(applicationId)

val metricsSystem = env.metricsSystem

// The metrics system for Driver need to be set spark.app.id to app ID.
@@ -361,6 +363,29 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}

/**
* Called by the web UI to obtain executor thread dumps. This method may be expensive.
* Logs an error and returns None if we failed to obtain a thread dump, which could occur due
* to an executor being dead or unresponsive or due to network issues while sending the thread
* dump message back to the driver.
*/
private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
try {
if (executorId == SparkContext.DRIVER_IDENTIFIER) {
Some(Utils.getThreadDump())
} else {
val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
}
} catch {
case e: Exception =>
logError(s"Exception getting thread dump from executor $executorId", e)
None
}
}

private[spark] def getLocalProperties: Properties = localProperties.get()

private[spark] def setLocalProperties(props: Properties) {
@@ -535,6 +560,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {


/**
* :: Experimental ::
*
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
@@ -577,6 +604,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
}

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
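A hedged usage sketch of the binaryRecords API documented as experimental above; the path and the record length are assumptions for illustration:

```scala
// Hypothetical usage; the path and the 512-byte record length are assumptions.
val records = sc.binaryRecords("hdfs:///data/fixed-records", recordLength = 512)
// Each element is the raw bytes of one fixed-length record.
val count = records.count()
```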
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -276,7 +276,7 @@ object SparkEnv extends Logging {
val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
new NettyBlockTransferService(conf)
new NettyBlockTransferService(conf, securityManager)
case "nio" =>
new NioBlockTransferService(conf, securityManager)
}
@@ -285,8 +285,9 @@
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

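For reference, a minimal sketch of the setting that the match above dispatches on; choosing "nio" here is only an illustration (the default branch is "netty"):

```scala
import org.apache.spark.SparkConf

// Illustration only: selects NioBlockTransferService instead of the default Netty one.
val conf = new SparkConf()
  .set("spark.shuffle.blockTransferService", "nio")
```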