refactor: Make icelake buildable along with opendal v0.46 (#274)
Xuanwo committed May 6, 2024
1 parent 54fd72f commit be8b2c2
Showing 19 changed files with 142 additions and 245 deletions.
20 changes: 10 additions & 10 deletions Cargo.toml
@@ -12,17 +12,17 @@ anyhow = "1"
async-trait = "0.1"
# branch is icelake-dev
apache-avro = { git = "https://github.com/icelake-io/avro.git", branch = "icelake-dev", rev = "4b828e9283e7248fd3ca42f5b590c2160b201785", features = ["derive"] }
arrow-array = { version = ">=48" }
arrow-schema = { version = ">=48" }
arrow-select = { version = ">=48" }
arrow-row = { version = ">=48" }
arrow-buffer = { version = ">=48" }
arrow-arith = { version = ">=48" }
arrow-csv = { version = ">=48" }
arrow-cast = { version = ">=48" }
arrow-ord = { version = ">=48" }
arrow-array = { version = ">=50" }
arrow-schema = { version = ">=50" }
arrow-select = { version = ">=50" }
arrow-row = { version = ">=50" }
arrow-buffer = { version = ">=50" }
arrow-arith = { version = ">=50" }
arrow-csv = { version = ">=50" }
arrow-cast = { version = ">=50" }
arrow-ord = { version = ">=50" }
bytes = "1"
opendal = { version = ">=0.40", features = ["layers-prometheus"] }
opendal = { version = ">=0.46", features = ["layers-prometheus"] }
uuid = { version = "1", features = ["v4"] }
serde = "1"
serde_json = "1"
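
Note (not part of the commit): the bump from opendal >=0.40 to >=0.46 is what drives the code changes in the files below. In v0.46, Operator::read returns an opendal::Buffer instead of a Vec<u8> (convert with to_vec() or to_bytes()), and the Writer returned by Operator::writer no longer implements tokio::io::AsyncWrite; it is converted with into_futures_async_write() where a futures::AsyncWrite is needed. A minimal sketch against the in-memory service, assuming tokio (with macros), futures, and anyhow are available; the paths are made up:

use futures::AsyncWriteExt;
use opendal::{services::Memory, Operator};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let op = Operator::new(Memory::default())?.finish();

    // v0.46: `read` returns an `opendal::Buffer` rather than a `Vec<u8>`.
    op.write("demo/data.txt", b"hello".to_vec()).await?;
    let buf = op.read("demo/data.txt").await?;
    assert_eq!(buf.to_vec(), b"hello".to_vec()); // owned Vec<u8>, as in storage.rs
    let _bytes = buf.to_bytes();                 // or a `bytes::Bytes`, as in the parquet paths

    // v0.46: `writer` returns `opendal::Writer`; convert it where a
    // `futures::AsyncWrite` is needed (e.g. to feed parquet's async writer).
    let mut w = op.writer("demo/out.bin").await?.into_futures_async_write();
    w.write_all(b"payload").await?;
    w.close().await?;
    Ok(())
}
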
1 change: 1 addition & 0 deletions icelake/Cargo.toml
@@ -57,6 +57,7 @@ csv = { workspace = true }
env_logger = { workspace = true }
arrow-csv = { workspace = true }
libtest-mimic = { workspace = true }
opendal = { workspace = true, features = ["layers-prometheus", "services-fs"] }

[features]
prometheus = ["dep:prometheus"]
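
Note (not part of the commit): the extra "services-fs" feature on the opendal dev-dependency enables the Fs backend, presumably for tests that touch the local filesystem. A hypothetical sketch of building a filesystem-backed Operator under opendal v0.46 (the helper name and root path are made up):

use opendal::{services::Fs, Operator};

fn fs_operator(root: &str) -> opendal::Result<Operator> {
    // Requires the "services-fs" feature enabled above.
    let mut builder = Fs::default();
    builder.root(root);
    Ok(Operator::new(builder)?.finish())
}
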
5 changes: 3 additions & 2 deletions icelake/src/catalog/storage.rs
@@ -89,7 +89,8 @@ impl<O: OperatorCreator> StorageCatalog<O> {
.operator_creator
.create()?
.read(format!("{table_path}/metadata/version-hint.text").as_str())
.await?;
.await?
.to_vec();
let version_hint = String::from_utf8(content).map_err(|err| {
Error::new(
crate::ErrorKind::IcebergDataInvalid,
@@ -107,7 +108,7 @@ impl<O: OperatorCreator> StorageCatalog<O> {

/// Read table metadata of the given version.
async fn read_table_metadata(&self, path: &str) -> Result<types::TableMetadata> {
let content = self.operator()?.read(path).await?;
let content = self.operator()?.read(path).await?.to_bytes();

let metadata = types::parse_table_metadata(&content)?;

11 changes: 7 additions & 4 deletions icelake/src/io/appender/rolling_writer.rs
@@ -134,7 +134,11 @@ impl RollingWriter {
assert!(self.current_writer.is_none());

let location = self.location_generator.generate_name();
let file_writer = self.operator.writer(&location).await?;
let file_writer = self
.operator
.writer(&location)
.await?
.into_futures_async_write();
let current_writer = {
let mut props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_1_0)
@@ -244,7 +248,6 @@ mod test {
use std::{fs, sync::Arc};

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use bytes::Bytes;
use opendal::{services::Memory, Operator};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

@@ -306,8 +309,8 @@
for data_file in data_files {
let res = op
.read(data_file.file_path.strip_prefix("/tmp/table").unwrap())
.await?;
let res = Bytes::from(res);
.await?
.to_bytes();
let reader = ParquetRecordBatchReaderBuilder::try_new(res)
.unwrap()
.build()
4 changes: 0 additions & 4 deletions icelake/src/io/parquet/mod.rs
@@ -4,8 +4,4 @@ mod write;
pub use write::ParquetWriter;
pub use write::ParquetWriterBuilder;

mod stream;
pub use stream::ParquetStream;
pub use stream::ParquetStreamBuilder;

mod track_writer;
109 changes: 0 additions & 109 deletions icelake/src/io/parquet/stream.rs

This file was deleted.

12 changes: 6 additions & 6 deletions icelake/src/io/parquet/track_writer.rs
@@ -1,19 +1,19 @@
use futures::AsyncWrite;
use std::{
pin::Pin,
sync::{atomic::AtomicU64, Arc},
};

use opendal::Writer;
use tokio::io::AsyncWrite;
use opendal::FuturesAsyncWriter;

/// `TrackWriter` is used to track the written size.
pub struct TrackWriter {
writer: Writer,
writer: FuturesAsyncWriter,
written_size: Arc<AtomicU64>,
}

impl TrackWriter {
pub fn new(writer: Writer) -> Self {
pub fn new(writer: FuturesAsyncWriter) -> Self {
Self {
writer,
written_size: Arc::new(AtomicU64::new(0)),
@@ -26,7 +26,7 @@ impl TrackWriter {
}
}

impl AsyncWrite for TrackWriter {
impl tokio::io::AsyncWrite for TrackWriter {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
@@ -54,6 +54,6 @@ impl AsyncWrite for TrackWriter {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.writer).poll_shutdown(cx)
Pin::new(&mut self.writer).poll_close(cx)
}
}
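
Note (not part of the commit): TrackWriter now wraps opendal's FuturesAsyncWriter (a futures::AsyncWrite) while still exposing tokio::io::AsyncWrite to parquet, forwarding poll_close in place of poll_shutdown. A hypothetical alternative, not used here, would be bridging the two trait families with tokio-util's compat layer (assumes a tokio-util dependency with the "compat" feature; the helper name is made up):

use tokio_util::compat::FuturesAsyncWriteCompatExt;

async fn open_tokio_writer(
    op: &opendal::Operator,
    path: &str,
) -> opendal::Result<impl tokio::io::AsyncWrite> {
    // opendal v0.46: Writer -> futures::AsyncWrite -> tokio::io::AsyncWrite
    Ok(op
        .writer(path)
        .await?
        .into_futures_async_write()
        .compat_write())
}
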
15 changes: 6 additions & 9 deletions icelake/src/io/parquet/write.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use opendal::Writer;
use opendal::FuturesAsyncWriter;
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;
@@ -14,7 +14,7 @@ use super::track_writer::TrackWriter;

/// ParquetWriterBuilder is used to build a [`ParquetWriter`]
pub struct ParquetWriterBuilder {
writer: Writer,
writer: FuturesAsyncWriter,
arrow_schema: SchemaRef,

/// `buffer_size` determines the initial size of the intermediate buffer.
@@ -25,7 +25,7 @@ pub struct ParquetWriterBuilder {

impl ParquetWriterBuilder {
/// Initiate a new builder.
pub fn new(w: Writer, arrow_schema: SchemaRef) -> Self {
pub fn new(w: FuturesAsyncWriter, arrow_schema: SchemaRef) -> Self {
Self {
writer: w,
arrow_schema,
@@ -60,8 +60,7 @@ impl ParquetWriterBuilder {
let writer = TrackWriter::new(self.writer);
let written_size = writer.get_wrriten_size();

let writer =
AsyncArrowWriter::try_new(writer, self.arrow_schema, self.buffer_size, self.props)?;
let writer = AsyncArrowWriter::try_new(writer, self.arrow_schema, self.props)?;

Ok(ParquetWriter {
writer,
@@ -116,7 +115,6 @@ mod tests {
use arrow_array::ArrayRef;
use arrow_array::Int64Array;
use arrow_array::RecordBatch;
use bytes::Bytes;
use opendal::services::Memory;
use opendal::Operator;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -130,14 +128,13 @@
let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

let w = op.writer("test").await?;
let w = op.writer("test").await?.into_futures_async_write();
let mut pw = ParquetWriterBuilder::new(w, to_write.schema()).build()?;
pw.write(&to_write).await?;
pw.write(&to_write).await?;
pw.close().await?;

let res = op.read("test").await?;
let res = Bytes::from(res);
let res = op.read("test").await?.to_bytes();
let mut reader = ParquetRecordBatchReaderBuilder::try_new(res)
.unwrap()
.build()
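
Note (not part of the commit): the dropped buffer_size argument reflects parquet >= 51, where AsyncArrowWriter::try_new takes only the writer, the Arrow schema, and optional writer properties. A minimal standalone sketch, with W standing in for any tokio::io::AsyncWrite sink (such as the TrackWriter above); the function name is made up:

use arrow_array::RecordBatch;
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;

async fn write_batch<W>(writer: W, batch: &RecordBatch) -> parquet::errors::Result<()>
where
    W: tokio::io::AsyncWrite + Unpin + Send,
{
    // parquet >= 51: no buffer_size argument between the schema and the properties.
    let props = Some(WriterProperties::builder().build());
    let mut w = AsyncArrowWriter::try_new(writer, batch.schema(), props)?;
    w.write(batch).await?;
    w.close().await?; // returns the parquet FileMetaData, ignored here
    Ok(())
}
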
8 changes: 5 additions & 3 deletions icelake/src/io/scan.rs
@@ -209,7 +209,7 @@ impl AsyncFileReader for ParquetFileReader {
.read_with(&self.path)
.range(range.start as u64..range.end as u64)
.await
.map(|data| data.into())
.map(|data| data.to_bytes())
.map_err(|e| ParquetError::General(format!("{}", e)))
})
}
@@ -239,7 +239,8 @@ impl AsyncFileReader for ParquetFileReader {
.read_with(&self.path)
.range((file_size - (FOOTER_SIZE as u64))..file_size)
.await
.map_err(|e| ParquetError::General(format!("{}", e)))?;
.map_err(|e| ParquetError::General(format!("{}", e)))?
.to_bytes();

assert_eq!(footer_buffer.len(), FOOTER_SIZE);
footer.copy_from_slice(&footer_buffer);
@@ -263,7 +264,8 @@
.read_with(&self.path)
.range(start..(start + metadata_len as u64))
.await
.map_err(|e| ParquetError::General(format!("{}", e)))?;
.map_err(|e| ParquetError::General(format!("{}", e)))?
.to_bytes();
Ok(Arc::new(decode_metadata(&metadata_bytes)?))
})
}
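
Note (not part of the commit): ranged reads issued through read_with(...).range(...) also resolve to an opendal::Buffer in v0.46, which is why the footer and metadata paths above now call to_bytes(). A minimal sketch (the helper name is made up):

use opendal::Operator;

async fn read_range(op: &Operator, path: &str, start: u64, end: u64) -> opendal::Result<bytes::Bytes> {
    // opendal v0.46: the ranged read returns a Buffer; convert to Bytes for parquet.
    Ok(op.read_with(path).range(start..end).await?.to_bytes())
}
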
19 changes: 8 additions & 11 deletions icelake/src/io_v2/file_writer/parquet_writer.rs
@@ -23,7 +23,7 @@ pub struct ParquetWriterBuilder<L: LocationGenerator> {
operator: Operator,
/// `buffer_size` determines the initial size of the intermediate buffer.
/// The intermediate buffer will automatically be resized if necessary
init_buffer_size: usize,
_init_buffer_size: usize,
props: WriterProperties,
table_location: String,
location_generator: L,
@@ -50,7 +50,7 @@ impl<L: LocationGenerator> ParquetWriterBuilder<L> {
}
Self {
operator,
init_buffer_size,
_init_buffer_size: init_buffer_size,
props: props.build(),
table_location,
location_generator,
@@ -67,16 +67,14 @@ impl<L: LocationGenerator> FileWriterBuilder for ParquetWriterBuilder<L> {

let written_size = Arc::new(AtomicI64::new(0));
let writer = TrackWriter::new(
self.operator.writer(&file_name).await?,
self.operator
.writer(&file_name)
.await?
.into_futures_async_write(),
written_size.clone(),
);

let writer = AsyncArrowWriter::try_new(
writer,
schema.clone(),
self.init_buffer_size,
Some(self.props),
)?;
let writer = AsyncArrowWriter::try_new(writer, schema.clone(), Some(self.props))?;

Ok(ParquetWriter {
file_path: format!("{}/{}", self.table_location, file_name),
@@ -231,7 +229,6 @@ mod tests {
use arrow_array::ArrayRef;
use arrow_array::Int64Array;
use arrow_array::RecordBatch;
use bytes::Bytes;
use opendal::services::Memory;
use opendal::Operator;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -269,7 +266,7 @@ mod tests {
pw.close().await?;

let res = op.read("test").await?;
let res = Bytes::from(res);
let res = res.to_bytes();
let mut reader = ParquetRecordBatchReaderBuilder::try_new(res)
.unwrap()
.build()