Data export
We provide several ways to export reduced data from CarboKitten to CSV files, which are easier to read, visualize and post-process than the raw HDF5 output.
abstract type ExportSpecification end
@kwdef struct CSV <: ExportSpecification
grid_locations::Vector{NTuple{2,Int}}
output_files::IdDict{Symbol,String}
end
CSV(grid_locations, kwargs...) = CSV(grid_locations, IdDict(kwargs...))
using CarboKitten.Export: CSV
CSV(tuple.(10:20:70, 25),
:sediment_accumulation_curve => "run_06_sac.csv",
:age_depth_model => "run_06_adm.csv",
:stratigraphic_column => "run_06_sc.csv",
:metadata => "run_06.toml")
This evaluates to:
CarboKitten.Export.CSV([(10, 25), (30, 25), (50, 25), (70, 25)], IdDict(:age_depth_model => "run_06_adm.csv", :metadata => "run_06.toml", :sediment_accumulation_curve => "run_06_sac.csv", :stratigraphic_column => "run_06_sc.csv"))
There is a `data_export` function that can be overloaded with any `ExportSpecification`; a sketch of a custom specification follows the list below. When given a `CSV` specification, files are written as given:
- `:sediment_accumulation_curve` (SAC) is another term for `sediment_height` elsewhere in the code.
- `:age_depth_model` (ADM) is a monotonic version of the SAC, relating depth to age.
- `:stratigraphic_column` (SC) is the amount of deposited material per facies per time step, corrected for disintegrated material. The cumulative sum of the SC should add up to the ADM.
- `:metadata` is some metadata, written as a TOML file.
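Here is that sketch of a custom specification. The `JSONExport` type and its method body are hypothetical, shown only to illustrate the extension point; the `ExportSpecification` supertype and the `data_export` entry point come from this module.
struct JSONExport <: ExportSpecification
    grid_locations::Vector{NTuple{2,Int}}
    output_file::String
end

function data_export(spec::JSONExport, header::Header, data::Data)
    # hypothetical: serialize the selected columns to spec.output_file
end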
Tests
We have a test case with just three pixels, one for each scenario:
- Uniform production
- Uniform production, top-hat disintegration, making the sediment accumulation non-monotonic
- Linearly increasing production (not sure what this adds)
const AXES1 = Axes(
x=[0.0, 1.0, 2.0] * u"m",
y=[1.0] * u"m",
t=(0.0:0.1:1.0) * u"Myr")
const HEADER1 = Header(
tag="test",
axes=AXES1,
write_interval=1,
Δt=0.1u"Myr",
time_steps=10,
initial_topography=zeros(typeof(1.0u"m"), 3, 3),
sea_level=zeros(typeof(1.0u"m"), 10),
subsidence_rate=10u"m/Myr")
const PRODUCTION1 = reshape(
hcat(ones(Amount, 10),
ones(Amount, 10),
cumsum(ones(Amount, 10)) / 5.5)',
1, 3, 1, 10)
const DISINTEGRATION1 = reshape(
hcat(zeros(Amount, 10),
1:10 .|> (x -> x < 4 || x > 6 ? 0.0u"m" : 2.0u"m"),
zeros(Amount, 10))',
1, 3, 1, 10)
const ELEVATION1 = cat(
[0.0, 0.0, 0.0]u"m",
cumsum(PRODUCTION1 .- DISINTEGRATION1; dims=4)[1, :, :, :];
dims=3)
const DATA1 = Data(
disintegration=DISINTEGRATION1,
production=PRODUCTION1,
deposition=PRODUCTION1 .- DISINTEGRATION1,
sediment_elevation=ELEVATION1)
const GRID_LOCATIONS1 = [(1, 1), (2, 1), (3, 1)]
Sediment Accumulation
Writing CSV files
The CSV.jl module lets us write a `DataFrame` to CSV, but doesn't work so well in combination with `Unitful` quantities. The following helpers put the units in the column headers instead.
"""
unitful_headers(df::DataFrame)
Gets a string representation for all column names including their unit.
Returns a `Vector{String}`.
"""
unitful_headers(df::DataFrame) =
["$(e.variable) [$(unit(e.eltype))]" for e in eachrow(describe(df))]
"""
ustrip(df::DataFrame)
Strip units from a `DataFrame`. Returns a new `DataFrame`.
"""
Unitful.ustrip(df::DataFrame) =
DataFrame((e.variable => df[!, e.variable] / unit(e.eltype)
for e in eachrow(describe(df)))...)
"""
write_unitful_csv(io::IO, df::DataFrame)
Write a CSV from a `DataFrame` with `Unitful` units. The units will be
represented in the CSV header, and stripped from the individual values.
"""
write_unitful_csv(io, df::DataFrame) =
write_csv(io, ustrip(df), header=unitful_headers(df))
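As a usage sketch (the two-column table is made up for illustration), the units end up in the header and are stripped from the values:
df = DataFrame(:t => [0.0, 0.1] * u"Myr", :h => [0.0, 1.5] * u"m")
write_unitful_csv(stdout, df)

# t [Myr],h [m]
# 0.0,0.0
# 0.1,1.5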
We test that writing a CSV and reading it back gives the same result:
@testset "Hither and Dither" begin
io = IOBuffer(UInt8[], read=true, write=true)
data_export(CSVExportTrait{:sediment_accumulation_curve}, io, HEADER1, DATA1, GRID_LOCATIONS1)
seek(io, 0)
df = read_csv(io, DataFrame)
rename!(df, (n => split(n)[1] for n in names(df))...)
@test df.sac1 ≈ ELEVATION1[1, 1, :] / u"m"
@test df.sac2 ≈ ELEVATION1[2, 1, :] / u"m"
@test df.sac3 ≈ ELEVATION1[3, 1, :] / u"m"
end
Age-depth model
# Curried form of `accumulate`: `accumulate(min)` returns a function,
# which lets it compose with `reverse` in `age_depth_model` below.
Base.accumulate(f) = (args...; kwargs...) -> accumulate(f, args...; kwargs...)
"""
age_depth_model(sediment_accumulation_curve::Vector)
age_depth_model(sediment_accumulation_curve::DataFrame)
Compute the ADM from the SAC. Implemented as:
reverse ∘ accumulate(min) ∘ reverse
The `DataFrame` version `select`s SAC columns, transformed into ADM.
"""
age_depth_model(sac::Vector{T}) where {T} = sac |> reverse |> accumulate(min) |> reverse
age_depth_model(sac_df::DataFrame) =
let sac_cols = filter(contains("sac"), names(sac_df)),
adm_cols = replace.(sac_cols, "sac" => "adm")
select(sac_df, "time", (sac => age_depth_model => adm
for (sac, adm) in zip(sac_cols, adm_cols))...)
end
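A small worked example of the vector version: erosion makes the SAC dip, and the ADM flattens each dip so that depth never decreases with age.
age_depth_model([0.0, 1.0, 2.0, 1.0, 3.0] * u"m")
# -> [0.0, 1.0, 1.0, 1.0, 3.0] m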
We test that the constructed ADM is monotonically increasing in time:
@testset "ADM Monotonicity" begin
sac = extract_sac(HEADER1, DATA1, GRID_LOCATIONS1)
adm = sac |> age_depth_model
@test sac.sac1 == adm.adm1
@test sac.sac3 == adm.adm3
@test sac.sac2 != adm.adm2
@test all(adm.adm2[2:end] .- adm.adm2[1:end-1] .>= 0.0u"m")
end
Stratigraphic Column
"""
stratigraphic_column(header::Header, data::Data, loc::NTuple{2,Int}, facies::Int)
Compute the Stratigraphic Column for a given grid position `loc` and `facies` index.
Returns an `Array{Quantity, 2}` where the `Quantity` is in units of meters.
"""
function stratigraphic_column(header::Header, data::Data, loc::NTuple{2,Int}, facies::Int)
dc = DataColumn(
loc,
data.disintegration[:, loc..., :],
data.production[:, loc..., :],
data.deposition[:, loc..., :],
data.sediment_elevation[loc..., :])
return stratigraphic_column(header, dc, facies)
end
function stratigraphic_column(header::Header, data::DataColumn, facies::Int)
n_times = length(header.axes.t) - 1
sc = zeros(typeof(1.0u"m"), n_times)
for ts = 1:n_times
acc = data.deposition[facies, ts] - data.disintegration[facies, ts]
if acc > 0.0u"m"
sc[ts] = acc
continue
end
        # Negative accumulation means erosion: walk back down the column and
        # remove the eroded amount from previously deposited layers.
        ts_down = ts - 1
        while acc < 0.0u"m"
            ts_down < 1 && break
            if -acc < sc[ts_down]
                sc[ts_down] += acc   # erosion fits inside this layer (acc < 0)
                break
            else
                acc += sc[ts_down]   # layer fully eroded; carry the remainder down
                sc[ts_down] = 0.0u"m"
            end
            ts_down -= 1
        end
end
sc
end
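To make the backtracking concrete, here is the same erosion logic on a plain vector of net accumulation per time step. The helper `column_from_net` is hypothetical, written only for this illustration:
function column_from_net(net::Vector{Float64})
    sc = zeros(length(net))
    for (ts, acc) in enumerate(net)
        if acc > 0
            sc[ts] = acc
            continue
        end
        # erode previously deposited layers, newest first
        ts_down = ts - 1
        while acc < 0 && ts_down >= 1
            if -acc < sc[ts_down]
                sc[ts_down] += acc
                acc = 0.0
            else
                acc += sc[ts_down]
                sc[ts_down] = 0.0
            end
            ts_down -= 1
        end
    end
    sc
end

column_from_net([2.0, 1.0, -2.0, 1.0])
# -> [1.0, 0.0, 0.0, 1.0]: step 3 erodes all of step 2 and 1 m of step 1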
The stratigraphic column should sum to the age-depth model.
@testset "SC sum equals ADM" begin
sac = extract_sac(HEADER1, DATA1, GRID_LOCATIONS1)
adm = sac |> age_depth_model
sc = extract_sc(HEADER1, DATA1, GRID_LOCATIONS1)
@test [0.0u"m"; cumsum(sc.sc1_f1)] ≈ adm.adm1
@test [0.0u"m"; cumsum(sc.sc2_f1)] ≈ adm.adm2
@test [0.0u"m"; cumsum(sc.sc3_f1)] ≈ adm.adm3
end
Dispatch
struct CSVExportTrait{S} end
function data_export(spec::T, filepath::String) where {T<:ExportSpecification}
data_export(spec, read_data(filepath)...)
end
function data_export(spec::CSV, header::Header, data::Data)
for (key, filename) in spec.output_files
if key == :metadata
md = Dict(
"global" => Dict(
"tag" => header.tag,
"subsidence_rate" => header.subsidence_rate,
"time_steps" => header.time_steps,
"delta_t" => header.Δt),
"locations" => [Dict(
"number" => i,
"x" => header.axes.x[loc[1]],
"y" => header.axes.y[loc[2]],
"initial_topography" => header.initial_topography[loc...])
for (i, loc) in enumerate(spec.grid_locations)],
"files" => spec.output_files)
open(filename, "w") do io
TOML.print(io, md) do obj
if obj isa Quantity
[ustrip(obj), string(unit(obj))]
else
obj
end
end
end
continue
end
open(filename, "w") do io
data_export(CSVExportTrait{key}, io, header, data, spec.grid_locations)
end
end
end
function data_export(::Type{CSVExportTrait{S}}, args...) where {S}
error("Unknown CSV data export: `$(S)`")
end
function data_export(::Type{CSVExportTrait{:sediment_accumulation_curve}},
io::IO, header::Header, data::Data, grid_locations::Vector{NTuple{2,Int}})
sac = extract_sac(header, data, grid_locations)
write_unitful_csv(io, sac)
end
function data_export(::Type{CSVExportTrait{:age_depth_model}},
io::IO, header::Header, data::Data, grid_locations::Vector{NTuple{2,Int}})
adm = extract_sac(header, data, grid_locations) |> age_depth_model
write_unitful_csv(io, adm)
end
function data_export(::Type{CSVExportTrait{:stratigraphic_column}},
io::IO, header::Header, data::Data, grid_locations::Vector{NTuple{2,Int}})
sc = extract_sc(header, data, grid_locations)
write_unitful_csv(io, sc)
end
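New CSV targets slot in by adding one more method. A minimal sketch for a hypothetical `:sea_level` target (not part of the current specification; the key would also need an entry in `output_files`):
function data_export(::Type{CSVExportTrait{:sea_level}},
    io::IO, header::Header, data::Data, grid_locations::Vector{NTuple{2,Int}})
    # sea level is global, so grid_locations is unused here
    df = DataFrame(:time => header.axes.t[1:length(header.sea_level)],
                   :sea_level => header.sea_level)
    write_unitful_csv(io, df)
end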
"""
extract_sac(header::Header, data::Data, grid_locations::Vector{NTuple{2,Int}})
Extract the Sediment Accumulation Curve (SAC) from the data. The SAC is directly copied from
`data.sediment_elevation`. Returns a `DataFrame` with `time` and `sac<n>` columns where `<n>`
is in the range `1:length(grid_locations)`.
"""
function extract_sac(header::Header, data::Data, grid_locations::Vector{NTuple{2,Int}})
DataFrame(:time => header.axes.t[1:end],
(Symbol("sac$(i)") => data.sediment_elevation[loc..., :]
for (i, loc) in enumerate(grid_locations))...)
end
"""
extract_sc(header::Header, data::Data, grid_locations::Vector{NTuple{2,Int}})
Extract the Stratigraphic Column (SC) from the data. Returns a `DataFrame` with `time` and `sc<n>_f<m>` columns,
where `<n>` is in the range `1:length(grid_locations)` and `<m>` in `1:n_facies`.
"""
function extract_sc(header::Header, data::Data, grid_locations::Vector{NTuple{2,Int}})
n_facies = size(data.production)[1]
DataFrame("time" => header.axes.t[1:end-1],
("sc$(i)_f$(f)" => stratigraphic_column(header, data, loc, f)
for f in 1:n_facies, (i, loc) in enumerate(grid_locations))...)
end
module Export
export Data, DataSlice, DataColumn, Header, CSV, read_data, read_slice, read_column, data_export
using HDF5
import CSV: write as write_csv
using TOML
using Unitful
using DataFrames
using .Iterators: flatten
const Rate = typeof(1.0u"m/Myr")
const Amount = typeof(1.0u"m")
const Length = typeof(1.0u"m")
const Time = typeof(1.0u"Myr")
const na = [CartesianIndex()]  # "new axis": indexing with `na` inserts a singleton dimension
<<export-specification>>
@kwdef struct Axes
x::Vector{Length}
y::Vector{Length}
t::Vector{Time}
end
@kwdef struct Header
tag::String
axes::Axes
Δt::Time
write_interval::Int
time_steps::Int
initial_topography::Matrix{Amount}
sea_level::Vector{Length}
subsidence_rate::Rate
end
@kwdef struct Data
disintegration::Array{Amount,4}
production::Array{Amount,4}
deposition::Array{Amount,4}
sediment_elevation::Array{Amount,3}
end
struct DataSlice
slice::NTuple{2,Union{Colon,Int}}
disintegration::Array{Amount,3}
production::Array{Amount,3}
deposition::Array{Amount,3}
sediment_elevation::Array{Amount,2}
end
struct DataColumn
slice::NTuple{2,Int}
disintegration::Array{Amount,2}
production::Array{Amount,2}
deposition::Array{Amount,2}
sediment_elevation::Array{Amount,1}
end
function read_header(fid)
attrs = HDF5.attributes(fid["input"])
axes = Axes(
fid["input/x"][] * u"m",
fid["input/y"][] * u"m",
fid["input/t"][] * u"Myr")
return Header(
attrs["tag"][],
axes,
attrs["delta_t"][] * u"Myr",
attrs["write_interval"][],
attrs["time_steps"][],
fid["input/initial_topography"][] * u"m",
fid["input/sea_level"][] * u"m",
attrs["subsidence_rate"][] * u"m/Myr")
end
function read_data(filename)
h5open(filename) do fid
header = read_header(fid)
data = Data(
fid["disintegration"][] * u"m",
fid["production"][] * u"m",
fid["deposition"][] * u"m",
fid["sediment_height"][] * u"m")
header, data
end
end
read_slice(fid::HDF5.File, slice...) = DataSlice(
slice,
fid["disintegration"][:, slice..., :] * u"m",
fid["production"][:, slice..., :] * u"m",
fid["deposition"][:, slice..., :] * u"m",
fid["sediment_height"][slice..., :] * u"m")
function read_slice(filename::AbstractString, slice...)
h5open(filename) do fid
header = read_header(fid)
data = read_slice(fid, slice...)
header, data
end
end
read_column(fid::HDF5.File, slice...) = DataColumn(
slice,
fid["disintegration"][:, slice..., :] * u"m",
fid["production"][:, slice..., :] * u"m",
fid["deposition"][:, slice..., :] * u"m",
fid["sediment_height"][slice..., :] * u"m")
function read_column(filename::AbstractString, slice...)
h5open(filename) do fid
header = read_header(fid)
data = read_column(fid, slice...)
header, data
end
end
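Typical usage of the readers, assuming a hypothetical `run_06.h5` produced by an earlier CarboKitten run:
header, data = read_data("run_06.h5")             # full 4D data set
header, column = read_column("run_06.h5", 10, 25) # single column at (10, 25)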
<<export-function>>
end
using CarboKitten.Export: Axes, Header, Data, data_export, CSVExportTrait,
age_depth_model, extract_sac, extract_sc, CSV
using CSV: read as read_csv
using TOML
using DataFrames
using Unitful
const Amount = typeof(1.0u"m")
<<export-test-case>>
@testset "Data Export" begin
<<export-test>>
@testset "Write to folder" begin
mktempdir() do path
spec = CSV(GRID_LOCATIONS1,
:sediment_accumulation_curve => joinpath(path, "sac.csv"),
:age_depth_model => joinpath(path, "adm.csv"),
:stratigraphic_column => joinpath(path, "sc.csv"),
:metadata => joinpath(path, "metadata.toml"))
data_export(spec, HEADER1, DATA1)
for f in values(spec.output_files)
@test isfile(f)
end
metadata = TOML.parsefile(spec.output_files[:metadata])
@test IdDict(Symbol(k) => v for (k, v) in metadata["files"]) == spec.output_files
@test length(metadata["locations"]) == 3
adm = read_csv(spec.output_files[:age_depth_model], DataFrame)
rename!(adm, (n => split(n)[1] for n in names(adm))...)
@test adm == ustrip(extract_sac(HEADER1, DATA1, GRID_LOCATIONS1) |> age_depth_model)
end
end
end