
[SPARK-13817][SQL][MINOR] Renames Dataset.newDataFrame to Dataset.ofRows #11889


Closed · wants to merge 1 commit
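This PR renames the `private[sql]` factory method `Dataset.newDataFrame` to `Dataset.ofRows` and updates all call sites; behavior is unchanged. The new name makes it explicit that the method builds a `DataFrame`, i.e. a `Dataset[Row]`, from a `LogicalPlan`. A minimal before/after sketch of a typical call site (the `plan` value here is hypothetical):

```scala
// Before this PR (old name, reads like a plain constructor):
val before = Dataset.newDataFrame(sqlContext, plan)

// After this PR (same signature and behavior; the name says what comes back: rows):
val after = Dataset.ofRows(sqlContext, plan)
```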
@@ -129,7 +129,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
- Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
+ Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation()))
}

/**
@@ -176,7 +176,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
- Dataset.newDataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
+ Dataset.ofRows(sqlContext, StreamingRelation(dataSource.createSource()))
}

/**
@@ -376,7 +376,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
parsedOptions)
}

- Dataset.newDataFrame(
+ Dataset.ofRows(
sqlContext,
LogicalRDD(
schema.toAttributes,
@@ -424,7 +424,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
- Dataset.newDataFrame(sqlContext,
+ Dataset.ofRows(sqlContext,
sqlContext.sessionState.catalog.lookupRelation(
sqlContext.sessionState.sqlParser.parseTableIdentifier(tableName)))
}
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -53,7 +53,7 @@ private[sql] object Dataset {
new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]])
}

- def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
+ def ofRows(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
val qe = sqlContext.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
@@ -2330,7 +2330,7 @@ class Dataset[T] private[sql](

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
- Dataset.newDataFrame(sqlContext, logicalPlan)
+ Dataset.ofRows(sqlContext, logicalPlan)
}

/** A convenient function to wrap a logical plan and produce a Dataset. */
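For reference, the renamed method in full, reassembled from the `Dataset.scala` hunk above (the closing brace is assumed, since the diff context cuts off):

```scala
def ofRows(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
  // Analyze eagerly so that an invalid plan fails fast, at construction time.
  val qe = sqlContext.executePlan(logicalPlan)
  qe.assertAnalyzed()
  // A DataFrame is just a Dataset[Row]; the Row encoder comes from the analyzed schema.
  new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
}
```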
@@ -59,7 +59,7 @@ class KeyValueGroupedDataset[K, V] private[sql](

private def groupedData = {
new RelationalGroupedDataset(
- Dataset.newDataFrame(sqlContext, logicalPlan),
+ Dataset.ofRows(sqlContext, logicalPlan),
groupingAttributes,
RelationalGroupedDataset.GroupByType)
}
@@ -52,17 +52,17 @@ class RelationalGroupedDataset protected[sql](

groupType match {
case RelationalGroupedDataset.GroupByType =>
- Dataset.newDataFrame(
+ Dataset.ofRows(
df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.RollupType =>
- Dataset.newDataFrame(
+ Dataset.ofRows(
df.sqlContext, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.CubeType =>
- Dataset.newDataFrame(
+ Dataset.ofRows(
df.sqlContext, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.PivotType(pivotCol, values) =>
val aliasedGrps = groupingExprs.map(alias)
- Dataset.newDataFrame(
+ Dataset.ofRows(
df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
}
}
26 changes: 13 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -355,7 +355,7 @@ class SQLContext private[sql](
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
- Dataset.newDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
+ Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRDD)(self))
}

/**
@@ -370,7 +370,7 @@
SQLContext.setActive(self)
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
- Dataset.newDataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
+ Dataset.ofRows(self, LocalRelation.fromProduct(attributeSeq, data))
}

/**
@@ -380,7 +380,7 @@
* @since 1.3.0
*/
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
- Dataset.newDataFrame(this, LogicalRelation(baseRelation))
+ Dataset.ofRows(this, LogicalRelation(baseRelation))
}

/**
@@ -435,7 +435,7 @@ class SQLContext private[sql](
rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
}
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
- Dataset.newDataFrame(this, logicalPlan)
+ Dataset.ofRows(this, logicalPlan)
}


@@ -470,7 +470,7 @@ class SQLContext private[sql](
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
- Dataset.newDataFrame(this, logicalPlan)
+ Dataset.ofRows(this, logicalPlan)
}

/**
@@ -498,7 +498,7 @@ class SQLContext private[sql](
*/
@DeveloperApi
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
- Dataset.newDataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
+ Dataset.ofRows(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
}

/**
@@ -517,7 +517,7 @@
val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
}
- Dataset.newDataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
+ Dataset.ofRows(this, LogicalRDD(attributeSeq, rowRdd)(this))
}

/**
@@ -545,7 +545,7 @@ class SQLContext private[sql](
val className = beanClass.getName
val beanInfo = Introspector.getBeanInfo(beanClass)
val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
- Dataset.newDataFrame(self, LocalRelation(attrSeq, rows.toSeq))
+ Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq))
}

/**
@@ -763,7 +763,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def sql(sqlText: String): DataFrame = {
- Dataset.newDataFrame(this, parseSql(sqlText))
+ Dataset.ofRows(this, parseSql(sqlText))
}

/**
@@ -786,7 +786,7 @@
}

private def table(tableIdent: TableIdentifier): DataFrame = {
- Dataset.newDataFrame(this, sessionState.catalog.lookupRelation(tableIdent))
+ Dataset.ofRows(this, sessionState.catalog.lookupRelation(tableIdent))
}

/**
@@ -798,7 +798,7 @@
* @since 1.3.0
*/
def tables(): DataFrame = {
- Dataset.newDataFrame(this, ShowTablesCommand(None))
+ Dataset.ofRows(this, ShowTablesCommand(None))
}

/**
@@ -810,7 +810,7 @@
* @since 1.3.0
*/
def tables(databaseName: String): DataFrame = {
- Dataset.newDataFrame(this, ShowTablesCommand(Some(databaseName)))
+ Dataset.ofRows(this, ShowTablesCommand(Some(databaseName)))
}

/**
@@ -875,7 +875,7 @@ class SQLContext private[sql](
schema: StructType): DataFrame = {

val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
- Dataset.newDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
+ Dataset.ofRows(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
}

/**
@@ -261,7 +261,7 @@ case class CacheTableCommand(

override def run(sqlContext: SQLContext): Seq[Row] = {
plan.foreach { logicalPlan =>
- sqlContext.registerDataFrameAsTable(Dataset.newDataFrame(sqlContext, logicalPlan), tableName)
+ sqlContext.registerDataFrameAsTable(Dataset.ofRows(sqlContext, logicalPlan), tableName)
}
sqlContext.cacheTable(tableName)

@@ -154,7 +154,7 @@ case class DataSource(
}

def dataFrameBuilder(files: Array[String]): DataFrame = {
- Dataset.newDataFrame(
+ Dataset.ofRows(
sqlContext,
LogicalRelation(
DataSource(
@@ -34,7 +34,7 @@ private[sql] case class InsertIntoDataSource(

override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
- val data = Dataset.newDataFrame(sqlContext, query)
+ val data = Dataset.ofRows(sqlContext, query)
// Apply the schema of the existing table to the new data.
val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
relation.insert(df, overwrite)
@@ -114,7 +114,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = query.output.filterNot(partitionSet.contains)

- val queryExecution = Dataset.newDataFrame(sqlContext, query).queryExecution
+ val queryExecution = Dataset.ofRows(sqlContext, query).queryExecution
SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
val relation =
WriteRelation(
@@ -101,7 +101,7 @@ case class CreateTempTableUsing(
options = options)
sqlContext.sessionState.catalog.registerTable(
tableIdent,
- Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
+ Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)

Seq.empty[Row]
}
@@ -116,7 +116,7 @@ case class CreateTempTableUsingAsSelect(
query: LogicalPlan) extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
- val df = Dataset.newDataFrame(sqlContext, query)
+ val df = Dataset.ofRows(sqlContext, query)
val dataSource = DataSource(
sqlContext,
className = provider,
@@ -126,7 +126,7 @@ case class CreateTempTableUsingAsSelect(
val result = dataSource.write(mode, df)
sqlContext.sessionState.catalog.registerTable(
tableIdent,
- Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
+ Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan)

Seq.empty[Row]
}
@@ -147,7 +147,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
if (isCached) {
// Create a data frame to represent the table.
// TODO: Use uncacheTable once it supports database name.
- val df = Dataset.newDataFrame(sqlContext, logicalPlan)
+ val df = Dataset.ofRows(sqlContext, logicalPlan)
// Uncache the logicalPlan.
sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
// Cache it again.
@@ -121,6 +121,6 @@ private[sql] object FrequentItems extends Logging {
StructField(v._1 + "_freqItems", ArrayType(v._2, false))
}
val schema = StructType(outputCols).toAttributes
- Dataset.newDataFrame(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
+ Dataset.ofRows(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
}
}
@@ -454,6 +454,6 @@ private[sql] object StatFunctions extends Logging {
}
val schema = StructType(StructField(tableName, StringType) +: headerNames)

- Dataset.newDataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
+ Dataset.ofRows(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
}
}
@@ -288,7 +288,7 @@ class StreamExecution(
val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
logDebug(s"Optimized batch in ${optimizerTime}ms")

- val nextBatch = Dataset.newDataFrame(sqlContext, newPlan)
+ val nextBatch = Dataset.ofRows(sqlContext, newPlan)
sink.addBatch(currentBatchId - 1, nextBatch)

awaitBatchLock.synchronized {
@@ -58,7 +58,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}

def toDF()(implicit sqlContext: SQLContext): DataFrame = {
- Dataset.newDataFrame(sqlContext, logicalPlan)
+ Dataset.ofRows(sqlContext, logicalPlan)
}

def addData(data: A*): Offset = {
@@ -917,7 +917,7 @@ object functions {
* @since 1.5.0
*/
def broadcast(df: DataFrame): DataFrame = {
- Dataset.newDataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
+ Dataset.ofRows(df.sqlContext, BroadcastHint(df.logicalPlan))
}

/**
@@ -956,7 +956,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))

// error case: insert into an OneRowRelation
- Dataset.newDataFrame(sqlContext, OneRowRelation).registerTempTable("one_row")
+ Dataset.ofRows(sqlContext, OneRowRelation).registerTempTable("one_row")
val e3 = intercept[AnalysisException] {
insertion.write.insertInto("one_row")
}
@@ -80,7 +80,7 @@ trait StreamTest extends QueryTest with Timeouts {
}

implicit class RichSource(s: Source) {
- def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s))
+ def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingRelation(s))

def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingRelation(s))
}
@@ -268,7 +268,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
l.copy(relation =
r.copy(bucketSpec = Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil))))
}
- Dataset.newDataFrame(sqlContext, bucketed)
+ Dataset.ofRows(sqlContext, bucketed)
} else {
df
}
@@ -214,7 +214,7 @@ private[sql] trait SQLTestUtils
* way to construct [[DataFrame]] directly out of local data without relying on implicits.
*/
protected implicit def logicalPlanToSparkQuery(plan: LogicalPlan): DataFrame = {
- Dataset.newDataFrame(sqlContext, plan)
+ Dataset.ofRows(sqlContext, plan)
}

/**
@@ -247,7 +247,7 @@ case class CreateMetastoreDataSourceAsSelect(
createMetastoreTable = true
}

- val data = Dataset.newDataFrame(hiveContext, query)
+ val data = Dataset.ofRows(hiveContext, query)
val df = existingSchema match {
// If we are inserting into an existing table, just use the existing schema.
case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s)
@@ -63,7 +63,7 @@ abstract class SQLBuilderTest extends QueryTest with TestHiveSingleton {
""".stripMargin)
}

- checkAnswer(sqlContext.sql(generatedSQL), Dataset.newDataFrame(sqlContext, plan))
+ checkAnswer(sqlContext.sql(generatedSQL), Dataset.ofRows(sqlContext, plan))
}

protected def checkSQL(df: DataFrame, expectedSQL: String): Unit = {
@@ -968,7 +968,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue
// Create a new df to make sure its physical operator picks up
// spark.sql.TungstenAggregate.testFallbackStartsAt.
// todo: remove it?
- val newActual = Dataset.newDataFrame(sqlContext, actual.logicalPlan)
+ val newActual = Dataset.ofRows(sqlContext, actual.logicalPlan)

QueryTest.checkAnswer(newActual, expectedAnswer) match {
case Some(errorMessage) =>