Skip to content

Commit e1825cd

Browse files
committed
Make blocking configurable
1 parent b3e873a commit e1825cd

File tree

1 file changed

+25
-16
lines changed

1 file changed

+25
-16
lines changed

parquet/src/arrow/async_reader.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,31 +133,40 @@ impl Storage for Box<dyn Storage> {
133133

134134
pub struct FileStorage {
135135
file: Option<File>,
136+
spawn_blocking: bool,
136137
}
137138
impl FileStorage {
138-
pub fn new(file: File) -> Self {
139-
Self { file: Some(file) }
139+
pub fn new(file: File, spawn_blocking: bool) -> Self {
140+
Self {
141+
file: Some(file),
142+
spawn_blocking,
143+
}
140144
}
141145

142146
pub async fn asyncify<F, T>(&mut self, f: F) -> Result<T>
143147
where
144148
F: FnOnce(&mut File) -> Result<T> + Send + 'static,
145149
T: Send + 'static,
146150
{
147-
// let mut file = self.file.take().expect("FileStorage poisoned");
148-
// let (file, result) = tokio::task::spawn_blocking(move || {
149-
// let result = f(&mut file);
150-
// (file, result)
151-
// })
152-
// .await
153-
// .expect("background task panicked");
154-
//
155-
// self.file = Some(file);
156-
// result
157-
158-
// TODO: Temporary use blocking file IO in tokio worker
159-
let file = self.file.as_mut().unwrap();
160-
f(file)
151+
match self.spawn_blocking {
152+
true => {
153+
let mut file = self.file.take().expect("FileStorage poisoned");
154+
let (file, result) = tokio::task::spawn_blocking(move || {
155+
let result = f(&mut file);
156+
(file, result)
157+
})
158+
.await
159+
.expect("background task panicked");
160+
161+
self.file = Some(file);
162+
result
163+
}
164+
false => {
165+
// Use blocking file IO in tokio worker
166+
let file = self.file.as_mut().unwrap();
167+
f(file)
168+
}
169+
}
161170
}
162171
}
163172

0 commit comments

Comments
 (0)