apply
export applyThis file mainly defines the apply function.
In general, the idea behind the apply framework is to take as input any geometry, vector of geometries, or feature collection, deconstruct it to the given trait target (any arbitrary GI.AbstractTrait or TraitTarget union thereof, like PointTrait or PolygonTrait) and perform some operation on it. Then, the geometry or structure is rebuilt.
This allows for a simple and consistent framework within which users can define their own operations trivially easily, and removes a lot of the complexity involved with handling complex geometry structures.
For example, a simple way to flip the x and y coordinates of a geometry is:
flipped_geom = GO.apply(GI.PointTrait(), geom) do p
(GI.y(p), GI.x(p))
endAs simple as that. There's no need to implement your own decomposition because it's done for you.
Functions like flip, reproject, transform, even segmentize and simplify have been implemented using the apply framework. Similarly, centroid, area and distance have been implemented using the applyreduce framework.
GeometryOpsCore.apply Function
apply(f, target::Union{TraitTarget, GI.AbstractTrait}, obj; kw...)Reconstruct a geometry, feature, feature collection, or nested vectors of either using the function f on the target trait.
f(target_geom) => x where x also has the target trait, or a trait that can be substituted. For example, swapping PolgonTrait to MultiPointTrait will fail if the outer object has MultiPolygonTrait, but should work if it has FeatureTrait.
Objects "shallower" than the target trait are always completely rebuilt, like a Vector of FeatureCollectionTrait of FeatureTrait when the target has PolygonTrait and is held in the features. These will always be GeoInterface geometries/feature/feature collections. But "deeper" objects may remain unchanged or be whatever GeoInterface compatible objects f returns.
The result is a functionally similar geometry with values depending on f.
threaded:trueorfalse. Whether to use multithreading. Defaults tofalse.crs: The CRS to attach to geometries. Defaults tonothing.calc_extent:trueorfalse. Whether to calculate the extent. Defaults tofalse.
Example
Flipped point the order in any feature or geometry, or iterables of either:
import GeoInterface as GI
import GeometryOps as GO
geom = GI.Polygon([GI.LinearRing([(1, 2), (3, 4), (5, 6), (1, 2)]),
GI.LinearRing([(3, 4), (5, 6), (6, 7), (3, 4)])])
flipped_geom = GO.apply(GI.PointTrait, geom) do p
(GI.y(p), GI.x(p))
endWhat is apply?
apply applies some function to every geometry matching the Target GeoInterface trait, in some arbitrarily nested object made up of:
AbstractArrays (we also try to iterate other non-GeoInteface compatible object)FeatureCollectionTraitobjectsFeatureTraitobjectsAbstractGeometryTraitobjects
apply recursively calls itself through these nested layers until it reaches objects with the Target GeoInterface trait. When found apply applies the function f, and stops.
The outer recursive functions then progressively rebuild the object using GeoInterface objects matching the original traits.
If PointTrait is found but it is not the Target, an error is thrown. This likely means the object contains a different geometry trait to the target, such as MultiPointTrait when LineStringTrait was specified.
To handle this possibility it may be necessary to make Target a Union of traits found at the same level of nesting, and define methods of f to handle all cases.
Be careful making a union across "levels" of nesting, e.g. Union{FeatureTrait,PolygonTrait}, as _apply will just never reach PolygonTrait when all the polygons are wrapped in a FeatureTrait object.
Embedding
extent and crs can be embedded in all geometries, features, and feature collections as part of apply. Geometries deeper than Target will of course not have new extent or crs embedded.
calc_extentsignals to recalculate anExtentand embed it.crswill be embedded as-is
Threading
Threading is used at the outermost level possible - over an array, feature collection, or e.g. a MultiPolygonTrait where each PolygonTrait sub-geometry may be calculated on a different thread.
Currently, threading defaults to false for all objects, but can be turned on by passing the keyword argument threaded=true to apply.
Threading uses StableTasks.jl to provide type-stable tasks (base Julia Threads.@spawn is not type stable). This is completely cost-free and saves some allocations when running multithreaded.
The current strategy is to launch 2 tasks for each CPU thread, to provide load balancing. We assume Julia will manage these tasks efficiently, and we don't want to run too many tasks since each task does have some overhead when it's created. This may need revisiting in the future, but it's a pretty easy heuristic to use.
Implementation
Literate.jl source code is below.
"""
apply(f, target::Union{TraitTarget, GI.AbstractTrait}, obj; kw...)
Reconstruct a geometry, feature, feature collection, or nested vectors of
either using the function `f` on the `target` trait.
`f(target_geom) => x` where `x` also has the `target` trait, or a trait that can
be substituted. For example, swapping `PolgonTrait` to `MultiPointTrait` will fail
if the outer object has `MultiPolygonTrait`, but should work if it has `FeatureTrait`.
Objects "shallower" than the target trait are always completely rebuilt, like
a `Vector` of `FeatureCollectionTrait` of `FeatureTrait` when the target
has `PolygonTrait` and is held in the features. These will always be GeoInterface
geometries/feature/feature collections. But "deeper" objects may remain
unchanged or be whatever GeoInterface compatible objects `f` returns.
The result is a functionally similar geometry with values depending on `f`.
$APPLY_KEYWORDS
# Example
Flipped point the order in any feature or geometry, or iterables of either:
```julia
import GeoInterface as GI
import GeometryOps as GO
geom = GI.Polygon([GI.LinearRing([(1, 2), (3, 4), (5, 6), (1, 2)]),
GI.LinearRing([(3, 4), (5, 6), (6, 7), (3, 4)])])
flipped_geom = GO.apply(GI.PointTrait, geom) do p
(GI.y(p), GI.x(p))
end
```
"""
@inline function apply(
f::F, target, geom; calc_extent=false, threaded=false, kw...
) where F
threaded = booltype(threaded)
calc_extent = booltype(calc_extent)
_apply(f, TraitTarget(target), geom; threaded, calc_extent, kw...)
endCall _apply again with the trait of geom
@inline _apply(f::F, target, geom; kw...) where F =
_apply(f, target, GI.trait(geom), geom; kw...)There is no trait and this is an AbstractArray - so just iterate over it calling _apply on the contents
@inline function _apply(f::F, target, ::Nothing, A::AbstractArray; threaded, kw...) where F
applicator = ApplyToArray(f, target, A; kw...)
_maptasks(applicator, eachindex(A), threaded)
endThere is no trait and this is not an AbstractArray. Try to call _apply over it. We can't use threading as we don't know if we can can index into it. So just map.
@inline function _apply(f::F, target, ::Nothing, iterable::IterableType; threaded, kw...) where {F, IterableType}Try the Tables.jl interface first
if Tables.istable(iterable)
_apply_table(f, target, iterable; threaded, kw...)
else # this is probably some form of iterable...
if threaded isa Truecollect first so we can use threads
_apply(f, target, collect(iterable); threaded, kw...)
else
apply_to_iterable(x) = _apply(f, target, x; kw...)
map(apply_to_iterable, iterable)
end
end
end
#=
Doing this inline in `_apply` is _heavily_ type unstable, so it's best to separate this
by a function barrier.
This function operates `apply` on the `geometry` column of the table, and returns a new table
with the same schema, but with the new geometry column.
This new table may be of the same type as the old one iff `Tables.materializer` is defined for
that table. If not, then a `NamedTuple` is returned.
=#
function _apply_table(f::F, target, iterable::IterableType; geometrycolumn = nothing, preserve_default_metadata = false, threaded, kw...) where {F, IterableType} # We extract the geometry column and run `apply` on it.First, we need the table schema:
input_schema = Tables.schema(iterable)
input_colnames = input_schema.namesthen, we find the geometry column(s)
geometry_columns = if isnothing(geometrycolumn)
GI.geometrycolumns(iterable)
elseif geometrycolumn isa NTuple{N, <: Symbol} where N
geometrycolumn
elseif geometrycolumn isa Symbol
(geometrycolumn,)
else
throw(ArgumentError("geometrycolumn must be a Symbol or a tuple of Symbols, got a $(typeof(geometrycolumn))"))
end
if !Base.issubset(geometry_columns, input_colnames)
throw(ArgumentError(
"""
`apply`: the `geometrycolumn` kwarg must be a subset of the column names of the table,
got $(geometry_columns)
but the table has columns
$(input_colnames)
"""
))
endhere we apply the function to the geometry column(s).
apply_kw = if isempty(used_reconstruct_table_kwargs(iterable))
kw
else
Base.structdiff(values(kw), NamedTuple{used_reconstruct_table_kwargs(iterable)})
end
new_geometry_vecs = map(geometry_columns) do colname
_apply(f, target, Tables.getcolumn(iterable, colname); threaded, apply_kw...)
endThen, we filter the geometry column(s) out,
new_names = filter(x -> !(x in geometry_columns), input_colnames)and try to rebuild the same table as the best type - either the original type of iterable, or a named tuple which is the default fallback. See the function directly below this one for the actual fallback implementation.
result = reconstruct_table(iterable, geometry_columns, new_geometry_vecs, new_names; kw...)Finally, we ensure that metadata is propagated correctly. This can only happen if the original table supports metadata reads, and the result supports metadata writes.
if DataAPI.metadatasupport(typeof(result)).writeCopy over all metadata from the original table to the new table, if the original table supports metadata reading.
if DataAPI.metadatasupport(IterableType).read
for (key, (value, style)) in DataAPI.metadata(iterable; style = true)Default styles are not preserved on data transformation, so we must skip them!
if !preserve_default_metadata
style == :default && continue
endWe assume that any other style is preserved.
DataAPI.metadata!(result, key, value; style)
end
endWe don't usually care about the original table's metadata for GEOINTERFACE namespaced keys, so we should set the crs and geometrycolumns metadata if they are present. Ensure that GEOINTERFACE:geometrycolumns and GEOINTERFACE:crs are set!
mdk = DataAPI.metadatakeys(result)If the user has asked for geometry columns to persist, they would be here, so we don't need to set them.
if !("GEOINTERFACE:geometrycolumns" in mdk)If the geometry columns are not already set, we need to set them.
DataAPI.metadata!(result, "GEOINTERFACE:geometrycolumns", geometry_columns; style = :note)
endForce reset CRS always, since you can pass crs to apply.
new_crs = get(kw, :crs, GI.crs(iterable))
DataAPI.metadata!(result, "GEOINTERFACE:crs", new_crs; style = :note)
end
return result
end
"""
used_reconstruct_table_kwargs(input)
Return a tuple of the kwargs that should be passed to `reconstruct_table` for the given input.
This is "semi-public" API, and required for any input type that defines `reconstruct_table`.
"""
function used_reconstruct_table_kwargs(input)
()
end
"""
reconstruct_table(input, geometry_column_names, geometry_columns, other_column_names, args...; kwargs...)
Reconstruct a table from the given input, geometry column names,
geometry columns, and other column names.
Any function that defines `reconstruct_table` must also define `used_reconstruct_table_kwargs`.
The input must be a table.
The function should return a best-effort attempt at a table of the same type as the input,
with the new geometry column(s) and other columns.
The fallback implementation invokes `Tables.materializer`. But if you want to be efficient
and pass e.g. arbitrary kwargs to the materializer, or materialize in a different way, you
can do so by overloading this function for your desired input type.
This is "semi-public" API and while it may add optional arguments, it will not add new required
positional arguments. All implementations must allow arbitrary kwargs to pass through and harvest
what they need.
"""
function reconstruct_table(input, geometry_column_names, geometry_columns, other_column_names, args...; kwargs...)
@assert Tables.istable(input)
_get_col_pair(colname) = colname => Tables.getcolumn(input, colname)
return Tables.materializer(input)(
merge(
NamedTuple{geometry_column_names, Base.Tuple{typeof.(geometry_columns)...}}(geometry_columns),
NamedTuple(Iterators.map(_get_col_pair, other_column_names))
)
)
endRewrap all FeatureCollectionTrait feature collections as GI.FeatureCollection Maybe use threads to call _apply on component features
@inline function _apply(f::F, target, ::GI.FeatureCollectionTrait, fc;
crs=GI.crs(fc), calc_extent=False(), threaded
) where FRun _apply on all features in the feature collection, possibly threaded
applicator = ApplyToFeatures(f, target, fc; crs, calc_extent)
features = _maptasks(applicator, 1:GI.nfeature(fc), threaded)
if calc_extent isa TrueCalculate the extent of the features
extent = mapreduce(GI.extent, Extents.union, features)Return a FeatureCollection with features, crs and calculated extent
return GI.FeatureCollection(features; crs, extent)
elseReturn a FeatureCollection with features and crs
return GI.FeatureCollection(features; crs)
end
endRewrap all FeatureTrait features as GI.Feature, keeping the properties
@inline function _apply(f::F, target, ::GI.FeatureTrait, feature;
crs=GI.crs(feature), calc_extent=False(), threaded
) where FRun _apply on the contained geometry
geometry = _apply(f, target, GI.geometry(feature); crs, calc_extent, threaded)Get the feature properties
properties = GI.properties(feature)
if calc_extent isa TrueCalculate the extent of the geometry
extent = GI.extent(geometry)Return a new Feature with the new geometry and calculated extent, but the original properties and crs
return GI.Feature(geometry; properties, crs, extent)
elseReturn a new Feature with the new geometry, but the original properties and crs
return GI.Feature(geometry; properties, crs)
end
endReconstruct nested geometries, maybe using threads to call _apply on component geoms
@inline function _apply(f::F, target, trait, geom;
crs=GI.crs(geom), calc_extent=False(), threaded
)::(GI.geointerface_geomtype(trait)) where FMap _apply over all sub geometries of geom to create a new vector of geometries TODO handle zero length
applicator = ApplyToGeom(f, target, geom; crs, calc_extent)
geoms = _maptasks(applicator, 1:GI.ngeom(geom), threaded)
return _apply_inner(geom, geoms, crs, calc_extent)
end
@inline function _apply(f::F, target::TraitTarget{<:PointTrait}, trait::GI.PolygonTrait, geom;
crs=GI.crs(geom), calc_extent=False(), threaded
)::(GI.geointerface_geomtype(trait)) where FWe need to force rebuilding a LinearRing not a LineString
applicator = ApplyPointsToPolygon(f, target, geom; crs, calc_extent)
geoms = _maptasks(applicator, 1:GI.ngeom(geom), threaded)
return _apply_inner(geom, geoms, crs, calc_extent)
end
function _apply_inner(geom, geoms, crs, calc_extent::True)Calculate the extent of the sub geometries
extent = mapreduce(GI.extent, Extents.union, geoms)Return a new geometry of the same trait as geom, holding the new geoms with crs and calculated extent
return rebuild(geom, geoms; crs, extent)
end
function _apply_inner(geom, geoms, crs, calc_extent::False)Return a new geometry of the same trait as geom, holding the new geoms with crs
return rebuild(geom, geoms; crs)
endFail loudly if we hit PointTrait without running f (after PointTrait there is no further to dig with _apply) @inline _apply(f, ::TraitTarget{Target}, trait::GI.PointTrait, geom; crs=nothing, kw...) where Target = throw(ArgumentError("target Target not found, but reached a PointTrait leaf")) Finally, these short methods are the main purpose of apply. The Trait is a subtype of the Target (or identical to it) So the Target is found. We apply f to geom and return it to previous _apply calls to be wrapped with the outer geometries/feature/featurecollection/array.
_apply(f::F, ::TraitTarget{Target}, ::Trait, geom; crs=GI.crs(geom), kw...) where {F,Target,Trait<:Target} = f(geom)
function _apply(a::WithTrait{F}, ::TraitTarget{Target}, trait::Trait, geom; crs=GI.crs(geom), kw...) where {F,Target,Trait<:Target}
a(trait, geom; Base.structdiff(values(kw), NamedTuple{(:threaded, :calc_extent)})...)
endDefine some specific cases of this match to avoid method ambiguity
for T in (
GI.PointTrait, GI.LinearRing, GI.LineString,
GI.MultiPoint, GI.FeatureTrait, GI.FeatureCollectionTrait
)
@eval begin
_apply(f::F, target::TraitTarget{<:$T}, trait::$T, x; kw...) where F = f(x)
function _apply(a::WithTrait{F}, target::TraitTarget{<:$T}, trait::$T, x; kw...) where F
a(trait, x; Base.structdiff(values(kw), NamedTuple{(:threaded, :calc_extent)})...)
end
end
end
### `_maptasks` - flexible, threaded `map`
using Base.Threads: nthreads, @threads, @spawnHere we used to use the compiler directive @assume_effects :foldable to force the compiler to lookup through the closure. This alone makes e.g. flip 2.5x faster!
But it caused inference to fail, so we've removed it. No effect on runtime so far as we can tell, at least in Julia 1.11.
@inline function _maptasks(f::F, taskrange, threaded::False)::Vector where F
map(f, taskrange)
endThreading utility, modified Mason Protters threading PSA run f over ntasks, where f receives an AbstractArray/range of linear indices
@inline function _maptasks(f::F, taskrange, threaded::True)::Vector where F
ntasks = length(taskrange)Customize this as needed. More tasks have more overhead, but better load balancing
tasks_per_thread = 2
chunk_size = max(1, ntasks ÷ (tasks_per_thread * nthreads()))partition the range into chunks
task_chunks = Iterators.partition(taskrange, chunk_size)Map over the chunks
tasks = map(task_chunks) do chunkSpawn a task to process this chunk
StableTasks.@spawn beginWhere we map f over the chunk indices
map(f, chunk)
end
endFinally we join the results into a new vector
return mapreduce(fetch, vcat, tasks)
end
@inline function _maptasks(a::Applicator{<:TaskFunctors}, taskrange, threaded::True)::Vector
ntasks = length(taskrange)
chunk_size = max(1, cld(ntasks, (length(a.f.functors))))partition the range into chunks
task_chunks = Iterators.partition(taskrange, chunk_size)Map over the chunks
tasks = map(task_chunks, view(a.f.functors, 1:length(task_chunks))) do chunk, ft
f = rebuild(a, ft)Spawn a task to process this chunk
StableTasks.@spawn beginWhere we map f over the chunk indices
map(f, chunk)
end
endFinally we join the results into a new vector
return mapreduce(fetch, vcat, tasks)
endThis page was generated using Literate.jl.