Skip to content

Commit

Permalink
fix: switch any::Result to crate::error::Result in table.rs (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Jul 11, 2023
1 parent 8474af1 commit dc41051
Showing 1 changed file with 68 additions and 42 deletions.
110 changes: 68 additions & 42 deletions src/table.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::collections::HashMap;

use anyhow::anyhow;
use anyhow::Result;
use crate::error::Result;
use futures::StreamExt;
use opendal::layers::LoggingLayer;
use opendal::services::Fs;
use opendal::Operator;

use crate::io::task_writer::TaskWriter;
use crate::types;
use crate::types::DataFile;
use crate::{types, Error};

/// Table is the main entry point for the IceLake.
pub struct Table {
Expand Down Expand Up @@ -48,17 +47,18 @@ impl Table {
} else {
let files = self.list_table_metadata_paths().await?;

files
.into_iter()
.last()
.ok_or_else(|| anyhow!("no table metadata found"))?
files.into_iter().last().ok_or(Error::new(
crate::ErrorKind::IcebergDataInvalid,
"no table metadata found",
))?
};

let metadata = self.read_table_metadata(&path).await?;
// TODO: check if the metadata is out of date.
if metadata.last_updated_ms == 0 {
return Err(anyhow!(
"Timestamp when the table was last updated is invalid"
return Err(Error::new(
crate::ErrorKind::IcebergDataInvalid,
"Timestamp when the table was last updated is invalid",
));
}
self.current_version = metadata.last_updated_ms;
Expand Down Expand Up @@ -92,15 +92,15 @@ impl Table {
}

/// Fetch current table metadata.
pub fn current_table_metadata(&self) -> Result<&types::TableMetadata> {
pub fn current_table_metadata(&self) -> &types::TableMetadata {
assert!(
self.current_version != 0,
"table current version must be valid"
);

self.table_metadata
.get(&self.current_version)
.ok_or_else(|| anyhow!("table metadata not found"))
.expect("table metadata of current version must be exist")
}

/// # TODO
Expand All @@ -117,18 +117,25 @@ impl Table {
let meta = self
.table_metadata
.get(&self.current_version)
.ok_or_else(|| anyhow!("table metadata not found"))?;
.expect("table metadata of current version must be exist");

let current_snapshot_id = meta
.current_snapshot_id
.ok_or_else(|| anyhow!("current snapshot id is empty"))?;
let current_snapshot_id = meta.current_snapshot_id.ok_or(Error::new(
crate::ErrorKind::IcebergDataInvalid,
"current snapshot id is empty",
))?;
let current_snapshot = meta
.snapshots
.as_ref()
.ok_or_else(|| anyhow!("snapshots is emppty"))?
.ok_or(Error::new(
crate::ErrorKind::IcebergDataInvalid,
"snapshots is empty",
))?
.iter()
.find(|v| v.snapshot_id == current_snapshot_id)
.ok_or_else(|| anyhow!("snapshot with id {} is not found", current_snapshot_id))?;
.ok_or(Error::new(
crate::ErrorKind::IcebergDataInvalid,
format!("snapshot with id {} is not found", current_snapshot_id),
))?;

let manifest_list_path = self.rel_path(&current_snapshot.manifest_list)?;
let manifest_list_content = self.op.read(&manifest_list_path).await?;
Expand All @@ -147,19 +154,19 @@ impl Table {

/// Get the relpath related to the base of table location.
pub fn rel_path(&self, path: &str) -> Result<String> {
let location = self
.current_location
.as_ref()
.ok_or_else(|| anyhow!("table location is empty, maybe it's not loaded?"))?;
let location = self.current_location.as_ref().ok_or(Error::new(
crate::ErrorKind::IcebergDataInvalid,
"table location is empty, maybe it's not loaded?",
))?;

path.strip_prefix(location)
.ok_or_else(|| {
anyhow!(
.ok_or(Error::new(
crate::ErrorKind::IcebergDataInvalid,
format!(
"path {} is not starts with table location {}",
path,
location
)
})
path, location
),
))
.map(|v| v.to_string())
}

Expand All @@ -168,17 +175,30 @@ impl Table {
self.op
.is_exist("metadata/version-hint.text")
.await
.map_err(|e| anyhow!("check if version hint exist failed: {}", e))
.map_err(|e| {
Error::new(
crate::ErrorKind::IcebergDataInvalid,
format!("check if version hint exist failed: {}", e),
)
})
}

/// Read version hint of table.
async fn read_version_hint(&self) -> Result<i32> {
let content = self.op.read("metadata/version-hint.text").await?;
let version_hint = String::from_utf8(content)?;

version_hint
.parse()
.map_err(|e| anyhow!("parse version hint failed: {}", e))
let version_hint = String::from_utf8(content).map_err(|err| {
Error::new(
crate::ErrorKind::IcebergDataInvalid,
format!("Fail to covert version_hint from utf8 to string: {}", err),
)
})?;

version_hint.parse().map_err(|e| {
Error::new(
crate::ErrorKind::IcebergDataInvalid,
format!("parse version hint failed: {}", e),
)
})
}

/// Read table metadata of the given version.
Expand All @@ -196,16 +216,22 @@ impl Table {
///
/// TODO: we can improve this by only fetching the latest metadata.
async fn list_table_metadata_paths(&self) -> Result<Vec<String>> {
let mut lister = self
.op
.list("metadata/")
.await
.map_err(|err| anyhow!("list metadata failed: {}", err))?;
let mut lister = self.op.list("metadata/").await.map_err(|err| {
Error::new(
crate::ErrorKind::Unexpected,
format!("list metadata failed: {}", err),
)
})?;

let mut paths = vec![];

while let Some(entry) = lister.next().await {
let entry = entry.map_err(|err| anyhow!("list metadata entry failed: {}", err))?;
let entry = entry.map_err(|err| {
Error::new(
crate::ErrorKind::Unexpected,
format!("list metadata entry failed: {}", err),
)
})?;

// Only push into paths if the entry is a metadata file.
if entry.path().ends_with(".metadata.json") {
Expand All @@ -222,7 +248,7 @@ impl Table {
/// Return a task writer used to write data into table.
pub async fn task_writer(&mut self) -> Result<TaskWriter> {
let task_writer = TaskWriter::try_new(
self.current_table_metadata()?.clone(),
self.current_table_metadata().clone(),
self.op.clone(),
0,
self.task_id,
Expand Down Expand Up @@ -321,7 +347,7 @@ mod tests {
let mut table = Table::new(op);
table.load().await?;

let table_metadata = table.current_table_metadata()?;
let table_metadata = table.current_table_metadata();
assert_eq!(table_metadata.format_version, types::TableFormatVersion::V1);
assert_eq!(table_metadata.last_updated_ms, 1686911671713);

Expand All @@ -347,7 +373,7 @@ mod tests {
let mut table = Table::new(op);
table.load().await?;

let table_metadata = table.current_table_metadata()?;
let table_metadata = table.current_table_metadata();
assert_eq!(table_metadata.format_version, types::TableFormatVersion::V1);
assert_eq!(table_metadata.last_updated_ms, 1672981042425);
assert_eq!(
Expand Down

0 comments on commit dc41051

Please sign in to comment.