From 4db6d3965bdc204a83ac9fc34afc6fc0fbe4af56 Mon Sep 17 00:00:00 2001 From: Jonas Isensee Date: Mon, 26 Aug 2024 09:55:24 +0200 Subject: [PATCH] code cleanup --- src/datasets.jl | 62 ++++++--------------- src/explicit_datasets.jl | 113 ++++++--------------------------------- src/inlineunion.jl | 8 ++- src/types.jl | 17 +++++- 4 files changed, 51 insertions(+), 149 deletions(-) diff --git a/src/datasets.jl b/src/datasets.jl index 69e9f379..ce50cfa2 100644 --- a/src/datasets.jl +++ b/src/datasets.jl @@ -213,7 +213,7 @@ function read_empty(rr::ReadRepresentation{T}, f::JLDFile, dimensions_attr.datatype == h5fieldtype(f, Int64, Int64, Val{true}) || throw(UnsupportedFeatureException()) seek(io, dimensions_attr.data_offset) - v = construct_array(io, T, Val(ndims)) + v = construct_array(io, T, ndims) if isconcretetype(T) for i = 1:length(v) @inbounds v[i] = jlconvert(rr, f, Ptr{Cvoid}(0), header_offset) @@ -244,30 +244,12 @@ function get_ndims_offset(f::JLDFile, dataspace::ReadDataspace, attributes::Abst end """ - construct_array{T}(io::IO, ::Type{T}, ::Val{ndims}) + construct_array(io::IO, eltype, ndims::Int) Construct array by reading `ndims` dimensions from `io`. Assumes `io` has already been seeked to the correct position. """ -function construct_array(io::IO, ::Type{T}, ::Val{1}) where {T} - n = jlread(io, Int64) - Vector{T}(undef, n) -end - -function construct_array(io::IO, ::Type{T}, ::Val{2}) where {T} - d2 = jlread(io, Int64) - d1 = jlread(io, Int64) - Matrix{T}(undef, d1, d2) -end - -function construct_array(io::IO, ::Type{T}, ::Val{3}) where {T} - d3 = jlread(io, Int64) - d2 = jlread(io, Int64) - d1 = jlread(io, Int64) - Array{T,3}(undef, d1, d2, d3) -end - -function construct_array(io::IO, ::Type{T}, ::Val{N})::Array{T,N} where {T,N} +function construct_array(io::IO, ::Type{T}, N::Int) where {T} ds = reverse(ntuple(i->jlread(io, Int64), Val(N))) Array{T,N}(undef, ds...) end @@ -283,7 +265,7 @@ end ndims, offset = get_ndims_offset(f, dataspace, attributes) seek(io, offset) - v = construct_array(io, T, Val(Int(ndims))) + v = construct_array(io, T, Int(ndims)) n = length(v) seek(io, data_offset) if iscompressed(filters) @@ -296,7 +278,7 @@ end else ndims, offset = get_ndims_offset(f, dataspace, attributes) seek(io, offset) - v = construct_array(io, T, Val(Int(ndims))) + v = construct_array(io, T, Int(ndims)) if layout.version == 3 # version 1 B-tree # This version appears to be padding incomplete chunks @@ -385,14 +367,17 @@ end seek(io, header_offset) f.end_of_data = header_offset + fullsz - if ismutabletype(typeof(data)) && !isa(wsession, JLDWriteSession{Union{}}) - wsession.h5offset[objectid(data)] = h5offset(f, header_offset) - push!(wsession.objects, data) - end + track!(wsession, data, header_offset) cio = begin_checksum_write(io, fullsz - 4) - write_object_header_and_dataspace_message(cio, f, psz, dataspace) - write_datatype_message(cio, datatype) + jlwrite(cio, ObjectStart(size_flag(psz))) + write_size(cio, psz) + write_header_message(cio, Val(HmFillValue); flags=0x09) + write_header_message(cio, Val(HmDataspace); dataspace.dataspace_type, dimensions=dataspace.size) + for attr in dataspace.attributes + write_header_message(cio, f, attr) + end + write_header_message(cio, Val(HmDatatype), 1 | (2*isa(dt, CommittedDatatype)); dt) # Data storage layout if layout_class == LcCompact @@ -429,25 +414,10 @@ end h5offset(f, header_offset) end -function write_object_header_and_dataspace_message(cio::IO, f::JLDFile, psz::Int, dataspace::WriteDataspace) - jlwrite(cio, ObjectStart(size_flag(psz))) - write_size(cio, psz) - write_header_message(cio, Val(HmFillValue); flags=0x09) - write_header_message(cio, Val(HmDataspace); dataspace.dataspace_type, dimensions=dataspace.size) - for attr in dataspace.attributes - write_header_message(cio, f, attr) - end -end - -write_datatype_message(cio::IO, dt::H5Datatype) = - write_header_message(cio, Val(HmDatatype), 1 | (2*isa(dt, CommittedDatatype)); dt) - @nospecializeinfer function write_dataset(f::JLDFile, @nospecialize(x), wsession::JLDWriteSession)::RelOffset - if ismutabletype(typeof(x)) && !isa(wsession, JLDWriteSession{Union{}}) - offset = get(wsession.h5offset, objectid(x), UNDEFINED_ADDRESS) - offset != UNDEFINED_ADDRESS && return offset - end + offset = get_tracked(wsession, x) + offset != UNDEFINED_ADDRESS && return offset odr = objodr(x) write_dataset(f, WriteDataspace(f, x, odr), h5type(f, x), odr, x, wsession)::RelOffset end diff --git a/src/explicit_datasets.jl b/src/explicit_datasets.jl index f4d782ab..7168f70e 100644 --- a/src/explicit_datasets.jl +++ b/src/explicit_datasets.jl @@ -118,12 +118,11 @@ end Write data to file using metadata prepared in the `dataset`. """ -function write_dataset(dataset::Dataset, data) +function write_dataset(dataset::Dataset, data, wsession::JLDWriteSession=JLDWriteSession()) f = dataset.parent.f if dataset.offset != UNDEFINED_ADDRESS throw(ArgumentError("Dataset has already been written to file")) end - wsession = JLDWriteSession() # first need to figure out if data type and dataspace are defined / correct if isnothing(dataset.datatype) dataset.datatype = h5type(f, data) @@ -134,103 +133,24 @@ function write_dataset(dataset::Dataset, data) dataset.dataspace = WriteDataspace(f, data, odr) end dataspace = dataset.dataspace - # Attributes - attributes = map(collect(dataset.attributes)) do (name, attr) - attr isa WrittenAttribute && return attr - return WrittenAttribute(dataset.parent.f, name, attr) - throw(ArgumentError("Invalid attribute: $a")) - end - io = f.io - odr = objodr(data) - datasz = odr_sizeof(odr)::Int * numel(dataspace)::Int - - psz = payload_size_without_storage_message(dataspace, datatype)::Int - - psz += sum(message_size.(attributes), init=0) - - # minimum extra space for continuation message - psz += jlsizeof(HeaderMessage) + jlsizeof(RelOffset) + jlsizeof(Length) - - - # determine layout class - # DataLayout object is only available after the data is written - if datasz == 0 || (!(data isa Array) && datasz < 8192) - layout_class = LcCompact - psz += jlsizeof(Val(HmDataLayout); layout_class, data_size=datasz) - elseif !isnothing(dataset.chunk) || !isempty(dataset.filters.filters) + if !isempty(dataset.filters.filters) filter_id = dataset.filters.filters[1].id invoke_again, compressor = get_compressor(filter_id) if invoke_again return Base.invokelatest(write_dataset, dset, data)::RelOffset end - # Do some additional checks on the data here - layout_class = LcChunked - # improve filter support here - psz += chunked_storage_message_size(ndims(data)) + pipeline_message_size(filter_id::UInt16) else - layout_class = LcContiguous - psz += jlsizeof(Val(HmDataLayout); layout_class) + compressor = nothing end - fullsz = jlsizeof(ObjectStart) + size_size(psz) + psz + 4 + offset = write_dataset(f, dataspace, datatype, odr, data, wsession, compressor) + !isempty(dataset.name) && (dataset.parent[dataset.name] = offset) - header_offset = f.end_of_data - seek(io, header_offset) - f.end_of_data = header_offset + fullsz - - if ismutabletype(typeof(data)) && !isa(wsession, JLDWriteSession{Union{}}) - wsession.h5offset[objectid(data)] = h5offset(f, header_offset) - push!(wsession.objects, data) - end - - cio = begin_checksum_write(io, fullsz - 4) - write_object_header_and_dataspace_message(cio, f, psz, dataspace) - write_datatype_message(cio, datatype) - for a in attributes - write_header_message(cio, f, a, wsession) - end - # Data storage layout - if layout_class == LcCompact - write_header_message(cio, Val(HmDataLayout); layout_class, data_size=datasz) - if datasz != 0 - write_data(cio, f, data, odr, datamode(odr), wsession) - end - dataset.header_chunk_info = (header_offset, position(cio)+20, position(cio)) - # Add NIL message replacable by continuation message - write_continuation_placeholder(cio) - jlwrite(io, end_checksum(cio)) - elseif layout_class == LcChunked - write_filter_pipeline_message(cio, filter_id) - - # deflate first - deflated = deflate_data(f, data, odr, wsession, compressor) - - write_chunked_storage_message(cio, odr_sizeof(odr), size(data), length(deflated), h5offset(f, f.end_of_data)) - dataset.header_chunk_info = (header_offset, position(cio)+20, position(cio)) - write_continuation_placeholder(cio) - jlwrite(f.io, end_checksum(cio)) - - seek(f.io, f.end_of_data) - f.end_of_data += length(deflated) - jlwrite(f.io, deflated) - else - # Align contiguous chunk to 8 bytes in the file - address = f.end_of_data + 8 - mod1(f.end_of_data, 8) - data_address = h5offset(f, address) - write_header_message(cio, Val(HmDataLayout); - layout_class, data_address, data_size=datasz) - - dataset.header_chunk_info = (header_offset, position(cio)+20, position(cio)) - # Add NIL message replacable by continuation message - write_continuation_placeholder(cio) - jlwrite(io, end_checksum(cio)) - - f.end_of_data = address + datasz - seek(io, address) - write_data(io, f, data, odr, datamode(odr), wsession) + # Attributes + # TODO: this can be optimized by writing all attributes at once + for (name, attr) in pairs(dataset.attributes) + add_attribute(dataset, name, value, wsession) end - offset = h5offset(f, header_offset) - !isempty(dataset.name) && (dataset.parent[dataset.name] = offset) return offset end @@ -301,11 +221,6 @@ function get_dataset(f::JLDFile, offset::RelOffset, g=f.root_group, name="") return dset end -function add_attribute(dset::Dataset, name::String, data::Dataset) - # link an existing dataset as attribute - throw(UnsupportedFeatureException("Not implemented")) -end - # Attributes message_size(msg::WrittenAttribute) = jlsizeof(HeaderMessage) + jlsizeof(msg) function write_header_message(io,f::JLDFile, msg::WrittenAttribute, wsession=JLDWriteSession()) @@ -562,8 +477,14 @@ function allocate_early(dset::Dataset, T::DataType) f.end_of_data = header_offset + fullsz cio = begin_checksum_write(io, fullsz - 4) - write_object_header_and_dataspace_message(cio, f, psz, dataspace) - write_datatype_message(cio, datatype) + jlwrite(cio, ObjectStart(size_flag(psz))) + write_size(cio, psz) + write_header_message(cio, Val(HmFillValue); flags=0x09) + write_header_message(cio, Val(HmDataspace); dataspace.dataspace_type, dimensions=dataspace.size) + for attr in dataspace.attributes + write_header_message(cio, f, attr) + end + write_header_message(cio, Val(HmDatatype), 1 | (2*isa(dt, CommittedDatatype)); dt) for a in attributes write_header_message(cio, f, a, wsession) end diff --git a/src/inlineunion.jl b/src/inlineunion.jl index a9bd7b37..e84debdb 100644 --- a/src/inlineunion.jl +++ b/src/inlineunion.jl @@ -35,10 +35,8 @@ end @nospecializeinfer function write_dataset(f::JLDFile, @nospecialize(x::Array), wsession::JLDWriteSession, @nospecialize(compress=f.compress)) T = eltype(x) - if !isa(wsession, JLDWriteSession{Union{}}) - offset = get(wsession.h5offset, objectid(x), UNDEFINED_ADDRESS) - offset != UNDEFINED_ADDRESS && return offset - end + offset = get_tracked(wsession, x) + offset != UNDEFINED_ADDRESS && return offset if T isa Union && writeasbits(T) # Conversion has to be done earlier here because # vectors are special cased in dispatch @@ -60,7 +58,7 @@ function read_array(f::JLDFile, dataspace::ReadDataspace, io = f.io ndims, offset = get_ndims_offset(f, dataspace, attributes) seek(io, offset) - v = construct_array(io, InlineUnionEl{T1,T2}, Val(Int(ndims))) + v = construct_array(io, InlineUnionEl{T1,T2}, Int(ndims)) n = length(v) seek(io, layout.data_offset) if iscompressed(filters) diff --git a/src/types.jl b/src/types.jl index 28e5fb3a..aa98ed54 100644 --- a/src/types.jl +++ b/src/types.jl @@ -120,8 +120,21 @@ struct JLDWriteSession{T<:Union{Dict{UInt,RelOffset},Union{}}} JLDWriteSession{T}(h5offset, objects) where T = new(h5offset, objects) end JLDWriteSession() = JLDWriteSession{Dict{UInt,RelOffset}}(Dict{UInt,RelOffset}(), Any[]) - - +track!(s::JLDWriteSession{Union{}}, args...) = nothing +function track!(s::JLDWriteSession, data, offset::RelOffset) + if ismutabletype(typeof(data)) + wsession.h5offset[objectid(data)] = h5offset(f, header_offset) + push!(wsession.objects, data) + end + nothing +end +get_tracked(wsession::JLDWriteSession{Union{}}, data) = UNDEFINED_ADDRESS +function get_tracked(wsession::JLDWriteSession, data) + if ismutabletype(typeof(data)) + return get(wsession.h5offset, objectid(data), UNDEFINED_ADDRESS) + end + return UNDEFINED_ADDRESS +end """ GlobalHeap