Commit 8fa6568

Merge pull request #40 from terrytangyuan/r-wrappers-impl
Added R wrappers for released dataset ops
2 parents 2954f9c + 8db6829 commit 8fa6568

20 files changed: +467 −34 lines

.gitignore

Lines changed: 6 additions & 0 deletions
@@ -13,3 +13,9 @@ __pycache__
 *.pbxproj
 *.xcworkspacedata
 .ipynb_checkpoints
+
+# Auto-generated files by `R CMD check`
+tfio.Rcheck/
+tfio_*.tar.gz
+.Rproj.user
+

R-package/.Rbuildignore

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 ^.*\.Rproj$
 ^\.Rproj\.user$
 ^man-roxygen/
+scripts

R-package/DESCRIPTION

Lines changed: 2 additions & 3 deletions
@@ -24,6 +24,7 @@ Imports:
     reticulate (>= 1.10),
     tensorflow (>= 1.9),
     tfdatasets (>= 1.9),
+    forge,
     magrittr,
     rlang,
     tidyselect,
@@ -32,7 +33,5 @@ Roxygen: list(markdown = TRUE)
 RoxygenNote: 6.1.0
 Suggests:
     testthat,
-    knitr,
-    tfestimators,
-    keras
+    knitr
 VignetteBuilder: knitr

R-package/NAMESPACE

Lines changed: 49 additions & 0 deletions
@@ -1,9 +1,36 @@
 # Generated by roxygen2: do not edit by hand
 
 export("%>%")
+export(dataset_batch)
+export(dataset_cache)
+export(dataset_concatenate)
+export(dataset_filter)
+export(dataset_flat_map)
+export(dataset_interleave)
+export(dataset_map)
+export(dataset_map_and_batch)
+export(dataset_padded_batch)
+export(dataset_prefetch)
+export(dataset_prefetch_to_device)
+export(dataset_prepare)
+export(dataset_repeat)
+export(dataset_shard)
+export(dataset_shuffle)
+export(dataset_shuffle_and_repeat)
+export(dataset_skip)
+export(dataset_take)
+export(ignite_dataset)
 export(install_tensorflow)
+export(kafka_dataset)
+export(kinesis_dataset)
+export(next_batch)
+export(sequence_file_dataset)
 export(tf)
 export(tf_config)
+export(tf_version)
+export(until_out_of_range)
+export(with_dataset)
+import(forge)
 import(rlang)
 import(tfdatasets)
 import(tidyselect)
@@ -16,3 +43,25 @@ importFrom(reticulate,tuple)
 importFrom(tensorflow,install_tensorflow)
 importFrom(tensorflow,tf)
 importFrom(tensorflow,tf_config)
+importFrom(tensorflow,tf_version)
+importFrom(tfdatasets,dataset_batch)
+importFrom(tfdatasets,dataset_cache)
+importFrom(tfdatasets,dataset_concatenate)
+importFrom(tfdatasets,dataset_filter)
+importFrom(tfdatasets,dataset_flat_map)
+importFrom(tfdatasets,dataset_interleave)
+importFrom(tfdatasets,dataset_map)
+importFrom(tfdatasets,dataset_map_and_batch)
+importFrom(tfdatasets,dataset_padded_batch)
+importFrom(tfdatasets,dataset_prefetch)
+importFrom(tfdatasets,dataset_prefetch_to_device)
+importFrom(tfdatasets,dataset_prepare)
+importFrom(tfdatasets,dataset_repeat)
+importFrom(tfdatasets,dataset_shard)
+importFrom(tfdatasets,dataset_shuffle)
+importFrom(tfdatasets,dataset_shuffle_and_repeat)
+importFrom(tfdatasets,dataset_skip)
+importFrom(tfdatasets,dataset_take)
+importFrom(tfdatasets,next_batch)
+importFrom(tfdatasets,until_out_of_range)
+importFrom(tfdatasets,with_dataset)

R-package/R/dataset_utils.R

Lines changed: 4 additions & 4 deletions
@@ -1,15 +1,15 @@
 as_tf_dataset <- function (dataset) {
-  if (!is_dataset(dataset)) 
+  if (!is_dataset(dataset))
     stop("Provided dataset is not a TensorFlow Dataset")
-  if (!inherits(dataset, "tf_dataset")) 
+  if (!inherits(dataset, "tf_dataset"))
     class(dataset) <- c("tf_dataset", class(dataset))
   dataset
 }
 
 is_dataset <- function (x) {
-  inherits(x, "tensorflow.python.data.ops.dataset_ops.Dataset") || is_tfio_dataset(X)
+  inherits(x, "tensorflow.python.data.ops.dataset_ops.Dataset") || is_tfio_dataset(x)
 }
 
 is_tfio_dataset <- function(x) {
-  "tensorflow_io" %in% class(x)
+  grepl("tensorflow_io", class(x))
 }
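
The last hunk matters because reticulate exposes Python-backed objects under fully qualified class names, so an exact `%in%` match on "tensorflow_io" never succeeds; `grepl()` does a substring match across the class vector instead. A quick illustration of the difference (the class strings below are hypothetical examples of what a wrapped dataset might carry):

# Hypothetical class vector for a tfio dataset as seen from R:
cls <- c(
  "tensorflow_io.kafka.python.ops.kafka_dataset_ops.KafkaDataset",
  "tensorflow.python.data.ops.dataset_ops.Dataset",
  "python.builtin.object"
)

"tensorflow_io" %in% cls     # FALSE -- no element equals "tensorflow_io" exactly
grepl("tensorflow_io", cls)  # TRUE FALSE FALSE -- substring match per element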

R-package/R/hadoop_dataset.R

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+#' Create a `SequenceFileDataset`.
+#'
+#' This function allows a user to read data from a hadoop sequence
+#' file. A sequence file consists of (key value) pairs sequentially. At
+#' the moment, `org.apache.hadoop.io.Text` is the only serialization type
+#' being supported, and there is no compression support.
+#'
+#' @param filenames A `tf.string` tensor containing one or more filenames.
+#'
+#' @export
+sequence_file_dataset <- function(filenames) {
+  dataset <- tfio_lib$hadoop$SequenceFileDataset(filenames = filenames)
+  as_tf_dataset(dataset)
+}
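
A minimal usage sketch of the new wrapper, assuming the package is attached, a TensorFlow 1.x session, and a hypothetical sequence file at `testdata/string.seq` serialized as `org.apache.hadoop.io.Text`:

library(tfio)

# Read (key, value) string pairs from a Hadoop sequence file
# (the path below is a hypothetical example).
dataset <- sequence_file_dataset("testdata/string.seq") %>%
  dataset_repeat(2)

sess <- tf$Session()
batch <- next_batch(dataset)

# Iterate until the one-shot iterator is exhausted.
until_out_of_range({
  print(sess$run(batch))
})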

R-package/R/ignite_dataset.R

Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
+#' Create a `IgniteDataset`.
+#'
+#' Apache Ignite is a memory-centric distributed database, caching, and
+#' processing platform for transactional, analytical, and streaming workloads,
+#' delivering in-memory speeds at petabyte scale. This contrib package
+#' contains an integration between Apache Ignite and TensorFlow. The
+#' integration is based on tf.data from TensorFlow side and Binary Client
+#' Protocol from Apache Ignite side. It allows to use Apache Ignite as a
+#' datasource for neural network training, inference and all other
+#' computations supported by TensorFlow. Ignite Dataset is based on Apache
+#' Ignite Binary Client Protocol.
+#'
+#' @param cache_name Cache name to be used as datasource.
+#' @param host Apache Ignite Thin Client host to be connected.
+#' @param port Apache Ignite Thin Client port to be connected.
+#' @param local Local flag that defines to query only local data.
+#' @param part Number of partitions to be queried.
+#' @param page_size Apache Ignite Thin Client page size.
+#' @param username Apache Ignite Thin Client authentication username.
+#' @param password Apache Ignite Thin Client authentication password.
+#' @param certfile File in PEM format containing the certificate as well as any
+#'   number of CA certificates needed to establish the certificate's
+#'   authenticity.
+#' @param keyfile File containing the private key (otherwise the private key
+#'   will be taken from certfile as well).
+#' @param cert_password Password to be used if the private key is encrypted and
+#'   a password is necessary.
+#'
+#' @export
+ignite_dataset <- function(
+  cache_name,
+  host = "localhost",
+  port = 10800,
+  local = FALSE,
+  part = -1,
+  page_size = 100,
+  username = NULL,
+  password = NULL,
+  certfile = NULL,
+  keyfile = NULL,
+  cert_password = NULL) {
+  dataset <- tfio_lib$ignite$IgniteDataset(
+    cache_name = cache_name,
+    host = host,
+    port = cast_scalar_integer(port),
+    local = cast_logical(local),
+    part = cast_scalar_integer(part),
+    page_size = cast_scalar_integer(page_size),
+    username = cast_nullable_string(username),
+    password = cast_nullable_string(password),
+    certfile = cast_nullable_string(certfile),
+    keyfile = cast_nullable_string(keyfile),
+    cert_password = cast_nullable_string(cert_password)
+  )
+  as_tf_dataset(dataset)
+}
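
A minimal usage sketch, assuming an Ignite node reachable on the default thin-client port and a pre-populated cache; the cache name below is a hypothetical example:

library(tfio)

# Stream rows from a local Apache Ignite cache
# ("SQL_PUBLIC_TEST_CACHE" is a hypothetical cache name).
dataset <- ignite_dataset(
  cache_name = "SQL_PUBLIC_TEST_CACHE",
  host = "localhost",
  port = 10800
)

sess <- tf$Session()
batch <- next_batch(dataset)

until_out_of_range({
  print(sess$run(batch))
})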

R-package/R/kafka_dataset.R

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+#' Creates a `KafkaDataset`.
+#'
+#' @param topics A `tf.string` tensor containing one or more subscriptions, in
+#'   the format of `[topic:partition:offset:length]`, by default length is -1
+#'   for unlimited.
+#' @param servers A list of bootstrap servers.
+#' @param group The consumer group id.
+#' @param eof If True, the kafka reader will stop on EOF.
+#' @param timeout The timeout value for the Kafka Consumer to wait (in
+#'   millisecond).
+#'
+#' @export
+kafka_dataset <- function(
+  topics,
+  servers = "localhost",
+  group = "",
+  eof = FALSE,
+  timeout = 1000) {
+  dataset <- tfio_lib$kafka$KafkaDataset(
+    topics = topics,
+    servers = servers,
+    group = group,
+    eof = cast_logical(eof),
+    timeout = cast_scalar_integer(timeout)
+  )
+  as_tf_dataset(dataset)
+}
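
A minimal usage sketch, assuming a local Kafka broker with a topic named `test` that already holds a few messages (the subscription string and group id are hypothetical):

library(tfio)

# Subscribe to partition 0 of topic "test", offsets 0 through 4
# (the "topic:partition:offset:length" string is a hypothetical example).
dataset <- kafka_dataset(
  topics = list("test:0:0:4"),
  servers = "localhost",
  group = "test-consumer",
  eof = TRUE
)

sess <- tf$Session()
batch <- next_batch(dataset)

# eof = TRUE makes the reader stop at end of stream,
# so this loop terminates once the messages are consumed.
until_out_of_range({
  print(sess$run(batch))
})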

R-package/R/kinesis_dataset.R

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+#' Creates a `KinesisDataset`.
+#'
+#' Kinesis is a managed service provided by AWS for data streaming.
+#' This dataset reads messages from Kinesis with each message presented
+#' as a `tf.string`.
+#'
+#' @param stream A `tf.string` tensor containing the name of the stream.
+#' @param shard A `tf.string` tensor containing the id of the shard.
+#' @param read_indefinitely If `True`, the Kinesis dataset will keep retry again
+#'   on `EOF` after the `interval` period. If `False`, then the dataset will
+#'   stop on `EOF`. The default value is `True`.
+#' @param interval The interval for the Kinesis Client to wait before it tries
+#'   to get records again (in millisecond).
+#'
+#' @export
+kinesis_dataset <- function(
+  stream,
+  shard = "",
+  read_indefinitely = TRUE,
+  interval = 100000) {
+  dataset <- tfio_lib$kinesis$KinesisDataset(
+    stream = stream,
+    shard = shard,
+    read_indefinitely = cast_logical(read_indefinitely),
+    interval = cast_scalar_integer(interval)
+  )
+  as_tf_dataset(dataset)
+}
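
A minimal usage sketch, assuming AWS credentials are available in the environment and the stream already exists; the stream name is a hypothetical example:

library(tfio)

# Read each Kinesis record as a tf.string and stop at end of stream
# ("my-stream" is a hypothetical stream name).
dataset <- kinesis_dataset(
  stream = "my-stream",
  read_indefinitely = FALSE
)

sess <- tf$Session()
batch <- next_batch(dataset)

until_out_of_range({
  print(sess$run(batch))
})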

R-package/R/package.R

Lines changed: 2 additions & 24 deletions
@@ -12,6 +12,7 @@ NULL
 #' @import tidyselect
 #' @import rlang
 #' @import tfdatasets
+#' @import forge
 NULL
 
 tfio_lib <- NULL
@@ -35,8 +36,7 @@ tfio_lib <- NULL
     }
   )
 
-  # TODO: This is commented out for now until we add the wrappers.
-  # tfio_lib <<- import("tensorflow_io", delay_load = delay_load)
+  tfio_lib <<- import("tensorflow_io", delay_load = delay_load)
 
 }
 
@@ -63,25 +63,3 @@ check_tensorflow_version <- function(displayed_warning) {
 .onDetach <- function(libpath) {
 
 }
-
-# Reusable function for registering a set of methods with S3 manually. The
-# methods argument is a list of character vectors, each of which has the form
-# c(package, genname, class).
-registerMethods <- function(methods) {
-  lapply(methods, function(method) {
-    pkg <- method[[1]]
-    generic <- method[[2]]
-    class <- method[[3]]
-    func <- get(paste(generic, class, sep = "."))
-    if (pkg %in% loadedNamespaces()) {
-      registerS3method(generic, class, func, envir = asNamespace(pkg))
-    }
-    setHook(
-      packageEvent(pkg, "onLoad"),
-      function(...) {
-        registerS3method(generic, class, func, envir = asNamespace(pkg))
-      }
-    )
-  })
-}
-