Skip to content

Commit

Permalink
Merge pull request #17 from Open-EO/danlooo/develop
Browse files Browse the repository at this point in the history
Add Reducer
  • Loading branch information
danlooo authored Jul 26, 2023
2 parents 523a425 + 4b2bacc commit 296038b
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 103 deletions.
45 changes: 39 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,47 @@ Connect to an openEO backend server and load a collection:

```julia
using OpenEOClient
connection = connect("earthengine.openeo.org", "v1.0")
connection.load_collection(
"COPERNICUS/S2", (16.06, 48.06, 16.65, 48.35),
c = connect("earthengine.openeo.org", "v1.0")
c.load_collection(
"COPERNICUS/S2", BoundingBox(west=16.06, south=48.06, east=16.65, north=48.35),
["2020-01-20", "2020-01-30"], ["B10"]
)
# openEO ProcessCall load_collection with parameters:
#openEO ProcessNode load_collection_tQ79zrFEGi8= with parameters:
# bands: ["B10"]
# id: "COPERNICUS/S2"
# spatial_extent: BoundingBox{Float64}(16.06, 48.06, 16.65, 48.35)
# temporal_extent: ["2020-01-20", "2020-01-30"]
```

load the remote sensing data set, calculate the median of all time points for each pixel, execute the processes on the backend and download the result as a JPG image using an authorized connection:


```julia
using OpenEOClient
c = connect("earthengine.openeo.org", "v1.0", "my_username", "my_password")
step1 = c.load_collection(
"COPERNICUS/S2", BoundingBox(west=16.06, south=48.06, east=16.65, north=48.35),
["2020-01-01", "2020-01-31"], ["B10"]
)
step2 = c.reduce_dimension(step1, Reducer("median"), "t", nothing)
step3 = c.save_result(step2, "JPEG", Dict())
c.compute_result(step3)
# "out.jpeg"
```

Explore the executed process graph:

```julia
g = ProcessGraph(step3)
# openEO ProcessGraph with steps:
# 1: load_collection(["B10"], COPERNICUS/S2, BoundingBox{Float64}(16.06, 48.06, 16.65, 48.35), ["2020-01-01", "2020-01-31"])
# 2: reduce_dimension(nothing, Reducer(OrderedCollections.OrderedDict{Symbol, ProcessNode}(:reduce1 => ProcessNode("reduce1", "median", Dict{Symbol, Any}(:data => Dict(:from_parameter => "data")), true))), OpenEOClient.ProcessNodeReference("load_collection_YwIbbFrt5Ws="), t)
# 3: save_result(Dict{Any, Any}(), JPEG, OpenEOClient.ProcessNodeReference("reduce_dimension_7ezKGDXsnoE="))

g[1]
# openEO ProcessNode load_collection_YwIbbFrt5Ws= with parameters:
# bands: ["B10"]
# spatial_extent: (16.06, 48.06, 16.65, 48.35)
# id: "COPERNICUS/S2"
# temporal_extent: ["2020-01-20", "2020-01-30"]
# spatial_extent: BoundingBox{Float64}(16.06, 48.06, 16.65, 48.35)
# temporal_extent: ["2020-01-01", "2020-01-31"]
```
4 changes: 4 additions & 0 deletions src/API.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Lists all predefined processes and returns detailed process descriptions, includ
"""
function list_processes(connection::AbstractConnection)
response = fetchApi(connection, "processes"; output_type=ProcessesRoot)
response isa Exception ? throw(response) : true
return response.processes
end

Expand All @@ -13,6 +14,7 @@ Lists all batch jobs submitted by a user.
"""
function list_jobs(connection::AuthorizedConnection)
response = fetchApi(connection, "jobs")
response isa Exception ? throw(response) : true
jobs = response["jobs"]
return jobs
end
Expand All @@ -22,6 +24,7 @@ Lists available collections with at least the required information.
"""
function list_collections(connection::AbstractConnection)
response = fetchApi(connection, "collections"; output_type=CollectionsRoot)
response isa Exception ? throw(response) : true
collections = response.collections
return collections
end
Expand All @@ -31,6 +34,7 @@ Lists all information about a specific collection specified by the identifier
"""
function describe_collection(connection::AbstractConnection, id::String)
response = fetchApi(connection, "collections/$(id)")
response isa Exception ? throw(response) : true
return response
end

