
Commit c3135d0

yanboliang authored and shivaram committed
[SPARK-12393][SPARKR] Add read.text and write.text for SparkR
Add ```read.text``` and ```write.text``` for SparkR.

cc sun-rui felixcheung shivaram

Author: Yanbo Liang <[email protected]>

Closes #10348 from yanboliang/spark-12393.

(cherry picked from commit d1fea41)
Signed-off-by: Shivaram Venkataraman <[email protected]>
1 parent bf3dca2 commit c3135d0
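Before the per-file changes, a minimal sketch of how the two new functions are meant to be used together, pieced together from the roxygen examples in this diff; the input and output paths are placeholders, not part of the commit:

# Sketch only: the paths below are placeholders.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# read.text() loads each line of the file as one row of a single
# string column named "value".
df <- read.text(sqlContext, "path/to/input.txt")
count(df)

# write.text() writes each row of that "value" column back out as one line.
write.text(df, "/tmp/sparkr-text-output/")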

File tree

5 files changed: +82 -1 lines changed

R/pkg/NAMESPACE

Lines changed: 3 additions & 1 deletion
@@ -94,7 +94,8 @@ exportMethods("arrange",
               "withColumnRenamed",
               "write.df",
               "write.json",
-              "write.parquet")
+              "write.parquet",
+              "write.text")

 exportClasses("Column")

@@ -274,6 +275,7 @@ export("as.DataFrame",
        "parquetFile",
        "read.df",
        "read.parquet",
+       "read.text",
        "sql",
        "table",
        "tableNames",

R/pkg/R/DataFrame.R

Lines changed: 28 additions & 0 deletions
@@ -661,6 +661,34 @@ setMethod("saveAsParquetFile",
             write.parquet(x, path)
           })

+#' write.text
+#'
+#' Saves the content of the DataFrame in a text file at the specified path.
+#' The DataFrame must have only one column of string type with the name "value".
+#' Each row becomes a new line in the output file.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family DataFrame functions
+#' @rdname write.text
+#' @name write.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
+#' write.text(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("write.text",
+          signature(x = "DataFrame", path = "character"),
+          function(x, path) {
+            write <- callJMethod(x@sdf, "write")
+            invisible(callJMethod(write, "text", path))
+          })
+
 #' Distinct
 #'
 #' Return a new DataFrame containing the distinct rows in this DataFrame.
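As the roxygen above notes, write.text() only accepts a DataFrame with a single string column named "value", so wider DataFrames need to be projected down first. A hedged sketch, assuming SparkR's selectExpr() is available (it is exported elsewhere in this package) and using made-up column names:

# Hypothetical DataFrame with columns "name" and "age"; collapse it into a
# single string column aliased to "value" before calling write.text().
people <- read.json(sqlContext, "path/to/people.json")
lines <- selectExpr(people, "concat(name, ',', cast(age AS string)) AS value")
write.text(lines, "/tmp/people-as-text/")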

R/pkg/R/SQLContext.R

Lines changed: 26 additions & 0 deletions
@@ -295,6 +295,32 @@ parquetFile <- function(sqlContext, ...) {
   dataFrame(sdf)
 }

+#' Create a DataFrame from a text file.
+#'
+#' Loads a text file and returns a DataFrame with a single string column named "value".
+#' Each line in the text file is a new row in the resulting DataFrame.
+#'
+#' @param sqlContext SQLContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @return DataFrame
+#' @rdname read.text
+#' @name read.text
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.txt"
+#' df <- read.text(sqlContext, path)
+#' }
+read.text <- function(sqlContext, path) {
+  # Allow the user to have a more flexible definition of the text file path
+  paths <- as.list(suppressWarnings(normalizePath(path)))
+  read <- callJMethod(sqlContext, "read")
+  sdf <- callJMethod(read, "text", paths)
+  dataFrame(sdf)
+}
+
 #' SQL Query
 #'
 #' Executes a SQL query using Spark, returning the result as a DataFrame.
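Because the path argument is passed through normalizePath() and wrapped in as.list(), read.text() accepts either a single path or a character vector of paths, as the test below also exercises. A brief sketch with hypothetical file names:

# Read several text files into one DataFrame in a single call.
logs <- read.text(sqlContext, c("logs/part-0001.txt", "logs/part-0002.txt"))
count(logs)          # total number of lines across both files
head(collect(logs))  # every line lands in the single "value" column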

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -549,6 +549,10 @@ setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
 #' @export
 setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

+#' @rdname write.text
+#' @export
+setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
+
 #' @rdname schema
 #' @export
 setGeneric("schema", function(x) { standardGeneric("schema") })

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 21 additions & 0 deletions
@@ -1493,6 +1493,27 @@ test_that("read/write Parquet files", {
   unlink(parquetPath4)
 })

+test_that("read/write text files", {
+  # Test write.df and read.df
+  df <- read.df(sqlContext, jsonPath, "text")
+  expect_is(df, "DataFrame")
+  expect_equal(colnames(df), c("value"))
+  expect_equal(count(df), 3)
+  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
+  write.df(df, textPath, "text", mode = "overwrite")
+
+  # Test write.text and read.text
+  textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt")
+  write.text(df, textPath2)
+  df2 <- read.text(sqlContext, c(textPath, textPath2))
+  expect_is(df2, "DataFrame")
+  expect_equal(colnames(df2), c("value"))
+  expect_equal(count(df2), count(df) * 2)
+
+  unlink(textPath)
+  unlink(textPath2)
+})
+
 test_that("describe() and summarize() on a DataFrame", {
   df <- read.json(sqlContext, jsonPath)
   stats <- describe(df, "age")
