Commit ea3d0ef (1 parent: ca28fb5)

Bump Apache Arrow to 2.0.0 (tensorflow#1231)

* Bump Apache Arrow to 2.0.0; also bumps Apache Thrift to 0.13.0
* Update code to match Arrow
* Bump pyarrow to 2.0.0
* Stay with version=1 for write_feather to pass tests
* Bump flatbuffers to 1.12.0
* Fix Windows issue
* Fix tests
* Fix Windows
* Remove -std=c++11 and leave default -std=c++14 for arrow build
* Update sha256 of libapr1, as the hash was changed by the repo

Signed-off-by: Yong Tang <[email protected]>

File tree: 16 files changed (+120, -10350 lines)

.github/workflows/build.wheel.sh
Lines changed: 1 addition & 1 deletion

@@ -6,7 +6,7 @@ run_test() {
   entry=$1
   CPYTHON_VERSION=$($entry -c 'import sys; print(str(sys.version_info[0])+str(sys.version_info[1]))')
   (cd wheelhouse && $entry -m pip install tensorflow_io-*-cp${CPYTHON_VERSION}-*.whl)
-  $entry -m pip install -q pytest pytest-benchmark boto3 fastavro avro-python3 scikit-image pandas pyarrow==0.16.0 google-cloud-pubsub==2.1.0 google-cloud-bigquery-storage==1.1.0 google-cloud-bigquery==2.3.1 google-cloud-storage==1.32.0
+  $entry -m pip install -q pytest pytest-benchmark boto3 fastavro avro-python3 scikit-image pandas pyarrow==2.0.0 google-cloud-pubsub==2.1.0 google-cloud-bigquery-storage==1.1.0 google-cloud-bigquery==2.3.1 google-cloud-storage==1.32.0
   (cd tests && $entry -m pytest --benchmark-disable -v --import-mode=append $(find . -type f \( -iname "test_*.py" ! \( -iname "test_*_eager.py" \) \)))
   (cd tests && $entry -m pytest --benchmark-disable -v --import-mode=append $(find . -type f \( -iname "test_*_eager.py" ! \( -iname "test_bigquery_eager.py" \) \)))
   # GRPC and test_bigquery_eager tests have to be executed separately because of https://github.com/grpc/grpc/issues/20034

WORKSPACE
Lines changed: 13 additions & 13 deletions

@@ -256,22 +256,22 @@ http_archive(
 http_archive(
     name = "thrift",
     build_file = "//third_party:thrift.BUILD",
-    sha256 = "b7452d1873c6c43a580d2b4ae38cfaf8fa098ee6dc2925bae98dce0c010b1366",
-    strip_prefix = "thrift-0.12.0",
+    sha256 = "5da60088e60984f4f0801deeea628d193c33cec621e78c8a43a5d8c4055f7ad9",
+    strip_prefix = "thrift-0.13.0",
     urls = [
-        "https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/thrift/archive/0.12.0.tar.gz",
-        "https://github.com/apache/thrift/archive/0.12.0.tar.gz",
+        "https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/thrift/archive/v0.13.0.tar.gz",
+        "https://github.com/apache/thrift/archive/v0.13.0.tar.gz",
     ],
 )

 http_archive(
     name = "arrow",
     build_file = "//third_party:arrow.BUILD",
-    sha256 = "d7b3838758a365c8c47d55ab0df1006a70db951c6964440ba354f81f518b8d8d",
-    strip_prefix = "arrow-apache-arrow-0.16.0",
+    sha256 = "ea299df9cf440cfc43393ce12ee6d9a4c9d0dfa9fde33c3bc9b70ec25520a844",
+    strip_prefix = "arrow-apache-arrow-2.0.0",
     urls = [
-        "https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/arrow/archive/apache-arrow-0.16.0.tar.gz",
-        "https://github.com/apache/arrow/archive/apache-arrow-0.16.0.tar.gz",
+        "https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/arrow/archive/apache-arrow-2.0.0.tar.gz",
+        "https://github.com/apache/arrow/archive/apache-arrow-2.0.0.tar.gz",
     ],
 )

@@ -429,11 +429,11 @@ http_archive(

 http_archive(
     name = "com_github_google_flatbuffers",
-    sha256 = "12a13686cab7ffaf8ea01711b8f55e1dbd3bf059b7c46a25fefa1250bdd9dd23",
-    strip_prefix = "flatbuffers-b99332efd732e6faf60bb7ce1ce5902ed65d5ba3",
+    sha256 = "62f2223fb9181d1d6338451375628975775f7522185266cd5296571ac152bc45",
+    strip_prefix = "flatbuffers-1.12.0",
     urls = [
-        "https://storage.googleapis.com/mirror.tensorflow.org/github.com/google/flatbuffers/archive/b99332efd732e6faf60bb7ce1ce5902ed65d5ba3.tar.gz",
-        "https://github.com/google/flatbuffers/archive/b99332efd732e6faf60bb7ce1ce5902ed65d5ba3.tar.gz",
+        "https://storage.googleapis.com/mirror.tensorflow.org/github.com/google/flatbuffers/archive/v1.12.0.tar.gz",
+        "https://github.com/google/flatbuffers/archive/v1.12.0.tar.gz",
     ],
 )

@@ -676,7 +676,7 @@ http_archive(
     patches = [
         "//third_party:libapr1.patch",
     ],
-    sha256 = "1a0909a1146a214a6ab9de28902045461901baab4e0ee43797539ec05b6dbae0",
+    sha256 = "096968a363b2374f7450a3c65f3cc0b50561204a8da7bc03a2c39e080febd6e1",
     strip_prefix = "apr-1.6.5",
     urls = [
         "https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/apr/archive/1.6.5.tar.gz",

tensorflow_io/arrow/kernels/arrow_dataset_ops.cc
Lines changed: 34 additions & 17 deletions

@@ -15,6 +15,7 @@ limitations under the License.

 #include "arrow/api.h"
 #include "arrow/ipc/api.h"
+#include "arrow/result.h"
 #include "arrow/util/io_util.h"
 #include "tensorflow/core/framework/dataset.h"
 #include "tensorflow/core/graph/graph.h"

@@ -476,12 +477,17 @@ class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase {
       buffer_ = std::make_shared<arrow::Buffer>(dataset()->buffer_ptr_,
                                                 dataset()->buffer_size_);
       buffer_reader_ = std::make_shared<arrow::io::BufferReader>(buffer_);
-      CHECK_ARROW(arrow::ipc::RecordBatchFileReader::Open(
-          buffer_reader_.get(), buffer_->size(), &reader_));
+      arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchFileReader>>
+          result = arrow::ipc::RecordBatchFileReader::Open(
+              buffer_reader_.get(), buffer_->size());
+      CHECK_ARROW(result.status());
+      reader_ = std::move(result).ValueUnsafe();
       num_batches_ = reader_->num_record_batches();
       if (num_batches_ > 0) {
-        CHECK_ARROW(
-            reader_->ReadRecordBatch(current_batch_idx_, &current_batch_));
+        arrow::Result<std::shared_ptr<arrow::RecordBatch>> result =
+            reader_->ReadRecordBatch(current_batch_idx_);
+        CHECK_ARROW(result.status());
+        current_batch_ = std::move(result).ValueUnsafe();
         TF_RETURN_IF_ERROR(CheckBatchColumnTypes(current_batch_));
       }
       return Status::OK();

@@ -491,8 +497,10 @@ class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase {
         TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
       ArrowBaseIterator<Dataset>::NextStreamLocked(env);
       if (++current_batch_idx_ < num_batches_) {
-        CHECK_ARROW(
-            reader_->ReadRecordBatch(current_batch_idx_, &current_batch_));
+        arrow::Result<std::shared_ptr<arrow::RecordBatch>> result =
+            reader_->ReadRecordBatch(current_batch_idx_);
+        CHECK_ARROW(result.status());
+        current_batch_ = std::move(result).ValueUnsafe();
       }
       return Status::OK();
     }

@@ -604,12 +612,14 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase {
       const string& batches = dataset()->batches_.scalar<tstring>()();
       auto buffer = std::make_shared<arrow::Buffer>(batches);
       auto buffer_reader = std::make_shared<arrow::io::BufferReader>(buffer);
-      CHECK_ARROW(
-          arrow::ipc::RecordBatchFileReader::Open(buffer_reader, &reader_));
+      auto result = arrow::ipc::RecordBatchFileReader::Open(buffer_reader);
+      CHECK_ARROW(result.status());
+      reader_ = std::move(result).ValueUnsafe();
       num_batches_ = reader_->num_record_batches();
       if (num_batches_ > 0) {
-        CHECK_ARROW(
-            reader_->ReadRecordBatch(current_batch_idx_, &current_batch_));
+        auto result = reader_->ReadRecordBatch(current_batch_idx_);
+        CHECK_ARROW(result.status());
+        current_batch_ = std::move(result).ValueUnsafe();
         TF_RETURN_IF_ERROR(CheckBatchColumnTypes(current_batch_));
       }
       return Status::OK();

@@ -619,8 +629,9 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase {
         TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
       ArrowBaseIterator<Dataset>::NextStreamLocked(env);
       if (++current_batch_idx_ < num_batches_) {
-        CHECK_ARROW(
-            reader_->ReadRecordBatch(current_batch_idx_, &current_batch_));
+        auto result = reader_->ReadRecordBatch(current_batch_idx_);
+        CHECK_ARROW(result.status());
+        current_batch_ = std::move(result).ValueUnsafe();
       }
       return Status::OK();
     }

@@ -736,14 +747,18 @@ class ArrowFeatherDatasetOp : public ArrowOpKernelBase {
           new ArrowRandomAccessFile(tf_file.get(), size));

       // Create the Feather reader
-      std::unique_ptr<arrow::ipc::feather::TableReader> reader;
-      CHECK_ARROW(arrow::ipc::feather::TableReader::Open(in_file, &reader));
+      std::shared_ptr<arrow::ipc::feather::Reader> reader;
+      arrow::Result<std::shared_ptr<arrow::ipc::feather::Reader>> result =
+          arrow::ipc::feather::Reader::Open(in_file);
+      CHECK_ARROW(result.status());
+      reader = std::move(result).ValueUnsafe();

       // Read file columns and build a table
-      int64_t num_columns = reader->num_columns();
       std::shared_ptr<::arrow::Table> table;
       CHECK_ARROW(reader->Read(&table));

+      int64_t num_columns = table->num_columns();
+
       // Convert the table to a sequence of batches
       arrow::TableBatchReader tr(*table.get());
       std::shared_ptr<arrow::RecordBatch> batch;

@@ -885,8 +900,10 @@ class ArrowStreamDatasetOp : public ArrowOpKernelBase {
         in_stream_ = socket_stream;
       }

-      CHECK_ARROW(arrow::ipc::RecordBatchStreamReader::Open(in_stream_.get(),
-                                                            &reader_));
+      auto result =
+          arrow::ipc::RecordBatchStreamReader::Open(in_stream_.get());
+      CHECK_ARROW(result.status());
+      reader_ = std::move(result).ValueUnsafe();
      CHECK_ARROW(reader_->ReadNext(&current_batch_));
      TF_RETURN_IF_ERROR(CheckBatchColumnTypes(current_batch_));
      return Status::OK();
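
Hunks like the ones above are the core of this upgrade: Arrow APIs that used to return arrow::Status and fill an out-parameter now return arrow::Result<T>. The kernels unpack result.status() and ValueUnsafe() by hand because CHECK_ARROW must convert the status into a TensorFlow Status; plain Arrow code can use ARROW_ASSIGN_OR_RAISE instead. A minimal sketch of the 2.0-style reader flow, assuming a hypothetical `file` input:

```cpp
// Sketch only: `file` is a hypothetical arrow::io::RandomAccessFile*.
#include <memory>

#include "arrow/io/interfaces.h"
#include "arrow/ipc/api.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status ReadAllBatches(arrow::io::RandomAccessFile* file) {
  // Arrow 2.0: Open() returns Result<shared_ptr<RecordBatchFileReader>>
  // instead of taking an out-parameter as in 0.16.
  ARROW_ASSIGN_OR_RAISE(auto reader,
                        arrow::ipc::RecordBatchFileReader::Open(file));
  int64_t total_rows = 0;
  for (int i = 0; i < reader->num_record_batches(); ++i) {
    // ReadRecordBatch(i) likewise returns Result<shared_ptr<RecordBatch>>.
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch,
                          reader->ReadRecordBatch(i));
    total_rows += batch->num_rows();  // ... consume `batch` ...
  }
  return arrow::Status::OK();
}
```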

tensorflow_io/arrow/kernels/arrow_kernels.cc
Lines changed: 17 additions & 14 deletions

@@ -161,10 +161,11 @@ class ArrowReadableFromMemoryInitOp
     auto buffer_reader = std::make_shared<arrow::io::BufferReader>(buffer_);

     std::shared_ptr<arrow::Schema> schema;
-    arrow::Status status =
-        arrow::ipc::ReadSchema(buffer_reader.get(), nullptr, &schema);
-    OP_REQUIRES(context, status.ok(),
+    arrow::Result<std::shared_ptr<arrow::Schema>> result =
+        arrow::ipc::ReadSchema(buffer_reader.get(), nullptr);
+    OP_REQUIRES(context, result.ok(),
                 errors::Internal("Error reading Arrow Schema"));
+    schema = std::move(result).ValueUnsafe();

     const Tensor* array_buffer_addrs_tensor;
     OP_REQUIRES_OK(context, context->input("array_buffer_addresses",

@@ -429,10 +430,10 @@ class ListFeatherColumnsOp : public OpKernel {
         ::arrow::ipc::feather::fbs::GetCTable(buffer.data());

     OP_REQUIRES(context,
-                (table->version() >= ::arrow::ipc::feather::kFeatherVersion),
+                (table->version() >= ::arrow::ipc::feather::kFeatherV1Version),
                 errors::InvalidArgument(
                     "feather file is old: ", table->version(), " vs. ",
-                    ::arrow::ipc::feather::kFeatherVersion));
+                    ::arrow::ipc::feather::kFeatherV1Version));

     std::vector<string> columns;
     std::vector<string> dtypes;

@@ -577,10 +578,10 @@ class FeatherReadable : public IOReadableInterface {
     const ::arrow::ipc::feather::fbs::CTable* table =
         ::arrow::ipc::feather::fbs::GetCTable(buffer.data());

-    if (table->version() < ::arrow::ipc::feather::kFeatherVersion) {
+    if (table->version() < ::arrow::ipc::feather::kFeatherV1Version) {
       return errors::InvalidArgument("feather file is old: ", table->version(),
                                      " vs. ",
-                                     ::arrow::ipc::feather::kFeatherVersion);
+                                     ::arrow::ipc::feather::kFeatherV1Version);
     }

     for (size_t i = 0; i < table->columns()->size(); i++) {

@@ -683,18 +684,20 @@ class FeatherReadable : public IOReadableInterface {

     if (feather_file_.get() == nullptr) {
       feather_file_.reset(new ArrowRandomAccessFile(file_.get(), file_size_));
-      arrow::Status s =
-          arrow::ipc::feather::TableReader::Open(feather_file_, &reader_);
-      if (!s.ok()) {
-        return errors::Internal(s.ToString());
+      arrow::Result<std::shared_ptr<arrow::ipc::feather::Reader>> result =
+          arrow::ipc::feather::Reader::Open(feather_file_);
+      if (!result.ok()) {
+        return errors::Internal(result.status().ToString());
       }
+      reader_ = std::move(result).ValueUnsafe();
     }

-    std::shared_ptr<arrow::ChunkedArray> column;
-    arrow::Status s = reader_->GetColumn(column_index, &column);
+    std::shared_ptr<arrow::Table> table;
+    arrow::Status s = reader_->Read(&table);
     if (!s.ok()) {
       return errors::Internal(s.ToString());
     }
+    std::shared_ptr<arrow::ChunkedArray> column = table->column(column_index);

     std::shared_ptr<::arrow::ChunkedArray> slice =
         column->Slice(element_start, element_stop);

@@ -767,7 +770,7 @@ class FeatherReadable : public IOReadableInterface {
     std::unique_ptr<SizedRandomAccessFile> file_ TF_GUARDED_BY(mu_);
     uint64 file_size_ TF_GUARDED_BY(mu_);
     std::shared_ptr<ArrowRandomAccessFile> feather_file_ TF_GUARDED_BY(mu_);
-    std::unique_ptr<arrow::ipc::feather::TableReader> reader_ TF_GUARDED_BY(mu_);
+    std::shared_ptr<arrow::ipc::feather::Reader> reader_ TF_GUARDED_BY(mu_);

    std::vector<DataType> dtypes_;
    std::vector<TensorShape> shapes_;
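
Feather support changes shape rather than just signature: 0.16's arrow::ipc::feather::TableReader (with per-column GetColumn()) no longer exists, and 2.0's arrow::ipc::feather::Reader reads whole tables, which is why the kernel above now calls Read(&table) and indexes the result. A hedged sketch of the new flow, with `file` as a hypothetical input:

```cpp
// Sketch only: `file` is a hypothetical
// std::shared_ptr<arrow::io::RandomAccessFile>.
#include <memory>

#include "arrow/io/interfaces.h"
#include "arrow/ipc/feather.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"

arrow::Result<std::shared_ptr<arrow::ChunkedArray>> ReadFeatherColumn(
    std::shared_ptr<arrow::io::RandomAccessFile> file, int column_index) {
  // 2.0: Reader::Open returns a Result; 0.16's TableReader::Open filled an
  // out-parameter, and GetColumn() could fetch one column without a Table.
  ARROW_ASSIGN_OR_RAISE(auto reader, arrow::ipc::feather::Reader::Open(file));
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(reader->Read(&table));  // reads the whole file
  return table->column(column_index);         // then index the table
}
```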

tensorflow_io/arrow/kernels/arrow_kernels.h
Lines changed: 6 additions & 2 deletions

@@ -51,8 +51,12 @@ class ArrowRandomAccessFile : public ::arrow::io::RandomAccessFile {
     return result.size();
   }
   arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
-    std::shared_ptr<arrow::ResizableBuffer> buffer;
-    RETURN_NOT_OK(AllocateResizableBuffer(nbytes, &buffer));
+    arrow::Result<std::shared_ptr<arrow::ResizableBuffer>> result =
+        arrow::AllocateResizableBuffer(nbytes);
+    ARROW_RETURN_NOT_OK(result);
+    std::shared_ptr<arrow::ResizableBuffer> buffer =
+        std::move(result).ValueUnsafe();
+
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                          Read(nbytes, buffer->mutable_data()));
    RETURN_NOT_OK(buffer->Resize(bytes_read));
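
arrow::AllocateResizableBuffer drops its out-parameter in 2.0 as well, and the same three-line unpacking recurs in arrow_stream_client_unix.cc and arrow_stream_client_windows.cc below. A minimal sketch of the pattern; note that ARROW_RETURN_NOT_OK accepts a Result directly, and ValueUnsafe() is safe here only because the status was just checked:

```cpp
// Sketch only, assuming a caller that itself returns arrow::Status.
#include <memory>

#include "arrow/buffer.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status FillScratchBuffer(int64_t nbytes) {
  arrow::Result<std::shared_ptr<arrow::ResizableBuffer>> result =
      arrow::AllocateResizableBuffer(nbytes);
  ARROW_RETURN_NOT_OK(result);  // forwards result.status() on failure
  std::shared_ptr<arrow::ResizableBuffer> buffer =
      std::move(result).ValueUnsafe();  // status already checked above
  // ... write into buffer->mutable_data(), then Resize() to what was used ...
  return arrow::Status::OK();
}
```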

tensorflow_io/arrow/kernels/arrow_stream_client_unix.cc
Lines changed: 5 additions & 2 deletions

@@ -132,8 +132,11 @@ arrow::Result<int64_t> ArrowStreamClient::Read(int64_t nbytes, void* out) {

 arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowStreamClient::Read(
     int64_t nbytes) {
-  std::shared_ptr<arrow::ResizableBuffer> buffer;
-  ARROW_RETURN_NOT_OK(arrow::AllocateResizableBuffer(nbytes, &buffer));
+  arrow::Result<std::shared_ptr<arrow::ResizableBuffer>> result =
+      arrow::AllocateResizableBuffer(nbytes);
+  ARROW_RETURN_NOT_OK(result);
+  std::shared_ptr<arrow::ResizableBuffer> buffer =
+      std::move(result).ValueUnsafe();
  int64_t bytes_read;
  ARROW_ASSIGN_OR_RAISE(bytes_read, Read(nbytes, buffer->mutable_data()));
  ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, false));

tensorflow_io/arrow/kernels/arrow_stream_client_windows.cc
Lines changed: 5 additions & 2 deletions

@@ -154,8 +154,11 @@ arrow::Result<int64_t> ArrowStreamClient::Read(int64_t nbytes, void* out) {

 arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowStreamClient::Read(
     int64_t nbytes) {
-  std::shared_ptr<arrow::ResizableBuffer> buffer;
-  ARROW_RETURN_NOT_OK(arrow::AllocateResizableBuffer(nbytes, &buffer));
+  arrow::Result<std::shared_ptr<arrow::ResizableBuffer>> result =
+      arrow::AllocateResizableBuffer(nbytes);
+  ARROW_RETURN_NOT_OK(result);
+  std::shared_ptr<arrow::ResizableBuffer> buffer =
+      std::move(result).ValueUnsafe();
  int64_t bytes_read;
  ARROW_ASSIGN_OR_RAISE(bytes_read, Read(nbytes, buffer->mutable_data()));
  ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, false));

tensorflow_io/bigquery/kernels/bigquery_dataset_op.cc
Lines changed: 5 additions & 4 deletions

@@ -102,11 +102,12 @@ class BigQueryDatasetOp : public DatasetOpKernel {

       arrow::ipc::DictionaryMemo dict_memo;
       arrow::io::BufferReader input(buffer_);
-      arrow::Status arrow_status =
-          arrow::ipc::ReadSchema(&input, &dict_memo, &arrow_schema_);
-      OP_REQUIRES(ctx, arrow_status.ok(),
+      arrow::Result<std::shared_ptr<arrow::Schema>> result =
+          arrow::ipc::ReadSchema(&input, &dict_memo);
+      OP_REQUIRES(ctx, result.ok(),
                   errors::Internal("Error reading Arrow Schema",
-                                   arrow_status.message()));
+                                   result.status().message()));
+      arrow_schema_ = std::move(result).ValueUnsafe();
     } else {
       ctx->CtxFailure(errors::InvalidArgument("Invalid data_format"));
     }
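
The same Status-to-Result change applies to arrow::ipc::ReadSchema, which now returns the schema instead of writing through a pointer (the kernel keeps OP_REQUIRES on result.ok() and only then unwraps). A minimal sketch, with `buffer` as a hypothetical serialized-schema input:

```cpp
// Sketch only: `buffer` is a hypothetical std::shared_ptr<arrow::Buffer>.
#include <memory>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "arrow/result.h"

arrow::Result<std::shared_ptr<arrow::Schema>> DecodeSchema(
    std::shared_ptr<arrow::Buffer> buffer) {
  arrow::io::BufferReader input(buffer);
  arrow::ipc::DictionaryMemo dict_memo;
  // 0.16: ReadSchema(&input, &dict_memo, &schema) returned arrow::Status.
  // 2.0: the schema is the (Result-wrapped) return value.
  return arrow::ipc::ReadSchema(&input, &dict_memo);
}
```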

tensorflow_io/bigquery/kernels/bigquery_lib.h
Lines changed: 6 additions & 5 deletions

@@ -224,12 +224,13 @@ class BigQueryReaderArrowDatasetIterator
     arrow::io::BufferReader buffer_reader_(buffer_);
     arrow::ipc::DictionaryMemo dict_memo;

-    auto arrow_status =
-        arrow::ipc::ReadRecordBatch(this->dataset()->arrow_schema(), &dict_memo,
-                                    &buffer_reader_, &this->record_batch_);
-    if (!arrow_status.ok()) {
-      return errors::Internal(arrow_status.ToString());
+    auto result = arrow::ipc::ReadRecordBatch(
+        this->dataset()->arrow_schema(), &dict_memo,
+        arrow::ipc::IpcReadOptions::Defaults(), &buffer_reader_);
+    if (!result.ok()) {
+      return errors::Internal(result.status().ToString());
     }
+    this->record_batch_ = std::move(result).ValueUnsafe();

    VLOG(3) << "got record batch, rows:" << record_batch_->num_rows();
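
Besides returning arrow::Result, arrow::ipc::ReadRecordBatch in 2.0 takes an explicit IpcReadOptions argument that 0.16 did not have; the kernel passes IpcReadOptions::Defaults(). A minimal sketch with hypothetical `schema` and `buffer` inputs:

```cpp
// Sketch only: `schema` and `buffer` are hypothetical inputs.
#include <memory>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "arrow/result.h"

arrow::Result<std::shared_ptr<arrow::RecordBatch>> DecodeBatch(
    std::shared_ptr<arrow::Schema> schema,
    std::shared_ptr<arrow::Buffer> buffer) {
  arrow::io::BufferReader reader(buffer);
  arrow::ipc::DictionaryMemo dict_memo;
  // The options argument is new relative to the 0.16 IPC API.
  return arrow::ipc::ReadRecordBatch(schema, &dict_memo,
                                     arrow::ipc::IpcReadOptions::Defaults(),
                                     &reader);
}
```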

tensorflow_io/core/kernels/csv_kernels.cc
Lines changed: 14 additions & 11 deletions

@@ -44,19 +44,24 @@ class CSVReadable : public IOReadableInterface {

     csv_file_.reset(new ArrowRandomAccessFile(file_.get(), file_size_));

-    ::arrow::Status status;
-
-    status = ::arrow::csv::TableReader::Make(
+    auto result = ::arrow::csv::TableReader::Make(
         ::arrow::default_memory_pool(), csv_file_,
         ::arrow::csv::ReadOptions::Defaults(),
         ::arrow::csv::ParseOptions::Defaults(),
-        ::arrow::csv::ConvertOptions::Defaults(), &reader_);
-    if (!status.ok()) {
-      return errors::InvalidArgument("unable to make a TableReader: ", status);
+        ::arrow::csv::ConvertOptions::Defaults());
+    if (!result.status().ok()) {
+      return errors::InvalidArgument("unable to make a TableReader: ",
+                                     result.status());
     }
-    status = reader_->Read(&table_);
-    if (!status.ok()) {
-      return errors::InvalidArgument("unable to read table: ", status);
+    reader_ = std::move(result).ValueUnsafe();
+
+    {
+      auto result = reader_->Read();
+      if (!result.status().ok()) {
+        return errors::InvalidArgument("unable to read table: ",
+                                       result.status());
+      }
+      table_ = std::move(result).ValueUnsafe();
     }

     for (int i = 0; i < table_->num_columns(); i++) {

@@ -108,11 +113,9 @@ class CSVReadable : public IOReadableInterface {
       case ::arrow::Type::TIMESTAMP:
       case ::arrow::Type::TIME32:
       case ::arrow::Type::TIME64:
-      case ::arrow::Type::INTERVAL:
       case ::arrow::Type::DECIMAL:
       case ::arrow::Type::LIST:
       case ::arrow::Type::STRUCT:
-      case ::arrow::Type::UNION:
      case ::arrow::Type::DICTIONARY:
      case ::arrow::Type::MAP:
      default:
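
The CSV path follows the same convention: TableReader::Make and TableReader::Read both return arrow::Result in 2.0, which is why the kernel wraps the second `result` in its own block to avoid shadowing the first. (The dropped INTERVAL and UNION cases appear to reflect Arrow splitting those type ids into more specific variants.) A minimal sketch, assuming a hypothetical `input` stream:

```cpp
// Sketch only: `input` is a hypothetical
// std::shared_ptr<arrow::io::InputStream>.
#include <memory>

#include "arrow/csv/api.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/table.h"

arrow::Result<std::shared_ptr<arrow::Table>> ReadCsvTable(
    std::shared_ptr<arrow::io::InputStream> input) {
  // 2.0: Make() returns Result<shared_ptr<TableReader>> instead of filling
  // an out-parameter as the 0.16 API did.
  ARROW_ASSIGN_OR_RAISE(
      auto reader,
      arrow::csv::TableReader::Make(arrow::default_memory_pool(), input,
                                    arrow::csv::ReadOptions::Defaults(),
                                    arrow::csv::ParseOptions::Defaults(),
                                    arrow::csv::ConvertOptions::Defaults()));
  return reader->Read();  // Result<std::shared_ptr<arrow::Table>>
}
```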