Expand Down
7 changes: 5 additions & 2 deletions src/Connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ function fetchApi(url; method="GET", headers=deepcopy(default_headers), output_t
return response
end
catch e
# make concise error message, not entire callstack
@error e
msg = e.response.body |> String |> JSON3.read |> x -> x.message
return ErrorException(msg)
end
end

Expand All @@ -63,6 +63,7 @@ function connect(host, version)
using OpenEOClient
const connection = OpenEOClient.UnAuthorizedConnection("$host", "$version")
const collections = OpenEOClient.list_collections(connection)
const processes = OpenEOClient.list_processes(connection)
$processes_code
end
Expand All @@ -82,6 +83,8 @@ function connect(host, version::String, username::String, password::String)
using OpenEOClient
const connection = OpenEOClient.BasicAuthConnection("$host", "$version", "$access_token")
const collections = OpenEOClient.list_collections(connection)
const processes = OpenEOClient.list_processes(connection)
compute_result(p) = OpenEOClient.compute_result(connection, p)
$processes_code
end
Expand Down
2 changes: 2 additions & 0 deletions src/OpenEOClient.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export
register_processes,
ProcessNode,
Processes,
ProcessGraph,
Reducer,
BoundingBox,
compute_result
end
54 changes: 47 additions & 7 deletions src/ProcessGraph.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using OrderedCollections

using Infiltrator

function flatten!(g::AbstractProcessNode, root_id, nodes=Vector{ProcessNode}())
processes = filter(((k, v),) -> v isa ProcessNode, g.arguments)

Expand All @@ -18,7 +16,24 @@ function flatten!(g::AbstractProcessNode, root_id, nodes=Vector{ProcessNode}())
end
end

function get_process_graph(process_call::ProcessNode)
abstract type AbstractProcessGraph end

mutable struct ProcessGraph <: AbstractProcessGraph
data::OrderedDict
end

StructTypes.StructType(::Type{ProcessGraph}) = StructTypes.CustomStruct()
StructTypes.lower(g::ProcessGraph) = g.data

function Base.show(io::IO, ::MIME"text/plain", g::ProcessGraph)
println(io, "openEO ProcessGraph with steps:")
for (id, step) in enumerate(values(g.data))
args = join(values(step.arguments), ", ")
println(io, " $(id):\t $(step.process_id)($(args))")
end
end

function ProcessGraph(process_call::ProcessNode)
g = deepcopy(process_call)
root_id = process_call.id
processes = flatten!(g, root_id)
Expand All @@ -30,21 +45,42 @@ function get_process_graph(process_call::ProcessNode)
res[p.id] = p
end

# set last node as result
l = last(res).second
l = ProcessNode(l.id, l.process_id, l.arguments, true)
res[l.id] = l

return res
return ProcessGraph(res)
end

function Base.getindex(g::ProcessGraph, i)
id = g.data.keys[i]
return Base.getindex(g.data, id)
end

struct Reducer <: AbstractProcessGraph
process_graph::OrderedDict
end

"""
Create a ProcessGraph to reduce dimesnions
"""
function Reducer(process::String="mean")
process_graph = Dict(
:reduce1 => ProcessNode(
"reduce1", process, Dict(:data => Dict(:from_parameter => "data")), true
)
)
return Reducer(process_graph)
end

"""
Process and download data synchronously
"""
function compute_result(connection::AbstractConnection, process_call::ProcessNode, filepath::String="", kw...)
function compute_result(connection::AuthorizedConnection, process_graph::ProcessGraph, filepath::String="", kw...)
query = Dict(
:process => Dict(
:process_graph => get_process_graph(process_call),
:process_graph => process_graph,
:parameters => []
)
)
Expand All @@ -55,6 +91,7 @@ function compute_result(connection::AbstractConnection, process_call::ProcessNod
]

response = fetchApi(connection, "result"; method="POST", headers=headers, body=JSON3.write(query))
response isa Exception ? throw(response) : true

if isempty(filepath)
file_extension = split(Dict(response.headers)["Content-Type"], "/")[2]
Expand All @@ -65,4 +102,7 @@ function compute_result(connection::AbstractConnection, process_call::ProcessNod
return filepath
end


function compute_result(connection::AuthorizedConnection, process_node::ProcessNode, kw...)
process_graph = ProcessGraph(process_node)
return compute_result(connection, process_graph, kw...)
end
17 changes: 14 additions & 3 deletions src/Processes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,33 @@ struct ProcessNodeReference <: AbstractProcessNode
from_node::String
end

struct ProcessNodeParameter <: AbstractProcessNode
from_parameter::String
end

struct ProcessNode <: AbstractProcessNode
id::String
process_id::String
arguments::Dict{Symbol,Any}
result::Bool
end
ProcessNode(id, process_id, arguments) = ProcessNode(id, process_id, arguments, false)
StructTypes.StructType(::Type{ProcessNode}) = StructTypes.Mutable()
StructTypes.excludes(::Type{ProcessNode}) = (:id,)

function ProcessNode(process_id::String, parameters)
id = (process_id, parameters) |> repr |> objectid |> base64encode |> x -> process_id * "_" * x
ProcessNode(id, process_id, parameters)
end

keywords = [
# julia keywords
"begin", "while", "if", "for", "try", "return", "break", "continue",
"function", "macro", "quote", "let", "local", "global", "const", "do",
"struct", "module", "baremodule", "using", "import", "export"
"struct", "module", "baremodule", "using", "import", "export",

# module symbols
"collections", "connection", "compute_result"
]

function pretty_print(io, d::AbstractDict, tabwidth=3)
Expand All @@ -85,7 +95,7 @@ end


function Base.show(io::IO, ::MIME"text/plain", p::ProcessNode)
println(io, "openEO ProcessCall $(p.id) with parameters:")
println(io, "openEO ProcessNode $(p.id) with parameters:")
pretty_print(io, p.arguments)
end

Expand All @@ -101,7 +111,8 @@ function get_parameters(parameters)
"array" => Vector,
# subtypes
"bounding-box" => BoundingBox,
"raster-cube" => ProcessNode
"raster-cube" => ProcessNode,
"process-graph" => AbstractProcessGraph
)

res = [] # result must be ordered
Expand Down
40 changes: 0 additions & 40 deletions test/example1-nested.json

This file was deleted.

41 changes: 0 additions & 41 deletions test/example1-ref.json

This file was deleted.

9 changes: 5 additions & 4 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ password = ENV["OPENEO_PASSWORD"]
c1 = connect(host, version)
c2 = connect(host, version, username, password)
@test allequal([c1, c2] .|> x -> size(x.collections))
@test allequal([c1, c2] .|> x -> names(x, all=true) |> length)
@test allequal([c1.processes[x].id == c2.processes[x].id for x in 1:length(c1.processes)])

step1 = c2.load_collection(
"COPERNICUS/S2", BoundingBox(west=16.06, south=48.06, east=16.65, north=48.35),
Expand All @@ -29,7 +29,8 @@ password = ENV["OPENEO_PASSWORD"]
@test Set(keys(step1.arguments)) == Set([:bands, :id, :spatial_extent, :temporal_extent])
@test step1.arguments[:bands] == ["B10"]

step2 = c2.save_result(step1, "GTIFF-ZIP", Dict())
result = compute_result(c2.connection, step2)
@test result == "out.zip"
step2 = c2.reduce_dimension(step1, Reducer("median"), "t", nothing)
step3 = c2.save_result(step2, "JPEG", Dict())
result = c2.compute_result(step3)
@test result == "out.jpeg"
end

0 comments on commit 296038b

Please sign in to comment.