refactor select column by source column id
ZENOTME committed Aug 24, 2023
1 parent aa9c09a commit febd2ed
Showing 7 changed files with 175 additions and 80 deletions.
70 changes: 64 additions & 6 deletions icelake/src/io/task_writer.rs
@@ -10,9 +10,14 @@ use crate::types::BoxedTransformFunction;
use crate::types::PartitionSpec;
use crate::types::{create_transform_function, DataFile, TableMetadata};
use crate::types::{struct_to_anyvalue_array_with_type, Any, AnyValue};
use crate::Error;
use crate::ErrorKind;
use arrow::array::ArrayRef;
use arrow::array::{Array, BooleanArray, StructArray};
use arrow::compute::filter_record_batch;
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use arrow::datatypes::Fields;
use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::row::{OwnedRow, RowConverter, SortField};
@@ -173,10 +178,10 @@ pub struct PartitionedWriter {
table_location: String,
location_generator: Arc<DataFileLocationGenerator>,
/// Partition fields used to compute:
/// - usize: source column index of batch record
/// - Vec<usize>: index vector of the source column
/// - String: partition field name
/// - BoxedTransformFunction: transform function
partition_fields: Vec<(usize, String, BoxedTransformFunction)>,
partition_fields: Vec<(Vec<usize>, String, BoxedTransformFunction)>,
/// Partition value type
partition_type: Any,
table_config: TableConfigRef,
@@ -192,6 +197,47 @@ struct PartitionGroup {
}

impl PartitionedWriter {
/// Fetch the column index vector for a column id (stored in the Arrow Field metadata as `column_id`).
/// e.g.
/// struct<struct<x:1,y:2>,z:3>
/// for source column id 2,
/// you will get the source column index vector [1,0]
fn fetch_column_index(fields: &Fields, index_vec: &mut Vec<usize>, col_id: i64) {
for (pos, field) in fields.iter().enumerate() {
let id: i64 = field
.metadata()
.get("column_id")
.expect("column_id must be set")
.parse()
.expect("column_id must can be parse as i64");
if col_id == id {
index_vec.push(pos);
return;
}
if let DataType::Struct(inner) = field.data_type() {
Self::fetch_column_index(inner, index_vec, col_id);
if !index_vec.is_empty() {
index_vec.push(pos);
return;
}
}
}
}

fn get_column_by_index_vec(batch: &RecordBatch, index_vec: &[usize]) -> ArrayRef {
let mut rev_iterator = index_vec.iter().rev();
let mut array = batch.column(*rev_iterator.next().unwrap()).clone();
for idx in rev_iterator {
array = array
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.column(*idx)
.clone();
}
array
}

/// Create a new `PartitionedWriter`.
pub fn new(
schema: ArrowSchemaRef,
@@ -206,9 +252,20 @@ impl PartitionedWriter {
.fields
.iter()
.map(|field| {
let index = schema.index_of(field.name.as_str()).unwrap();
let transform = create_transform_function(&field.transform)?;
Ok((index, field.name.clone(), transform))
let mut index_vec = vec![];
Self::fetch_column_index(
schema.fields(),
&mut index_vec,
field.source_column_id as i64,
);
if index_vec.is_empty() {
return Err(Error::new(
ErrorKind::IcebergDataInvalid,
format!("Can't find source column id: {}", field.source_column_id),
));
}
Ok((index_vec, field.name.clone(), transform))
})
.collect::<Result<Vec<_>>>()?;
Ok(Self {
@@ -239,8 +296,9 @@ impl PartitionedWriter {
let value_array = Arc::new(StructArray::from(
self.partition_fields
.iter()
.map(|(index_of_batch, field_name, transform)| {
let array = transform.transform(batch.column(*index_of_batch).clone())?;
.map(|(index_vec, field_name, transform)| {
let array = Self::get_column_by_index_vec(batch, index_vec);
let array = transform.transform(array)?;
let field = Arc::new(Field::new(
field_name.clone(),
array.data_type().clone(),
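
To make the new lookup concrete, here is a minimal, self-contained sketch of the id-to-index resolution (hypothetical field names and ids, not part of this commit; it mirrors fetch_column_index and get_column_by_index_vec above):

use std::collections::HashMap;

use arrow::datatypes::{DataType, Field, Fields};

// Attach an Iceberg field id to an Arrow field, as to_arrow.rs now does.
fn field_with_id(name: &str, data_type: DataType, id: i64) -> Field {
    let mut metadata = HashMap::new();
    metadata.insert("column_id".to_string(), id.to_string());
    Field::new(name, data_type, true).with_metadata(metadata)
}

// Same recursion as PartitionedWriter::fetch_column_index: a depth-first
// search that records the child position at every level on the way back out.
fn fetch_column_index(fields: &Fields, index_vec: &mut Vec<usize>, col_id: i64) {
    for (pos, field) in fields.iter().enumerate() {
        let id: i64 = field.metadata()["column_id"].parse().unwrap();
        if col_id == id {
            index_vec.push(pos);
            return;
        }
        if let DataType::Struct(inner) = field.data_type() {
            fetch_column_index(inner, index_vec, col_id);
            if !index_vec.is_empty() {
                index_vec.push(pos);
                return;
            }
        }
    }
}

fn main() {
    // struct<struct<x:1, y:2>, z:3>; the id 4 for the struct field is made up.
    let inner = Fields::from(vec![
        field_with_id("x", DataType::Int32, 1),
        field_with_id("y", DataType::Int32, 2),
    ]);
    let fields = Fields::from(vec![
        field_with_id("s", DataType::Struct(inner), 4),
        field_with_id("z", DataType::Int32, 3),
    ]);

    let mut index_vec = vec![];
    fetch_column_index(&fields, &mut index_vec, 2);
    // y sits at child position 1 of top-level column 0, recorded inner-first:
    assert_eq!(index_vec, vec![1, 0]);
}

Because positions are pushed on the way out of the recursion, the vector is inner-first; get_column_by_index_vec therefore walks it in reverse, taking the top-level column first and then downcasting to StructArray for each nested step.
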
14 changes: 7 additions & 7 deletions icelake/src/types/arrow/to_arrow.rs
@@ -1,6 +1,7 @@
//! to_arrow module provides the conversion functions from iceberg in-memory
//! schema to arrow schema.

use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

@@ -30,13 +31,12 @@ impl TryFrom<types::Field> for ArrowField {
type Error = Error;

fn try_from(value: types::Field) -> Result<Self, Self::Error> {
Ok(ArrowField::new_dict(
value.name,
value.field_type.try_into()?,
!value.required,
value.id as i64,
false,
))
let mut metadata = HashMap::new();
metadata.insert("column_id".to_string(), value.id.to_string());
Ok(
ArrowField::new(value.name, value.field_type.try_into()?, !value.required)
.with_metadata(metadata),
)
}
}

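
The old encoding smuggled the Iceberg field id through Arrow's dictionary-id parameter; the new one stores it as a plain metadata entry on the field. A short sketch of the round-trip (hypothetical field name and id, using the arrow-rs Field::with_metadata API shown above):

use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

fn main() {
    // Writer side (to_arrow.rs): store the Iceberg field id under "column_id".
    let mut metadata = HashMap::new();
    metadata.insert("column_id".to_string(), 42.to_string());
    let field = Field::new("v_int", DataType::Int32, true).with_metadata(metadata);

    // Reader side (fetch_column_index): parse the id back out of the metadata.
    let id: i64 = field.metadata()["column_id"].parse().unwrap();
    assert_eq!(id, 42);
}
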
64 changes: 60 additions & 4 deletions icelake/tests/insert_tests.rs
@@ -24,13 +24,40 @@ async fn test_insert_no_partition() {

let poetry = Poetry::new(format!("{}/../testdata/python", env!("CARGO_MANIFEST_DIR")));

let table_name = "t1".to_string();
let table_root = "demo/s1/t1".to_string();

let init_sqls = vec![
"CREATE SCHEMA IF NOT EXISTS s1".to_string(),
format!("DROP TABLE IF EXISTS s1.{table_name}"),
format!(
"
CREATE TABLE s1.{table_name}
(
id long,
v_int int,
v_long long,
v_float float,
v_double double,
v_varchar string,
v_bool boolean,
v_date date,
v_timestamp timestamp,
v_decimal decimal(36, 10),
v_ts_ntz timestamp_ntz
) USING iceberg
TBLPROPERTIES ('format-version'='2');"
),
];

let test_fixture = TestFixture {
spark,
minio,
poetry,
table_name: "t1".to_string(),
table_name,
csv_file: "data.csv".to_string(),
table_root: "demo/s1/t1".to_string(),
table_root,
init_sqls,
};

test_fixture.run().await
@@ -57,13 +84,42 @@ async fn test_insert_partition() {

let poetry = Poetry::new(format!("{}/../testdata/python", env!("CARGO_MANIFEST_DIR")));

let table_name = "t2".to_string();
let table_root = "demo/s1/t2".to_string();

let init_sqls = vec![
"CREATE SCHEMA IF NOT EXISTS s1".to_string(),
format!("DROP TABLE IF EXISTS s1.{}", table_name),
format!(
"
CREATE TABLE s1.{}
(
id long,
v_int int,
v_long long,
v_float float,
v_double double,
v_varchar string,
v_bool boolean,
v_date date,
v_timestamp timestamp,
v_decimal decimal(36, 10),
v_ts_ntz timestamp_ntz
) USING iceberg
PARTITIONED BY (v_int, v_long, v_float, v_double, v_varchar, v_bool, v_date, v_timestamp, v_ts_ntz)
TBLPROPERTIES ('format-version'='2');",
table_name
),
];

let test_fixture = TestFixture {
spark,
minio,
poetry,
table_name: "t2".to_string(),
table_name,
csv_file: "partition_data.csv".to_string(),
table_root: "demo/s1/t2".to_string(),
table_root,
init_sqls,
};

test_fixture.run().await
30 changes: 3 additions & 27 deletions icelake/tests/utils/mod.rs
@@ -74,6 +74,8 @@ pub struct TestFixture<'a> {
pub table_name: String,
pub csv_file: String,
pub table_root: String,

pub init_sqls: Vec<String>,
}

impl TestFixture<'_> {
@@ -85,7 +87,7 @@ impl TestFixture<'_> {
];
let args: Vec<String> = args
.into_iter()
.chain(self.init_spark_table_sqls().into_iter())
.chain(self.init_sqls.clone().into_iter())
.collect();
self.poetry.run_file(
"init.py",
@@ -178,30 +180,4 @@ impl TestFixture<'_> {
self.write_data_with_icelake().await;
self.check_table_with_spark()
}

pub fn init_spark_table_sqls(&self) -> Vec<String> {
vec![
"CREATE SCHEMA IF NOT EXISTS s1".to_string(),
format!("DROP TABLE IF EXISTS s1.{}", self.table_name),
format!(
"
CREATE TABLE s1.{}
(
id long,
v_int int,
v_long long,
v_float float,
v_double double,
v_varchar string,
v_bool boolean,
v_date date,
v_timestamp timestamp,
v_decimal decimal(36, 10),
v_ts_ntz timestamp_ntz
) USING iceberg
TBLPROPERTIES ('format-version'='2');",
self.table_name
),
]
}
}
3 changes: 2 additions & 1 deletion icelake/tests/utils/poetry.rs
@@ -14,7 +14,8 @@ impl Poetry {

let mut cmd = Command::new(POETRY_CMD);

cmd.arg("update").current_dir(proj_dir.as_str());
cmd.args(["update", "--quiet"])
.current_dir(proj_dir.as_str());

run_command(cmd, "poetry update");

16 changes: 8 additions & 8 deletions testdata/csv/partition_data.csv
@@ -1,9 +1,9 @@
1,1,1000,1.1,1.11,1-1,true,2022-11-01,2022-11-01 11:03:02.123456+04:00,389.11111,2022-11-01 11:03:02.123456
1,2,1000,1.1,1.11,1-1,true,2022-11-01,2022-11-01 11:03:02.123456+04:00,389.11111,2022-11-01 11:03:02.123456
1,3,1000,1.1,1.11,1-1,true,2022-11-01,2022-11-01 11:03:02.123456+04:00,389.11111,2022-11-01 11:03:02.123456
2,2,2000,2.2,2.22,2-2,false,2022-11-02,2022-11-02 11:03:02.123456+04:00,389.2222,2022-11-02 11:03:02.123456
2,3,2000,2.2,2.22,2-2,false,2022-11-02,2022-11-02 11:03:02.123456+04:00,389.2222,2022-11-02 11:03:02.123456
2,4,2000,2.2,2.22,2-2,false,2022-11-02,2022-11-02 11:03:02.123456+04:00,389.2222,2022-11-02 11:03:02.123456
3,4,3000,3.3,3.33,3-3,true,2022-11-03,2022-11-03 11:03:02.123456+04:00,389.3333,2022-11-03 11:03:02.123456
3,5,4000,4.4,4.44,4-4,false,2022-11-04,2022-11-04 11:04:02.123456+04:00,389.4444,2022-11-04 11:04:02.123456
4,6,5000,5.5,5.55,5-5,true,2022-11-05,2022-11-05 11:05:02.123456+04:00,389.5555,2022-11-05 11:05:02.123456
2,1,1000,1.1,1.11,1-1,true,2022-11-01,2022-11-01 11:03:02.123456+04:00,389.11111,2022-11-01 11:03:02.123456
3,1,1000,1.1,1.11,1-1,true,2022-11-01,2022-11-01 11:03:02.123456+04:00,389.11111,2022-11-01 11:03:02.123456
4,2,2000,2.2,2.22,2-2,false,2022-11-02,2022-11-02 11:03:02.123456+04:00,389.2222,2022-11-02 11:03:02.123456
5,2,2000,2.2,2.22,2-2,false,2022-11-02,2022-11-02 11:03:02.123456+04:00,389.2222,2022-11-02 11:03:02.123456
6,2,2000,2.2,2.22,2-2,false,2022-11-02,2022-11-02 11:03:02.123456+04:00,389.2222,2022-11-02 11:03:02.123456
7,3,3000,3.3,3.33,3-3,true,2022-11-03,2022-11-03 11:03:02.123456+04:00,389.3333,2022-11-03 11:03:02.123456
8,3,4000,4.4,4.44,4-4,false,2022-11-04,2022-11-04 11:04:02.123456+04:00,389.4444,2022-11-04 11:04:02.123456
9,4,5000,5.5,5.55,5-5,true,2022-11-05,2022-11-05 11:05:02.123456+04:00,389.5555,2022-11-05 11:05:02.123456
58 changes: 31 additions & 27 deletions testdata/python/poetry.lock
