Let's do this incrementally, shall we?
First, let's make get_files_in_dir() idiomatic. We will get back to errors later.
```rust
fn get_files_in_dir(dir: &str) -> Option<Vec<PathBuf>> {
    fs::read_dir(dir)
        .ok()?
        .map(|res| res.map(|e| e.path()))
        .collect::<Result<Vec<_>, _>>()
        .ok()
}
```
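The collect::<Result<Vec<_>, _>>() call is what makes this work: Result implements FromIterator, so collecting an iterator of Results gives Ok(vec) when every item is Ok and stops at the first Err otherwise, and .ok() then turns that into an Option. Here's a std-only sketch of the same idea, with a hypothetical parse_all helper standing in for the directory listing:

```rust
use std::num::ParseIntError;

// Hypothetical helper: parse every string, or return the first parse error.
fn parse_all(items: &[&str]) -> Result<Vec<i32>, ParseIntError> {
    items.iter().map(|s| s.parse::<i32>()).collect::<Result<Vec<_>, _>>()
}

fn main() {
    println!("{:?}", parse_all(&["1", "2", "3"])); // Ok([1, 2, 3])
    println!("{}", parse_all(&["1", "x", "3"]).is_err()); // true
    // .ok() drops the error, just like get_files_in_dir() above
    println!("{:?}", parse_all(&["1", "x"]).ok()); // None
}
```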
Now, in read_parquet_dir(), if the unwraps come from confidence that errors will never happen, we can just as confidently skip the failures instead of panicking (again, we'll get back to errors later).
```rust
// `+ '_`: the returned iterator borrows `entries` (required on edition 2021)
fn read_parquet_dir(entries: &Vec<String>) -> impl Iterator<Item = record::Row> + '_ {
    // ignore all errors
    entries
        .iter()
        .cloned()
        .filter_map(|p| SerializedFileReader::try_from(p).ok())
        .flat_map(|r| r.into_iter())
        .filter_map(|r| r.ok())
}
```
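To see the "ignore all errors" shape without pulling in parquet, here's a sketch where a "file" is just a comma-separated string. The open function is a hypothetical stand-in for SerializedFileReader::try_from: a leading '!' means the file can't be opened, and each row may fail to parse. The + '_ bound says the returned iterator borrows entries; it's required on edition 2021 and harmless on 2024.

```rust
use std::num::ParseIntError;

// Hypothetical stand-in for opening a reader: a leading '!' means "cannot open",
// otherwise each comma-separated value is a row that may fail to parse.
fn open(entry: &str) -> Result<Vec<Result<u32, ParseIntError>>, String> {
    if let Some(rest) = entry.strip_prefix('!') {
        return Err(format!("cannot open {rest}"));
    }
    Ok(entry.split(',').map(|s| s.trim().parse::<u32>()).collect())
}

// Same shape as read_parquet_dir(): drop unopenable files, flatten rows, drop bad rows.
fn read_all(entries: &[String]) -> impl Iterator<Item = u32> + '_ {
    entries
        .iter()
        .filter_map(|e| open(e).ok())
        .flat_map(|rows| rows.into_iter())
        .filter_map(|r| r.ok())
}

fn main() {
    let entries = vec!["1, 2".to_string(), "!broken".to_string(), "3, x, 4".to_string()];
    let rows: Vec<u32> = read_all(&entries).collect();
    println!("{rows:?}"); // [1, 2, 3, 4]
}
```

Note that both the unopenable file and the "x" row vanish without a trace, which is exactly what the rest of this post fixes.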
Now let's go back to get_files_in_dir() and stop ignoring errors.
```rust
fn get_files_in_dir(dir: &str) -> Result<Vec<PathBuf>, io::Error> {
    fs::read_dir(dir)?
        .map(|res| res.map(|e| e.path()))
        .collect::<Result<Vec<_>, _>>()
}
```
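A quick std-only check of what changes for the caller: a listing now comes back as Ok, and a missing directory surfaces the io::Error (NotFound on typical platforms) instead of a silent None. The temp directory and file names below are arbitrary choices for the demo.

```rust
use std::{fs, io, path::PathBuf};

fn get_files_in_dir(dir: &str) -> Result<Vec<PathBuf>, io::Error> {
    fs::read_dir(dir)?
        .map(|res| res.map(|e| e.path()))
        .collect::<Result<Vec<_>, _>>()
}

fn main() -> Result<(), io::Error> {
    // Throwaway directory with two empty files (names are arbitrary).
    let dir = std::env::temp_dir().join(format!("files_demo_{}", std::process::id()));
    fs::create_dir_all(&dir)?;
    fs::write(dir.join("a.parquet"), b"")?;
    fs::write(dir.join("b.parquet"), b"")?;

    let mut files = get_files_in_dir(dir.to_str().expect("temp dir path is UTF-8"))?;
    files.sort(); // read_dir() order is unspecified
    println!("{}", files.len()); // 2

    // The error is now visible to the caller instead of being swallowed.
    let err = get_files_in_dir("/this/path/should/not/exist").unwrap_err();
    println!("{:?}", err.kind()); // NotFound

    fs::remove_dir_all(&dir)?;
    Ok(())
}
```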
```diff
 fn main() -> Result<(), io::Error> {
     let args = Args::parse();
-    let entries = match get_files_in_dir(&args.dir)
-    {
-        Some(entries) => entries,
-        None => return Ok(())
-    };
-
+    let entries = get_files_in_dir(&args.dir)?;
     let mut wtr = WriterBuilder::new().from_writer(io::stdout());
     for (idx, row) in read_parquet_dir(&entries.iter().map(|p| p.display().to_string()).collect()).enumerate() {
```
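Now, SerializedFileReader::try_from() is implemented for &Path, and PathBuf derefs to Path, so &**p (or p.as_path()) turns the &PathBuf you get from iter() into a &Path. Your dance of converting each path with display() and then to a String is not needed (and it's lossy, btw: display() replaces non-UTF-8 bytes).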
While we're at it, let's take a slice instead of &Vec<_> in the signature (clippy's ptr_arg lint would tell you about this if you have it set up with rust-analyzer).
```rust
fn read_parquet_dir(entries: &[PathBuf]) -> impl Iterator<Item = record::Row> + '_ {
    // ignore all errors
    entries
        .iter()
        .filter_map(|p| SerializedFileReader::try_from(&**p).ok())
        .flat_map(|r| r.into_iter())
        .filter_map(|r| r.ok())
}
```
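Here's a std-only sketch of the two signature tweaks, with a hypothetical path_len function standing in for SerializedFileReader::try_from(&Path). In a plain call like this, deref coercion would even accept p directly; with a generic like try_from, the compiler has to pick an impl from the argument type, so it won't coerce for you, hence the explicit &**p.

```rust
use std::path::{Path, PathBuf};

// Hypothetical stand-in for any API that wants a &Path.
fn path_len(p: &Path) -> usize {
    p.as_os_str().len()
}

// A slice parameter instead of &Vec<PathBuf> (what clippy::ptr_arg suggests).
fn total_len(entries: &[PathBuf]) -> usize {
    entries
        .iter()
        // p: &PathBuf, *p: PathBuf, **p: Path (via Deref), &**p: &Path
        .map(|p| path_len(&**p))
        .sum()
}

fn main() {
    let entries = vec![PathBuf::from("a.parquet"), PathBuf::from("dir/b.parquet")];
    // &Vec<PathBuf> coerces to &[PathBuf] at the call site.
    println!("{}", total_len(&entries)); // 22
    // &**p and p.as_path() are the same &Path.
    let p = &entries[0];
    assert_eq!(&**p, p.as_path());
}
```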
```diff
     let entries = get_files_in_dir(&args.dir)?;
     let mut wtr = WriterBuilder::new().from_writer(io::stdout());
-    for (idx, row) in read_parquet_dir(&entries.iter().map(|p| p.display().to_string()).collect()).enumerate() {
+    for (idx, row) in read_parquet_dir(&entries).enumerate() {
         let values: Vec<String> = row.get_column_iter().map(|(_column, value)| value.to_string()).collect();
         if idx == 0 {
             wtr.serialize(row.get_column_iter().map(|(column, _value)| column.to_string()).collect::<Vec<String>>())?;
```
Now let's see what we can do about not ignoring errors in read_parquet_dir().
Approach 1: Save intermediate reader results
This opens every file before yielding a single row, and fails up front on the first file that can't be opened. So, it's a behavioral change. The signature may also scare some people 😉
```rust
fn read_parquet_dir(
    entries: &[PathBuf],
) -> Result<impl Iterator<Item = Result<record::Row, ParquetError>>, ParquetError> {
    Ok(entries
        .iter()
        // open every file first; bail out on the first failure
        .map(|p| SerializedFileReader::try_from(&**p))
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        // then stream the rows of all readers, errors included
        .flat_map(|r| r.into_iter()))
}
```
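The same shape with the mock "files" from earlier (a hypothetical open where a leading '!' means the file can't be opened; both error kinds are Strings here, mirroring how parquet reports both as ParquetError). It makes the behavioral change concrete: one bad file means no rows at all, even from the good files before it.

```rust
// Hypothetical reader stand-in: comma-separated rows, leading '!' = cannot open.
fn open(entry: &str) -> Result<Vec<Result<u32, String>>, String> {
    if let Some(rest) = entry.strip_prefix('!') {
        return Err(format!("cannot open {rest}"));
    }
    Ok(entry
        .split(',')
        .map(|s| s.trim().parse::<u32>().map_err(|e| e.to_string()))
        .collect())
}

// Approach 1 shape: open everything (fail fast), then flatten rows lazily.
fn read_all(entries: &[String]) -> Result<impl Iterator<Item = Result<u32, String>>, String> {
    Ok(entries
        .iter()
        .map(|e| open(e))
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .flat_map(|rows| rows.into_iter()))
}

fn main() {
    let good = vec!["1, 2".to_string(), "3, x".to_string()];
    let rows: Vec<_> = read_all(&good).unwrap().collect();
    println!("{rows:?}"); // [Ok(1), Ok(2), Ok(3), Err("invalid digit found in string")]

    let bad = vec!["1, 2".to_string(), "!broken".to_string()];
    // Fails before a single row is produced, even though the first file is fine.
    println!("{:?}", read_all(&bad).err()); // Some("cannot open broken")
}
```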
Approach 2: Wrapper iterator type
How can we interleave errors from opening the readers with the flattened row results? With a small wrapper iterator:
```rust
// Either the single error from opening a file, or the rows of an opened file.
enum ErrorOrRows {
    Error(Option<ParquetError>),
    Rows(record::reader::RowIter<'static>),
}

impl Iterator for ErrorOrRows {
    type Item = Result<record::Row, ParquetError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            // take() yields the error once; the next call returns None
            Self::Error(e_opt) => e_opt.take().map(Err),
            Self::Rows(row_iter) => row_iter.next(),
        }
    }
}

fn read_parquet_dir(
    entries: &[PathBuf],
) -> impl Iterator<Item = Result<record::Row, ParquetError>> + '_ {
    entries
        .iter()
        .flat_map(|p| match SerializedFileReader::try_from(&**p) {
            Err(e) => ErrorOrRows::Error(Some(e)),
            Ok(sr) => ErrorOrRows::Rows(sr.into_iter()),
        })
}
```
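A std-only version of the wrapper, made generic over the row iterator and error types, again using a hypothetical open over mock comma-separated "files" ('!' prefix = can't open). Unlike Approach 1, the open error shows up in the stream right where that file would have been, and the other files' rows still come through.

```rust
// Yields either one error or the rows of a successfully opened "file".
enum ErrorOrRows<I, E> {
    Error(Option<E>),
    Rows(I),
}

impl<T, E, I: Iterator<Item = Result<T, E>>> Iterator for ErrorOrRows<I, E> {
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            // take() leaves None behind, so the error is yielded exactly once.
            Self::Error(e_opt) => e_opt.take().map(Err),
            Self::Rows(rows) => rows.next(),
        }
    }
}

// Hypothetical reader stand-in: comma-separated rows, leading '!' = cannot open.
fn open(entry: &str) -> Result<Vec<Result<u32, String>>, String> {
    if let Some(rest) = entry.strip_prefix('!') {
        return Err(format!("cannot open {rest}"));
    }
    Ok(entry
        .split(',')
        .map(|s| s.trim().parse::<u32>().map_err(|e| e.to_string()))
        .collect())
}

fn read_all(entries: &[String]) -> impl Iterator<Item = Result<u32, String>> + '_ {
    entries.iter().flat_map(|e| match open(e) {
        Err(err) => ErrorOrRows::Error(Some(err)),
        Ok(rows) => ErrorOrRows::Rows(rows.into_iter()),
    })
}

fn main() {
    let entries = vec!["1".to_string(), "!broken".to_string(), "2, x".to_string()];
    for row in read_all(&entries) {
        println!("{row:?}");
    }
    // Ok(1)
    // Err("cannot open broken")
    // Ok(2)
    // Err("invalid digit found in string")
}
```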
```diff
     let mut wtr = WriterBuilder::new().from_writer(io::stdout());
     for (idx, row) in read_parquet_dir(&entries).enumerate() {
+        let row = row?;
         let values: Vec<String> = row.get_column_iter().map(|(_column, value)| value.to_string()).collect();
         if idx == 0 {
             wtr.serialize(row.get_column_iter().map(|(column, _value)| column.to_string()).collect::<Vec<String>>())?;
```
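On the consumer side, let row = row?; hands the first error back to main() and stops the loop; rows before it have already been written. A std-only sketch of that behavior, with a hypothetical write_rows function, String errors, and a Vec<String> standing in for the CSV writer:

```rust
// Hypothetical consumer mirroring main(): `let row = row?;` returns the first
// error to the caller; rows before it have already been written.
fn write_rows<I>(rows: I, out: &mut Vec<String>) -> Result<(), String>
where
    I: IntoIterator<Item = Result<u32, String>>,
{
    for (idx, row) in rows.into_iter().enumerate() {
        let row = row?;
        if idx == 0 {
            out.push("value".to_string()); // header, written once
        }
        out.push(row.to_string());
    }
    Ok(())
}

fn main() {
    let mut out = Vec::new();
    let res = write_rows(vec![Ok(1), Ok(2), Err("bad row".to_string()), Ok(3)], &mut out);
    println!("{res:?} {out:?}"); // Err("bad row") ["value", "1", "2"]
}
```

If you'd rather skip bad rows and keep going, match on row and log the Err instead of using ?.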
Approach 3 (bonus): Using the unstable #![feature(gen_blocks)] (nightly only, edition 2024)
```rust
fn read_parquet_dir(
    entries: &[PathBuf],
) -> impl Iterator<Item = Result<record::Row, ParquetError>> {
    gen move {
        for p in entries {
            match SerializedFileReader::try_from(&**p) {
                Err(e) => yield Err(e),
                Ok(sr) => for row_res in sr { yield row_res; }
            }
        }
    }
}
```
Cool.
Is it all in rust-mail repo?
And how much of "Rust" in this image is actually open?