Skip to content

Commit

Permalink
refactor(java): simpilfy fragment (#3307)
Browse files Browse the repository at this point in the history
In PR #3240, python code is
refactored, fragment is dataclass now. This PR refactors Java code, make
the API consistent with python api.
  • Loading branch information
chenkovsky authored Jan 9, 2025
1 parent 64adfea commit 837ac24
Show file tree
Hide file tree
Showing 19 changed files with 738 additions and 308 deletions.
91 changes: 46 additions & 45 deletions java/core/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use crate::error::{Error, Result};
use crate::ffi::JNIEnvExt;
use crate::traits::FromJString;
use crate::traits::{export_vec, import_vec, FromJObjectWithEnv, FromJString};
use crate::utils::{extract_storage_options, extract_write_params, get_index_params};
use crate::{traits::IntoJava, RT};
use arrow::array::RecordBatchReader;
Expand Down Expand Up @@ -355,7 +355,7 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_commitAppend<'local>(
_obj: JObject,
path: JString,
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment
fragments_obj: JObject, // List<FragmentMetadata>
storage_options_obj: JObject, // Map<String, String>
) -> JObject<'local> {
ok_or_throw!(
Expand All @@ -374,31 +374,18 @@ pub fn inner_commit_append<'local>(
env: &mut JNIEnv<'local>,
path: JString,
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment)
fragment_objs: JObject, // List<FragmentMetadata>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JObject<'local>> {
let json_fragments = env.get_strings(&fragments_obj)?;
let mut fragments: Vec<Fragment> = Vec::new();
for json_fragment in json_fragments {
let fragment = Fragment::from_json(&json_fragment)?;
fragments.push(fragment);
let fragment_objs = import_vec(env, &fragment_objs)?;
let mut fragments = Vec::with_capacity(fragment_objs.len());
for f in fragment_objs {
fragments.push(f.extract_object(env)?);
}
let op = Operation::Append { fragments };
let path_str = path.extract(env)?;
let read_version = env.get_u64_opt(&read_version_obj)?;
let jmap = JMap::from_env(env, &storage_options_obj)?;
let storage_options: HashMap<String, String> = env.with_local_frame(16, |env| {
let mut map = HashMap::new();
let mut iter = jmap.iter(env)?;
while let Some((key, value)) = iter.next(env)? {
let key_jstring = JString::from(key);
let value_jstring = JString::from(value);
let key_string: String = env.get_string(&key_jstring)?.into();
let value_string: String = env.get_string(&value_jstring)?.into();
map.insert(key_string, value_string);
}
Ok::<_, Error>(map)
})?;
let storage_options = extract_storage_options(env, &storage_options_obj)?;
let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options)?;
dataset.into_java(env)
}
Expand All @@ -410,7 +397,7 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_commitOverwrite<'local>(
path: JString,
arrow_schema_addr: jlong,
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment
fragments_obj: JObject, // List<FragmentMetadata>
storage_options_obj: JObject, // Map<String, String>
) -> JObject<'local> {
ok_or_throw!(
Expand All @@ -431,14 +418,13 @@ pub fn inner_commit_overwrite<'local>(
path: JString,
arrow_schema_addr: jlong,
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<String>, String is json serialized Fragment)
fragments_obj: JObject, // List<FragmentMetadata>
storage_options_obj: JObject, // Map<String, String>
) -> Result<JObject<'local>> {
let json_fragments = env.get_strings(&fragments_obj)?;
let mut fragments: Vec<Fragment> = Vec::new();
for json_fragment in json_fragments {
let fragment = Fragment::from_json(&json_fragment)?;
fragments.push(fragment);
let fragment_objs = import_vec(env, &fragments_obj)?;
let mut fragments = Vec::with_capacity(fragment_objs.len());
for f in fragment_objs {
fragments.push(f.extract_object(env)?);
}
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
Expand Down Expand Up @@ -596,14 +582,14 @@ fn inner_open_native<'local>(
}

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_getJsonFragments<'a>(
pub extern "system" fn Java_com_lancedb_lance_Dataset_getFragmentsNative<'a>(
mut env: JNIEnv<'a>,
jdataset: JObject,
) -> JObject<'a> {
ok_or_throw!(env, inner_get_json_fragments(&mut env, jdataset))
ok_or_throw!(env, inner_get_fragments(&mut env, jdataset))
}

fn inner_get_json_fragments<'local>(
fn inner_get_fragments<'local>(
env: &mut JNIEnv<'local>,
jdataset: JObject,
) -> Result<JObject<'local>> {
Expand All @@ -612,22 +598,37 @@ fn inner_get_json_fragments<'local>(
unsafe { env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET) }?;
dataset.inner.get_fragments()
};
let fragments = fragments
.iter()
.map(|f| f.metadata().clone())
.collect::<Vec<Fragment>>();
export_vec(env, &fragments)
}

let array_list_class = env.find_class("java/util/ArrayList")?;

let array_list = env.new_object(array_list_class, "()V", &[])?;
#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Dataset_getFragmentNative<'a>(
mut env: JNIEnv<'a>,
jdataset: JObject,
fragment_id: jint,
) -> JObject<'a> {
ok_or_throw!(env, inner_get_fragment(&mut env, jdataset, fragment_id))
}

for fragment in fragments {
let json_string = serde_json::to_string(fragment.metadata())?;
let jstring = env.new_string(json_string)?;
env.call_method(
&array_list,
"add",
"(Ljava/lang/Object;)Z",
&[(&jstring).into()],
)?;
}
Ok(array_list)
fn inner_get_fragment<'local>(
env: &mut JNIEnv<'local>,
jdataset: JObject,
fragment_id: jint,
) -> Result<JObject<'local>> {
let fragment = {
let dataset =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET) }?;
dataset.inner.get_fragment(fragment_id as usize)
};
let obj = match fragment {
Some(f) => f.metadata().into_java(env)?,
None => JObject::default(),
};
Ok(obj)
}

#[no_mangle]
Expand Down
Loading

0 comments on commit 837ac24

Please sign in to comment.