| 1 | +# TensorFlow I/O Apache Arrow Datasets |
| 2 | + |
| 3 | +Apache Arrow is a standard for in-memory columnar data, see [here](https://arrow.apache.org) |
| 4 | +for more information on the project. An Arrow dataset makes it easy to bring in |
| 5 | +column-oriented data from other systems to TensorFlow using the following |
| 6 | +sources: |
| 7 | + |
## From a Pandas DataFrame

An `ArrowDataset` can be made directly from an existing Pandas DataFrame or from
pyarrow record batches in a Python process. Tensor types and shapes can be
inferred from the DataFrame, although currently only scalar and vector values
with primitive types are supported. PyArrow must be installed to use this
Dataset. Example usage:

```python
import tensorflow as tf
from tensorflow_io.arrow import ArrowDataset

# Assume `df` is an existing Pandas DataFrame
dataset = ArrowDataset.from_pandas(df)

iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
  for i in range(len(df)):
    print(sess.run(next_element))
```

NOTE: The entire DataFrame is serialized into the Dataset, so this is not
recommended for use with large amounts of data.

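Record batches can be used in much the same way. A minimal sketch, assuming the
`from_record_batches` constructor takes the batches plus the same column and
output-type arguments as the other constructors shown here (check your
tensorflow-io release for the exact signature):

```python
import pyarrow as pa
import tensorflow as tf
from tensorflow_io.arrow import ArrowDataset

# Build a pyarrow record batch with one int32 and one float32 column
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3], type=pa.int32()),
     pa.array([0.1, 0.2, 0.3], type=pa.float32())],
    ['a', 'b'])

# Assumed signature; consult the tensorflow-io API docs for your version
dataset = ArrowDataset.from_record_batches(
    [batch],
    columns=(0, 1),
    output_types=(tf.int32, tf.float32))
```
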
## From Arrow Feather Files

Feather is a lightweight file format that provides a simple and efficient way
to write Pandas DataFrames to disk; see [here](https://arrow.apache.org/docs/python/ipc.html#feather-format)
for more information and limitations of the format. An `ArrowFeatherDataset`
can be created to read one or more Feather files from the given pathnames. The
following example shows how to write a Feather file from a Pandas DataFrame,
then read multiple files back as an `ArrowFeatherDataset`:

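For concreteness, a DataFrame with those dtypes could first be built like this
(the column names and values are only illustrative):

```python
import pandas as pd

# Two columns with the dtypes expected below: int32 and float32
df = pd.DataFrame({
    'x': pd.Series([1, 2, 3], dtype='int32'),
    'y': pd.Series([0.1, 0.2, 0.3], dtype='float32'),
})
```
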
```python
from pyarrow.feather import write_feather

# Assume `df` is an existing Pandas DataFrame with dtypes=(int32, float32),
# such as the one built above
write_feather(df, '/path/to/a.feather')
```

```python
import tensorflow as tf
from tensorflow_io.arrow import ArrowFeatherDataset

# Each Feather file must have the same column types; here we use the above
# DataFrame, which has 2 columns with dtypes=(int32, float32)
dataset = ArrowFeatherDataset(
    ['/path/to/a.feather', '/path/to/b.feather'],
    columns=(0, 1),
    output_types=(tf.int32, tf.float32),
    output_shapes=([], []))

iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

# This will iterate over each row of each file provided
with tf.Session() as sess:
  while True:
    try:
      print(sess.run(next_element))
    except tf.errors.OutOfRangeError:
      break
```

An alternate constructor can also be used to infer output types and shapes from
a given `pyarrow.Schema`, e.g. `dataset = ArrowFeatherDataset.from_schema(filenames, schema)`.
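
For example, a schema matching the Feather files above could be built with
pyarrow (the field names here are illustrative):

```python
import pyarrow as pa
from tensorflow_io.arrow import ArrowFeatherDataset

# Schema with the same column types as the files written above
schema = pa.schema([('x', pa.int32()), ('y', pa.float32())])

# Output types and shapes are inferred from the schema
dataset = ArrowFeatherDataset.from_schema(
    ['/path/to/a.feather', '/path/to/b.feather'], schema)
```
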
## From a Stream of Arrow Record Batches

The `ArrowStreamDataset` provides a Dataset that connects to a host over a
socket serving Arrow record batches in the Arrow stream format. See
[here](https://arrow.apache.org/docs/python/ipc.html#writing-and-reading-streams)
for more on the stream format. The following example creates an
`ArrowStreamDataset` that connects to a host serving an Arrow stream of record
batches with 2 columns of dtypes=(int32, float32):

```python
import tensorflow as tf
from tensorflow_io.arrow import ArrowStreamDataset

# The str `host` should be in the format '<HOSTNAME>:<PORT>'
dataset = ArrowStreamDataset(
    host,
    columns=(0, 1),
    output_types=(tf.int32, tf.float32),
    output_shapes=([], []))

iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

# The host connection is made when the Dataset op is run and will iterate over
# each row of each record batch until the Arrow stream is finished
with tf.Session() as sess:
  while True:
    try:
      print(sess.run(next_element))
    except tf.errors.OutOfRangeError:
      break
```
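
To try this end to end, record batches can be served from another process with
pyarrow. A minimal sketch of the server side (the socket setup, address, and
values are illustrative, not part of the tensorflow-io API); the matching
client `host` string would be '127.0.0.1:8080':

```python
import socket
import pyarrow as pa

# Accept one connection on the port the ArrowStreamDataset will dial
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 8080))
listener.listen(1)
conn, _ = listener.accept()

# Write a single record batch in the Arrow stream format, then end the stream
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3], type=pa.int32()),
     pa.array([0.1, 0.2, 0.3], type=pa.float32())],
    ['a', 'b'])
sink = conn.makefile(mode='wb')
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()
sink.close()
conn.close()
```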

An alternate constructor can also be used to infer output types and shapes from
a given `pyarrow.Schema`, e.g. `dataset = ArrowStreamDataset.from_schema(host, schema)`.