Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add refresh statements for the materialized views #801

Merged
merged 17 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public DataCollectionAdapter(DataCollection<S> collection, Function<S, T> transf
* {@inheritDoc}
*/
@Override
public Iterator iterator() {
public Iterator<T> iterator() {
return collection.stream().map(this.transformer).iterator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.baremaps.openstreetmap.model.Change;
import org.apache.baremaps.openstreetmap.model.Entity;

/** Represents an operation on the entities of changes of different types. */
/** Represents an operation on the entities of a change. */
public class ChangeEntitiesHandler implements Consumer<Change> {

private final Consumer<Entity> consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,59 +31,81 @@
*/
public class PostgresCoordinateMap extends DataMap<Long, Coordinate> {

public static final String SELECT_CONTAINS_KEY = """
SELECT 1
FROM osm_nodes
WHERE id = ? LIMIT 1""";

public static final String SELECT_CONTAINS_VALUE = """
SELECT 1
FROM osm_nodes
WHERE nodes = ? LIMIT 1""";

private static final String SELECT_IN = """
SELECT id, lon, lat
FROM osm_nodes
WHERE id
WHERE id = ANY (?)""";

private static final String SELECT_BY_ID = """
SELECT lon, lat
FROM osm_nodes
WHERE id = ?""";

private static final String SELECT_SIZE = """
SELECT count()
FROM osm_nodes
""";

private static final String SELECT_KEYS = """
SELECT id
FROM osm_nodes
""";

private static final String SELECT_VALUES = """
SELECT lon, lat
FROM osm_nodes
""";

private static final String SELECT_ENTRIES = """
SELECT id, lon, lat
FROM osm_nodes
""";

private final DataSource dataSource;

/** Constructs a {@link PostgresCoordinateMap}. */
public final String selectContainsKey;

public final String selectContainsValue;

private final String selectIn;

private final String selectById;

private final String selectSize;

private final String selectKeys;

private final String selectValues;

private final String selectEntries;

/**
* Constructs a {@link PostgresCoordinateMap}.
*/
public PostgresCoordinateMap(DataSource dataSource) {
this(dataSource, "public", "osm_nodes");
}

/**
* Constructs a {@link PostgresCoordinateMap}.
*/
public PostgresCoordinateMap(DataSource dataSource, String schema, String table) {
this.dataSource = dataSource;
var fullTableName = String.format("%s.%s", schema, table);
this.selectContainsKey = String.format("""
SELECT 1
FROM %1$s
WHERE id = ? LIMIT 1
""", fullTableName);
this.selectContainsValue = String.format("""
SELECT 1
FROM %1$s
WHERE nodes = ? LIMIT 1
""", fullTableName);
this.selectIn = String.format("""
SELECT id, lon, lat
FROM %1$s
WHERE id = ANY (?)
""", fullTableName);
this.selectById = String.format("""
SELECT lon, lat
FROM %1$s
WHERE id = ?
""", fullTableName);
this.selectSize = String.format("""
SELECT count()
FROM %1$s
""", fullTableName);
this.selectKeys = String.format("""
SELECT id
FROM %1$s
""", fullTableName);
this.selectValues = String.format("""
SELECT lon, lat
FROM %1$s
""", fullTableName);
this.selectEntries = String.format("""
SELECT id, lon, lat
FROM %1$s
""", fullTableName);

}

/** {@inheritDoc} */
@Override
public Coordinate get(Object key) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(SELECT_BY_ID)) {
PreparedStatement statement = connection.prepareStatement(selectById)) {
statement.setLong(1, (Long) key);
try (ResultSet result = statement.executeQuery()) {
if (result.next()) {
Expand All @@ -103,7 +125,7 @@ public Coordinate get(Object key) {
@Override
public List<Coordinate> getAll(List<Long> keys) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(SELECT_IN)) {
PreparedStatement statement = connection.prepareStatement(selectIn)) {
statement.setArray(1, connection.createArrayOf("int8", keys.toArray()));
try (ResultSet result = statement.executeQuery()) {
Map<Long, Coordinate> nodes = new HashMap<>();
Expand All @@ -123,7 +145,7 @@ public List<Coordinate> getAll(List<Long> keys) {
@Override
protected Iterator<Long> keyIterator() {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(SELECT_KEYS)) {
PreparedStatement statement = connection.prepareStatement(selectKeys)) {
ResultSet result = statement.executeQuery();
return new PostgresIterator<>(result, this::key);
} catch (SQLException e) {
Expand All @@ -142,7 +164,7 @@ private Long key(ResultSet resultSet) {
@Override
protected Iterator<Coordinate> valueIterator() {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(SELECT_VALUES)) {
PreparedStatement statement = connection.prepareStatement(selectValues)) {
ResultSet result = statement.executeQuery();
return new PostgresIterator<>(result, this::value);
} catch (SQLException e) {
Expand All @@ -163,7 +185,7 @@ private Coordinate value(ResultSet resultSet) {
@Override
protected Iterator<Entry<Long, Coordinate>> entryIterator() {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(SELECT_ENTRIES)) {
PreparedStatement statement = connection.prepareStatement(selectEntries)) {
ResultSet result = statement.executeQuery();
return new PostgresIterator<>(result, this::entry);
} catch (SQLException e) {
Expand All @@ -185,7 +207,7 @@ private Entry<Long, Coordinate> entry(ResultSet resultSet) {
@Override
public long sizeAsLong() {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(SELECT_SIZE)) {
PreparedStatement statement = connection.prepareStatement(selectSize)) {
try (ResultSet result = statement.executeQuery()) {
if (result.next()) {
return result.getLong(1);
Expand All @@ -201,7 +223,7 @@ public long sizeAsLong() {
@Override
public boolean containsKey(Object key) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(SELECT_CONTAINS_KEY)) {
PreparedStatement statement = connection.prepareStatement(selectContainsKey)) {
statement.setLong(1, (Long) key);
try (ResultSet result = statement.executeQuery()) {
return result.next();
Expand All @@ -214,7 +236,7 @@ public boolean containsKey(Object key) {
@Override
public boolean containsValue(Object value) {
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(SELECT_CONTAINS_VALUE)) {
PreparedStatement statement = connection.prepareStatement(selectContainsValue)) {
statement.setArray(1, connection.createArrayOf("int8", (Long[]) value));
try (ResultSet result = statement.executeQuery()) {
return result.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,27 @@ public class PostgresHeaderRepository implements HeaderRepository {
* @param dataSource
*/
public PostgresHeaderRepository(DataSource dataSource) {
this(dataSource, "osm_headers", "replication_sequence_number", "replication_timestamp",
this(dataSource, "public", "osm_headers", "replication_sequence_number",
"replication_timestamp",
"replication_url", "source", "writing_program");
}

/**
* Constructs a {@code PostgresHeaderRepository} with custom parameters.
*
* @param dataSource
* @param tableName
* @param schema
* @param table
* @param replicationSequenceNumberColumn
* @param replicationTimestampColumn
* @param replicationUrlColumn
* @param sourceColumn
* @param writingProgramColumn
*/
public PostgresHeaderRepository(DataSource dataSource, String tableName,
public PostgresHeaderRepository(DataSource dataSource, String schema, String table,
String replicationSequenceNumberColumn, String replicationTimestampColumn,
String replicationUrlColumn, String sourceColumn, String writingProgramColumn) {
var fullTableName = String.format("%1$s.%2$s", schema, table);
this.dataSource = dataSource;
this.createTable = String.format("""
CREATE TABLE IF NOT EXISTS %1$s (
Expand All @@ -92,20 +95,21 @@ public PostgresHeaderRepository(DataSource dataSource, String tableName,
%4$s text,
%5$s text,
%6$s text
)""", tableName, replicationSequenceNumberColumn, replicationTimestampColumn,
)""", fullTableName, replicationSequenceNumberColumn, replicationTimestampColumn,
replicationUrlColumn, sourceColumn, writingProgramColumn);
this.dropTable = String.format("DROP TABLE IF EXISTS %1$s CASCADE", tableName);
this.truncateTable = String.format("TRUNCATE TABLE %1$s", tableName);
this.dropTable = String.format("DROP TABLE IF EXISTS %1$s CASCADE", fullTableName);
this.truncateTable = String.format("TRUNCATE TABLE %1$s", fullTableName);
this.selectLatest =
String.format("SELECT %2$s, %3$s, %4$s, %5$s, %6$s FROM %1$s ORDER BY %2$s DESC", tableName,
String.format("SELECT %2$s, %3$s, %4$s, %5$s, %6$s FROM %1$s ORDER BY %2$s DESC",
fullTableName,
replicationSequenceNumberColumn, replicationTimestampColumn, replicationUrlColumn,
sourceColumn, writingProgramColumn);
this.select = String.format("SELECT %2$s, %3$s, %4$s, %5$s, %6$s FROM %1$s WHERE %2$s = ?",
tableName, replicationSequenceNumberColumn, replicationTimestampColumn,
fullTableName, replicationSequenceNumberColumn, replicationTimestampColumn,
replicationUrlColumn, sourceColumn, writingProgramColumn);
this.selectIn =
String.format("SELECT %2$s, %3$s, %4$s, %5$s, %6$s FROM %1$s WHERE %2$s = ANY (?)",
tableName, replicationSequenceNumberColumn, replicationTimestampColumn,
fullTableName, replicationSequenceNumberColumn, replicationTimestampColumn,
replicationUrlColumn, sourceColumn, writingProgramColumn);
this.insert = String.format("""
INSERT INTO %1$s (%2$s, %3$s, %4$s, %5$s, %6$s)
Expand All @@ -114,12 +118,12 @@ ON CONFLICT (%2$s) DO UPDATE SET
%3$s = excluded.%3$s,
%4$s = excluded.%4$s,
%5$s = excluded.%5$s,
%6$s = excluded.%6$s""", tableName, replicationSequenceNumberColumn,
%6$s = excluded.%6$s""", fullTableName, replicationSequenceNumberColumn,
replicationTimestampColumn, replicationUrlColumn, sourceColumn, writingProgramColumn);
this.delete = String.format("DELETE FROM %1$s WHERE %2$s = ?", tableName,
this.delete = String.format("DELETE FROM %1$s WHERE %2$s = ?", fullTableName,
replicationSequenceNumberColumn);
this.copy = String.format("COPY %1$s (%2$s, %3$s, %4$s, %5$s, %6$s) FROM STDIN BINARY",
tableName, replicationSequenceNumberColumn, replicationTimestampColumn,
fullTableName, replicationSequenceNumberColumn, replicationTimestampColumn,
replicationUrlColumn, sourceColumn, writingProgramColumn);
}

Expand Down
Loading