HDF5 Writer
Main loop
The H5Writer
runs through an overloaded version of run_model
. For example:
run_model(Model{ALCAP}, ALCAP.Example.INPUT, "example.h5")
⪡hdf5-run-model⪢≣
"""
run_model(::Type{Model{M}}, input::AbstractInput, filename::AbstractString) where M
Run a model and write output to HDF5.
run_model(Model{ALCAP}, ALCAP.Example.INPUT, "example.h5")
"""
function run_model(::Type{Model{M}}, input::AbstractInput, filename::AbstractString) where {M}
output = H5Output(input, filename)
run_model(Model{M}, input, output)
return filename
end
Implementation
file:src/Output/H5Writer.jl
module H5Writer
using HDF5
using Unitful
import ...CarboKitten: run_model, Model, AbstractOutput, AbstractInput, OutputSpec, AbstractState
using ...CarboKitten: time_axis, box_axes
using ...Components.WaterDepth: initial_topography
using ...Utility: in_units_of
using ..Abstract
import ..Abstract: add_data_set, set_attribute, frame_writer, state_writer
mutable struct H5Output <: AbstractOutput
header::Header
fid::HDF5.File
end
function H5Output(input::AbstractInput, filename::String)
t_axis = time_axis(input.time)
x_axis, y_axis = box_axes(input.box)
axes = Axes(x=x_axis, y=y_axis, t=t_axis)
h0 = initial_topography(input)
sl = input.sea_level.(t_axis)
header = Header(
tag=input.tag,
axes=axes,
Δt=input.time.Δt,
time_steps=input.time.steps,
grid_size=input.box.grid_size,
n_facies=length(input.facies),
initial_topography=h0,
sea_level=sl,
subsidence_rate=input.subsidence_rate,
data_sets=Dict(),
attributes=Dict())
fid = h5open(filename, "w")
create_group(fid, "input")
finalizer(H5Output(header, fid)) do x
close(x.fid)
end
end
axis_size(::Colon, a::Int) = a
axis_size(::Int, _) = 1
axis_size(r::AbstractRange{Int}, _) = length(r)
slice_str(::Colon) = ":"
slice_str(r::AbstractRange{Int}) = "$(r.start):$(r.stop)"
slice_str(i::Int) = "$(i)"
is_column(::Int, ::Int) = true
is_column(_, _) = false
function add_data_set(out::H5Output, name::Symbol, spec::OutputSpec)
header = out.header
fid = out.fid
nf = header.n_facies
nw = div(header.time_steps, spec.write_interval)
size = axis_size.(spec.slice, header.grid_size)
grp = HDF5.create_group(fid, string(name))
attrs = attributes(grp)
attrs["slice"] = join(slice_str.(spec.slice), ",")
attrs["write_interval"] = spec.write_interval
HDF5.create_dataset(grp, "production", datatype(Float64),
dataspace(nf, size..., nw),
chunk=(nf, size..., 1), deflate=3)
HDF5.create_dataset(grp, "disintegration", datatype(Float64),
dataspace(nf, size..., nw),
chunk=(nf, size..., 1), deflate=3)
HDF5.create_dataset(grp, "deposition", datatype(Float64),
dataspace(nf, size..., nw),
chunk=(nf, size..., 1), deflate=3)
HDF5.create_dataset(grp, "sediment_thickness", datatype(Float64),
dataspace(size..., nw + 1),
chunk=(size..., 1), deflate=3)
end
function get_group(fid::HDF5.File, name::String)
path = split(name, "/")
gid = fid["input"]
for n in path[1:end-1]
if !haskey(gid, n)
create_group(gid, n)
end
gid = gid[n]
end
return gid
end
function set_attribute(out::H5Output, name::String, value::AbstractArray{T,Dim}) where {T,Dim}
gid = get_group(out.fid, name)
tag = split(name,"/")[end]
gid[tag] = value
end
function set_attribute(out::H5Output, name::String, value)
gid = get_group(out.fid, name)
attr = attributes(gid)
tag = split(name,"/")[end]
attr[tag] = value
end
function state_writer(input::AbstractInput, out::H5Output)
output_spec = input.output
fid = out.fid
grid_size = out.header.grid_size
function (idx::Int, state::AbstractState)
for (k, v) in output_spec
size = axis_size.(v.slice, grid_size)
if mod(idx - 1, v.write_interval) == 0
fid[string(k)]["sediment_thickness"][:, :, div(idx - 1, v.write_interval)+1] =
(is_column(v.slice...) ?
Float64[state.sediment_height[v.slice...] |> in_units_of(u"m");] :
reshape(state.sediment_height[v.slice...], size) |> in_units_of(u"m"))
end
end
end
end
function frame_writer(input::AbstractInput, out::H5Output)
n_f = out.header.n_facies
grid_size = out.header.grid_size
output_spec = input.output
function (idx::Int, frame::Frame)
try_write(tgt, ::Nothing, v) = ()
function try_write(tgt, src::AbstractArray, v)
size = axis_size.(v.slice, grid_size)
tgt[:, :, :, div(idx - 1, v.write_interval)+1] +=
(reshape(src[:, v.slice...], (n_f, size...)) |> in_units_of(u"m"))
end
for (k, v) in input.output
n_writes = div(input.time.steps, v.write_interval)
if div(idx-1, v.write_interval) + 1 <= n_writes
grp = out.fid[string(k)]
try_write(grp["production"], frame.production, v)
try_write(grp["disintegration"], frame.disintegration, v)
try_write(grp["deposition"], frame.deposition, v)
end
end
end
end
<<hdf5-run-model>>
end
Tests
file:test/Output/H5WriterSpec.jl
module H5WriterSpec
using CarboKitten
using HDF5
using CarboKitten.Output.Abstract: frame_writer, add_data_set, Frame
using CarboKitten.Output.H5Writer: H5Output
using Unitful
using Test
const DummyFacies = [
ALCAP.Facies(
viability_range = (0, 0),
activation_range = (0, 0),
maximum_growth_rate=0.0u"m/Myr",
extinction_coefficient=0.0u"m^-1",
saturation_intensity=0.0u"W/m^2",
diffusion_coefficient=0.0u"m/yr")]
const input = ALCAP.Input(
tag="test",
box=Box{Periodic{2}}(grid_size=(5, 1), phys_scale=5.0u"m"),
time=TimeProperties(
Δt=0.0001u"Myr",
steps=10),
output=Dict(
:wi1 => OutputSpec(slice=(:,:), write_interval=1),
:wi2 => OutputSpec(slice=(:,:), write_interval=2),
:wi3 => OutputSpec(slice=(:,:), write_interval=3),
:wi4 => OutputSpec(slice=(:,:), write_interval=4)),
ca_interval=1,
initial_topography=(x, y) -> -0.0u"m",
sea_level = t -> 0.0u"m",
subsidence_rate=0.0u"m/Myr",
disintegration_rate=0.0u"m/Myr",
insolation=0.0u"W/m^2",
sediment_buffer_size=0,
depositional_resolution=0.0u"m",
facies=DummyFacies)
const filename = "testH5.h5"
@testset "Components/H5Writer" begin
mktempdir() do path
fpath = joinpath(path, filename)
output = H5Output(input, fpath)
write_frame = frame_writer(input, output)
ALCAP.write_header(input, output)
for (k, v) in input.output
add_data_set(output, k, v)
end
# create a frame of ones to be the deposition etc. each time step
dummy_data = ones(Float64, 1, 5, 1) * u"m"
inc = Frame(
production = dummy_data,
deposition = dummy_data,
disintegration = dummy_data
)
for t = 1:input.time.steps
write_frame(t, inc)
end
close(output.fid)
@testset "size of output array" begin
h5open(fpath, "r") do f
@test size(f["wi1"]["deposition"][])[4] == 10
@test size(f["wi2"]["deposition"][])[4] == 5
@test size(f["wi3"]["deposition"][])[4] == 3
@test size(f["wi4"]["deposition"][])[4] == 2
end
end
@testset "frame written only every write_interval" begin
h5open(fpath, "r") do f
@test all(f["wi1"]["deposition"][] .≈ attrs(f["wi1"])["write_interval"])
@test all(f["wi2"]["deposition"][] .≈ attrs(f["wi2"])["write_interval"])
@test all(f["wi3"]["deposition"][] .≈ attrs(f["wi3"])["write_interval"])
@test all(f["wi4"]["deposition"][] .≈ attrs(f["wi4"])["write_interval"])
end
end
end
end
end