3246 Add IRVE to GeoData (#3312)
* Bare files for creating the import

* Allow importing IRVE into the DB

* Add method to find the relevant dataset

* Add Oban job

* WIP perform

* Working IRVE module

* Add a test to find the relevant dataset

* Use CSV module instead of NimbleCSV

* Refactor

* Refactor preparation fn for CSV

* Fix typos in parkings relais test

* Add import test

* Schedule Oban job

* mix format

* Update apps/transport/lib/jobs/geo_data/irve_to_geo_data.ex

Co-authored-by: Antoine Augusti <[email protected]>

* Change timeout

* Change UUID

* Remove comment

* Update apps/transport/lib/jobs/geo_data/irve_to_geo_data.ex

Co-authored-by: Antoine Augusti <[email protected]>

* Use perform_job instead of launching the job directly

* Use perform_job in another unrelated test

* Add assert to perform_job

* Use keywords

* mix format

* Change IRVE publisher to data.gouv.fr

* Update apps/transport/test/transport/jobs/dataset_history_job_test.exs

Co-authored-by: Antoine Augusti <[email protected]>

* Update apps/transport/lib/jobs/geo_data/base.ex

Co-authored-by: Antoine Augusti <[email protected]>

---------

Co-authored-by: Antoine Augusti <[email protected]>
vdegove and AntoineAugusti authored Jul 13, 2023
1 parent bd438e8 commit f543830
Showing 10 changed files with 232 additions and 46 deletions.
42 changes: 31 additions & 11 deletions apps/transport/lib/jobs/geo_data/base.ex
@@ -37,24 +37,44 @@ defmodule Transport.Jobs.BaseGeoData do
         %DB.ResourceHistory{id: latest_resource_history_id, payload: %{"permanent_url" => permanent_url}},
         prepare_data_for_insert_fn
       ) do
-    DB.Repo.transaction(fn ->
-      unless is_nil(current_geo_data_import) do
-        # thanks to cascading delete, it will also clean geo_data table corresponding entries
-        current_geo_data_import |> DB.Repo.delete!()
-      end
-
-      %{id: geo_data_import_id} = DB.Repo.insert!(%DB.GeoDataImport{resource_history_id: latest_resource_history_id})
-      http_client = Transport.Shared.Wrapper.HTTPoison.impl()
-      %{status_code: 200, body: body} = http_client.get!(permanent_url)
-      insert_data(body, geo_data_import_id, prepare_data_for_insert_fn)
-    end)
+    DB.Repo.transaction(
+      fn ->
+        unless is_nil(current_geo_data_import) do
+          # thanks to cascading delete, it will also clean geo_data table corresponding entries
+          current_geo_data_import |> DB.Repo.delete!()
+        end
+
+        %{id: geo_data_import_id} = DB.Repo.insert!(%DB.GeoDataImport{resource_history_id: latest_resource_history_id})
+        http_client = Transport.Shared.Wrapper.HTTPoison.impl()
+        %{status_code: 200, body: body} = http_client.get!(permanent_url)
+        insert_data(body, geo_data_import_id, prepare_data_for_insert_fn)
+      end,
+      timeout: 60_000
+    )
   end

   # keep 6 digits for WGS 84, see https://en.wikipedia.org/wiki/Decimal_degrees#Precision
   def parse_coordinate(s) do
     s |> string_to_float() |> Float.round(6)
   end

+  def prepare_csv_data_for_import(body, prepare_data_fn, opts \\ []) do
+    opts = Keyword.validate!(opts, separator_char: ?,, escape_char: ?", filter_fn: fn _ -> true end)
+    {:ok, stream} = StringIO.open(body)
+
+    stream
+    |> IO.binstream(:line)
+    |> CSV.decode(
+      separator: Keyword.fetch!(opts, :separator_char),
+      escape_character: Keyword.fetch!(opts, :escape_char),
+      headers: true,
+      validate_row_length: true
+    )
+    |> Stream.filter(Keyword.fetch!(opts, :filter_fn))
+    |> Stream.map(fn {:ok, m} -> m end)
+    |> Stream.map(prepare_data_fn)
+  end
+
   # remove spaces (U+0020) and non-break spaces (U+00A0) from the string
   defp string_to_float(s), do: s |> String.trim() |> String.replace([" ", " "], "") |> String.to_float()
 end
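The new prepare_csv_data_for_import/3 helper above centralizes CSV decoding for the geo-data jobs: it streams the body line by line, decodes it with the CSV library, filters rows with filter_fn (which receives the {:ok, row} tuples emitted by CSV.decode), then maps each row through prepare_data_fn. A minimal caller sketch, with an invented CSV body and row-preparation function; only the option names come from the diff above:

  # Illustrative sketch: the body and the preparation function are made up,
  # the options are those validated by Keyword.validate! above.
  body = "name;nb_pr\nParking A;12\nParking B;0\n"

  rows =
    Transport.Jobs.BaseGeoData.prepare_csv_data_for_import(
      body,
      fn row -> %{payload: row} end,
      separator_char: ?;,
      # filter_fn sees the {:ok, row} tuples produced by CSV.decode
      filter_fn: fn {:ok, row} -> row["nb_pr"] != "0" end
    )
    |> Enum.to_list()

  # => [%{payload: %{"name" => "Parking A", "nb_pr" => "12"}}]
  #
  # parse_coordinate/1 trims spaces then rounds to 6 decimals, e.g.
  # Transport.Jobs.BaseGeoData.parse_coordinate("-1.4292271") => -1.429227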
16 changes: 4 additions & 12 deletions apps/transport/lib/jobs/geo_data/bnlc_to_geo_data.ex
@@ -5,7 +5,6 @@ defmodule Transport.Jobs.BNLCToGeoData do
   """
   use Oban.Worker, max_attempts: 3
   import Ecto.Query
-  alias NimbleCSV.RFC4180, as: CSV
   require Logger

   @impl Oban.Worker
@@ -26,16 +25,7 @@
   end

   def prepare_data_for_insert(body, geo_data_import_id) do
-    body
-    |> CSV.parse_string(skip_headers: false)
-    |> Stream.transform([], fn r, acc ->
-      if acc == [] do
-        {%{}, r}
-      else
-        {[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
-      end
-    end)
-    |> Stream.map(fn m ->
+    prepare_data_fn = fn m ->
       %{
         geo_data_import_id: geo_data_import_id,
         geom: %Geo.Point{
@@ -46,6 +36,8 @@
         },
         payload: m |> Map.drop(["Xlong", "Ylat"])
       }
-    end)
+    end
+
+    Transport.Jobs.BaseGeoData.prepare_csv_data_for_import(body, prepare_data_fn)
   end
 end
47 changes: 47 additions & 0 deletions apps/transport/lib/jobs/geo_data/irve_to_geo_data.ex
@@ -0,0 +1,47 @@
+defmodule Transport.Jobs.IRVEToGeoData do
+  @moduledoc """
+  Job in charge of taking the charge stations stored in the Base nationale des Infrastructures de Recharge pour Véhicules Électriques and storing the result in the `geo_data` table.
+  """
+  use Oban.Worker, max_attempts: 3
+  import Ecto.Query
+  require Logger
+
+  @datagouv_organization_id "646b7187b50b2a93b1ae3d45"
+  @resource_datagouv_id "8d9398ae-3037-48b2-be19-412c24561fbb"
+
+  @impl Oban.Worker
+  def perform(%Oban.Job{}) do
+    [resource] =
+      relevant_dataset()
+      |> DB.Dataset.official_resources()
+      |> Enum.filter(&match?(%DB.Resource{datagouv_id: @resource_datagouv_id, format: "csv"}, &1))
+
+    Transport.Jobs.BaseGeoData.import_replace_data(resource, &prepare_data_for_insert/2)
+
+    :ok
+  end
+
+  def prepare_data_for_insert(body, geo_data_import_id) do
+    prepare_data_fn = fn m ->
+      %{
+        geo_data_import_id: geo_data_import_id,
+        geom: %Geo.Point{
+          coordinates:
+            {m["consolidated_longitude"] |> Transport.Jobs.BaseGeoData.parse_coordinate(),
+             m["consolidated_latitude"] |> Transport.Jobs.BaseGeoData.parse_coordinate()},
+          srid: 4326
+        },
+        payload: m |> Map.drop(["consolidated_longitude", "consolidated_latitude", "coordonnesXY"])
+      }
+    end
+
+    Transport.Jobs.BaseGeoData.prepare_csv_data_for_import(body, prepare_data_fn)
+  end
+
+  def relevant_dataset do
+    DB.Dataset.base_query()
+    |> preload(:resources)
+    |> where([d], d.type == "charging-stations" and d.organization_id == @datagouv_organization_id)
+    |> DB.Repo.one!()
+  end
+end
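The commit log mentions scheduling the Oban job; the schedule itself lives in the application config, which is not part of this diff. A hypothetical cron entry could look like the following (the plugin and crontab syntax are standard Oban; the app name, repo and daily time are assumptions):

  # Hypothetical scheduling sketch: the real crontab entry is in the app
  # config, which is not shown in this commit.
  config :transport, Oban,
    repo: DB.Repo,
    plugins: [
      {Oban.Plugins.Cron,
       crontab: [
         # import the consolidated IRVE file once a day (time is illustrative)
         {"20 5 * * *", Transport.Jobs.IRVEToGeoData}
       ]}
    ]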
16 changes: 9 additions & 7 deletions apps/transport/lib/jobs/geo_data/parkings_relais_to_geo_data.ex
@@ -31,13 +31,9 @@ defmodule Transport.Jobs.ParkingsRelaisToGeoData do
   defp pr_count(str), do: String.to_integer(str)

   def prepare_data_for_insert(body, geo_data_import_id) do
-    {:ok, stream} = StringIO.open(body)
+    filter_fn = fn {:ok, line} -> pr_count(line["nb_pr"]) > 0 end

-    stream
-    |> IO.binstream(:line)
-    |> CSV.decode(separator: ?;, headers: true, validate_row_length: true)
-    |> Stream.filter(fn {:ok, line} -> pr_count(line["nb_pr"]) > 0 end)
-    |> Stream.map(fn {:ok, m} ->
+    prepare_data_fn = fn m ->
       %{
         geo_data_import_id: geo_data_import_id,
         geom: %Geo.Point{
@@ -48,6 +44,12 @@
         },
         payload: m |> Map.drop(["Xlong", "Ylat"])
       }
-    end)
+    end
+
+    Transport.Jobs.BaseGeoData.prepare_csv_data_for_import(body, prepare_data_fn,
+      filter_fn: filter_fn,
+      separator_char: ?;,
+      escape_char: ?"
+    )
   end
 end
apps/transport/test/transport/jobs/dataset_history_job_test.exs
@@ -104,7 +104,7 @@ defmodule Transport.Test.Transport.Jobs.DatasetHistoryJobTest do
     %{id: id_2} = insert(:dataset)
     %{id: id_3} = insert(:dataset, is_active: false)

-    Transport.Jobs.DatasetHistoryDispatcherJob.perform(%{})
+    assert :ok = perform_job(Transport.Jobs.DatasetHistoryDispatcherJob, %{})

     assert_enqueued([worker: Transport.Jobs.DatasetHistoryJob, args: %{"dataset_id" => id_1}], 50)
     assert_enqueued([worker: Transport.Jobs.DatasetHistoryJob, args: %{"dataset_id" => id_2}], 50)
@@ -1,5 +1,6 @@
 defmodule Transport.Jobs.BNLCToGeoDataTest do
   use ExUnit.Case, async: true
+  use Oban.Testing, repo: DB.Repo
   alias Transport.Jobs.{BaseGeoData, BNLCToGeoData}
   import DB.Factory
   import Mox
@@ -70,14 +71,14 @@ defmodule Transport.Jobs.BNLCToGeoDataTest do
     |> expect(:get!, 2, fn "url" -> %HTTPoison.Response{status_code: 200, body: @bnlc_content} end)

     # launch job
-    Transport.Jobs.BNLCToGeoData.perform(%{})
+    assert :ok = perform_job(BNLCToGeoData, %{})

     # data is imported
     [%{id: geo_data_import_1, resource_history_id: ^id_0}] = DB.GeoDataImport |> DB.Repo.all()
     assert DB.GeoData |> DB.Repo.all() |> Enum.count() == 2

     # relaunch job
-    Transport.Jobs.BNLCToGeoData.perform(%{})
+    assert :ok = perform_job(BNLCToGeoData, %{})

     # no change
     [%{id: ^geo_data_import_1}] = DB.GeoDataImport |> DB.Repo.all()
@@ -91,7 +92,7 @@
       })

     # relaunch job
-    Transport.Jobs.BNLCToGeoData.perform(%{})
+    assert :ok = perform_job(BNLCToGeoData, %{})

     # geo_data and geo_data_import are updated accordingly
     [%{id: geo_data_import_2, resource_history_id: ^id_1}] = DB.GeoDataImport |> DB.Repo.all()
121 changes: 121 additions & 0 deletions apps/transport/test/transport/jobs/geo_data/irve_to_geodata_test.exs
@@ -0,0 +1,121 @@
+defmodule Transport.Jobs.IRVEToGeoDataTest do
+  use ExUnit.Case, async: true
+  use Oban.Testing, repo: DB.Repo
+  alias Transport.Jobs.{BaseGeoData, IRVEToGeoData}
+  import DB.Factory
+  import Mox
+
+  setup do
+    Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo)
+  end
+
+  setup :verify_on_exit!
+
+  @irve_content ~S"""
+  nom_amenageur,siren_amenageur,contact_amenageur,nom_operateur,contact_operateur,telephone_operateur,nom_enseigne,id_station_itinerance,id_station_local,nom_station,implantation_station,adresse_station,code_insee_commune,coordonneesXY,nbre_pdc,id_pdc_itinerance,id_pdc_local,puissance_nominale,prise_type_ef,prise_type_2,prise_type_combo_ccs,prise_type_chademo,prise_type_autre,gratuit,paiement_acte,paiement_cb,paiement_autre,tarification,condition_acces,reservation,horaires,accessibilite_pmr,restriction_gabarit,station_deux_roues,raccordement,num_pdl,date_mise_en_service,observations,date_maj,cable_t2_attache,last_modified,datagouv_dataset_id,datagouv_resource_id,datagouv_organization_or_owner,consolidated_longitude,consolidated_latitude,consolidated_code_postal,consolidated_commune,consolidated_is_lon_lat_correct,consolidated_is_code_insee_verified
+  ,,[email protected],,[email protected],,STATION SUPER U BELLEVIGNY 4,FRCPIE6610355,FRCPIE6610355,STATION SUPER U BELLEVIGNY 4,Parking privé à usage public,"23 Av. Atlant’Vie, 85170 Bellevigny",,"[-1.429227, 46.776249]",6,FRCPIE66103552,FRCPIE66103552,21,false,true,false,false,false,false,true,false,,,Accès réservé,false,24/7,Accessibilité inconnue,inconnu,false,,,2022-10-12,EF connector is available at the site separately,2023-07-10,false,2023-07-11T03:08:58.394000+00:00,64060c2ac773dcf3fabbe5d2,b11113db-875d-41c7-8673-0cf8ad43e917,eco-movement,-1.429227,46.776249,,,False,False
+  ,,[email protected],,[email protected],,Giberville Sud,FRIONE4171,FRIONE4171,Giberville Sud,Station dédiée à la recharge rapide,"Aire de Giberville Sud, A13, km 220, 14730 Giberville",,"[-0.276864, 49.166746]",5,FRIONE41715,FRIONE41715,50,false,true,true,true,false,false,true,true,,,Accès libre,false,24/7,Accessibilité inconnue,inconnu,false,,,2021-11-20,EF connector is available at the site separately,2023-07-11,false,2023-07-11T03:08:58.394000+00:00,64060c2ac773dcf3fabbe5d2,b11113db-875d-41c7-8673-0cf8ad43e917,eco-movement,-0.276864,49.166746,,,False,False
+  """
+
+  @dataset_info %{
+    type: "charging-stations",
+    custom_title: "Infrastructures de Recharge pour Véhicules Électriques - IRVE",
+    organization: "data.gouv.fr",
+    organization_id: "646b7187b50b2a93b1ae3d45"
+  }
+
+  test "import an IRVE to the DB" do
+    %{id: id} = insert(:geo_data_import)
+    BaseGeoData.insert_data(@irve_content, id, &IRVEToGeoData.prepare_data_for_insert/2)
+    [row1 | _t] = DB.GeoData |> DB.Repo.all()
+
+    assert %{
+             geo_data_import_id: ^id,
+             geom: %Geo.Point{
+               coordinates: {-1.429227, 46.776249},
+               srid: 4326
+             },
+             payload: %{
+               "nom_enseigne" => "STATION SUPER U BELLEVIGNY 4",
+               "nom_station" => "STATION SUPER U BELLEVIGNY 4",
+               "id_station_itinerance" => "FRCPIE6610355",
+               "nbre_pdc" => "6"
+             }
+           } = row1
+  end
+
+  test "Finds the relevant dataset" do
+    %DB.Dataset{id: dataset_id} = insert(:dataset, @dataset_info)
+    assert %DB.Dataset{id: ^dataset_id} = IRVEToGeoData.relevant_dataset()
+  end
+
+  test "IRVE data update logic" do
+    now = DateTime.utc_now()
+    now_100 = now |> DateTime.add(-100)
+    now_50 = now |> DateTime.add(-50)
+    now_25 = now |> DateTime.add(-25)
+
+    assert [] = DB.GeoData |> DB.Repo.all()
+    assert [] = DB.GeoDataImport |> DB.Repo.all()
+
+    %DB.Dataset{id: dataset_id} = insert(:dataset, @dataset_info)
+
+    # We don’t want to match community resources (but want to have them in base)
+    insert(:resource, %{
+      dataset_id: dataset_id,
+      is_community_resource: true,
+      datagouv_id: Ecto.UUID.generate()
+    })
+
+    %{id: resource_id} =
+      insert(:resource, %{
+        dataset_id: dataset_id,
+        datagouv_id: "8d9398ae-3037-48b2-be19-412c24561fbb",
+        format: "csv"
+      })
+
+    %{id: id_0} =
+      insert(:resource_history, %{
+        resource_id: resource_id,
+        inserted_at: now_100,
+        payload: %{"dataset_id" => dataset_id, "permanent_url" => "url"}
+      })
+
+    # another random resource history, just in case
+    insert(:resource_history, %{inserted_at: now_25, payload: %{"dataset_id" => dataset_id + 5}})
+
+    # download IRVE Mock
+    Transport.HTTPoison.Mock
+    |> expect(:get!, 2, fn "url" -> %HTTPoison.Response{status_code: 200, body: @irve_content} end)
+
+    # launch job
+    assert :ok = perform_job(IRVEToGeoData, %{})
+
+    # data is imported
+    [%{id: geo_data_import_1, resource_history_id: ^id_0}] = DB.GeoDataImport |> DB.Repo.all()
+    assert DB.GeoData |> DB.Repo.all() |> Enum.count() == 2
+
+    # relaunch job
+    assert :ok = perform_job(IRVEToGeoData, %{})
+
+    # no change
+    [%{id: ^geo_data_import_1}] = DB.GeoDataImport |> DB.Repo.all()
+
+    # new (more recent) resource history
+    %{id: id_1} =
+      insert(:resource_history, %{
+        resource_id: resource_id,
+        inserted_at: now_50,
+        payload: %{"dataset_id" => dataset_id, "permanent_url" => "url"}
+      })
+
+    # relaunch job
+    assert :ok = perform_job(IRVEToGeoData, %{})
+
+    # geo_data and geo_data_import are updated accordingly
+    [%{id: geo_data_import_2, resource_history_id: ^id_1}] = DB.GeoDataImport |> DB.Repo.all()
+    assert geo_data_import_2 !== geo_data_import_1
+
+    [%{geo_data_import_id: ^geo_data_import_2}, %{geo_data_import_id: ^geo_data_import_2}] = DB.GeoData |> DB.Repo.all()
+  end
+end
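Several tests in this commit switch from calling perform/1 directly to Oban's perform_job/2 test helper (pulled in via use Oban.Testing), which validates the args and worker options before invoking perform/1 and returns the worker's result. A self-contained sketch of the pattern, with a made-up no-op worker:

  # Hypothetical trivial worker, for illustration only.
  defmodule MyApp.NoopJob do
    use Oban.Worker, max_attempts: 3

    @impl Oban.Worker
    def perform(%Oban.Job{}), do: :ok
  end

  defmodule MyApp.NoopJobTest do
    use ExUnit.Case, async: true
    # brings perform_job/2,3 into the test module
    use Oban.Testing, repo: DB.Repo

    test "runs the worker through Oban's test helper" do
      # perform_job/2 builds and validates the job, calls perform/1,
      # and returns its result, so the test can assert on it
      assert :ok = perform_job(MyApp.NoopJob, %{})
    end
  end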
@@ -1,5 +1,6 @@
 defmodule Transport.Jobs.LowEmissionZonesToGeoDataTest do
   use ExUnit.Case, async: true
+  use Oban.Testing, repo: DB.Repo
   alias Transport.Jobs.{BaseGeoData, LowEmissionZonesToGeoData}
   import DB.Factory
   import Mox
@@ -94,14 +95,14 @@ defmodule Transport.Jobs.LowEmissionZonesToGeoDataTest do
     |> expect(:get!, 2, fn "url" -> %HTTPoison.Response{status_code: 200, body: @lez_aires_content} end)

     # launch job
-    Transport.Jobs.LowEmissionZonesToGeoData.perform(%{})
+    assert :ok = perform_job(LowEmissionZonesToGeoData, %{})

     # data is imported
     [%{id: geo_data_import_1, resource_history_id: ^id_0}] = DB.GeoDataImport |> DB.Repo.all()
     assert DB.GeoData |> DB.Repo.all() |> Enum.count() == 1

     # relaunch job
-    Transport.Jobs.LowEmissionZonesToGeoData.perform(%{})
+    assert :ok = perform_job(LowEmissionZonesToGeoData, %{})

     # no change
     [%{id: ^geo_data_import_1}] = DB.GeoDataImport |> DB.Repo.all()
@@ -115,7 +116,7 @@
       })

     # relaunch job
-    Transport.Jobs.LowEmissionZonesToGeoData.perform(%{})
+    assert :ok = perform_job(LowEmissionZonesToGeoData, %{})

     # geo_data and geo_data_import are updated accordingly
     [%{id: geo_data_import_2, resource_history_id: ^id_1}] = DB.GeoDataImport |> DB.Repo.all()