From 349ef2ad6755d1e839db2a9dbcf453dbbef65256 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Wed, 30 Oct 2024 16:30:22 +0100 Subject: [PATCH 01/23] First attempt at getting paged EMM data, not complete. --- emm/build.gradle | 73 +++++++++++++++ emm/src/main/java/whelk/EmmServer.java | 24 +++++ emm/src/main/java/whelk/EmmServlet.java | 113 ++++++++++++++++++++++++ emm/src/main/resources/log4j2.xml | 28 ++++++ settings.gradle | 1 + 5 files changed, 239 insertions(+) create mode 100644 emm/build.gradle create mode 100644 emm/src/main/java/whelk/EmmServer.java create mode 100644 emm/src/main/java/whelk/EmmServlet.java create mode 100644 emm/src/main/resources/log4j2.xml diff --git a/emm/build.gradle b/emm/build.gradle new file mode 100644 index 0000000000..f670e896c2 --- /dev/null +++ b/emm/build.gradle @@ -0,0 +1,73 @@ +apply plugin: 'java-library' +apply plugin: 'com.github.johnrengelman.shadow' + +def mainClassName = "whelk.EmmServer" + +repositories { + mavenCentral() +} + +// Don't blame me for this TRAVESTY. It is a necessity because of the versioning of xml-apis (2.0.2 which gradle otherwise chooses is OLDER (and broken) despite the version.) +configurations.all { + resolutionStrategy { + force "xml-apis:xml-apis:1.4.01" + } +} + +dependencies { + // XL dependencies + implementation(project(':whelk-core')) + implementation(project(':server-common')) + + // Servlet + api "jakarta.servlet:jakarta.servlet-api:${servletApiVersion}" + implementation "org.eclipse.jetty:jetty-server:${jettyVersion}" + implementation "org.eclipse.jetty.ee8:jetty-ee8-servlet:${jettyVersion}" + + // Logging + implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: "${log4jVersion}" + implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4jVersion}" + + // metrics + implementation "io.prometheus:simpleclient:${prometheusVersion}" + implementation "io.prometheus:simpleclient_servlet:${prometheusVersion}" +} + +shadowJar { + archiveClassifier = null // removes `-all` in the filename of the created .jar +} + +jar { + dependsOn ':server-common:jar' + dependsOn(shadowJar) + + manifest { + attributes "Main-Class": mainClassName, + // log4j uses multi-release to ship different stack walking implementations for different java + // versions. Since we repackage everything as a fat jar, that jar must also be multi-release. 
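+                            // (Without this manifest attribute the JVM ignores the versioned
+                            // classes under META-INF/versions/ in the repackaged jar.)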
+ "Multi-Release": true + } + + archiveClassifier = "nonfat" + enabled = false +} + +task(appRun, dependsOn: "classes", type: JavaExec) { + classpath = sourceSets.test.runtimeClasspath + mainClass = mainClassName + minHeapSize = "1g" + maxHeapSize = "4g" + systemProperties( + 'xl.secret.properties': System.getProperty("xl.secret.properties"), + 'xl.logRoot': System.getProperty("xl.logRoot", "./logs"), + 'xl.http.port': System.getProperty("xl.http.port", "8182") + ) + args(System.getProperty("args", "").split() as String[]) + + debugOptions { + enabled = true + port = 6007 + server = true + suspend = false + } +} diff --git a/emm/src/main/java/whelk/EmmServer.java b/emm/src/main/java/whelk/EmmServer.java new file mode 100644 index 0000000000..28f30cf53b --- /dev/null +++ b/emm/src/main/java/whelk/EmmServer.java @@ -0,0 +1,24 @@ +package whelk; + +import io.prometheus.client.exporter.MetricsServlet; +import org.eclipse.jetty.ee8.servlet.ServletContextHandler; +import org.eclipse.jetty.server.Server; + +public class EmmServer extends XlServer { + + @Override + protected void configureHandlers(Server server) { + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + + server.setHandler(context); + + context.addServlet(MetricsServlet.class, "/metrics"); + context.addServlet(EmmServlet.class, "/"); + serveStaticContent(context); + } + + public static void main(String[] args) throws Exception { + new EmmServer().run(); + } +} diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java new file mode 100644 index 0000000000..6b6eadedf9 --- /dev/null +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -0,0 +1,113 @@ +package whelk; + +import whelk.Whelk; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.sql.*; +import java.util.HashSet; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +public class EmmServlet extends HttpServlet { + + private final Logger logger = LogManager.getLogger(this.getClass()); + private final HashSet availableCategories; + private final Whelk whelk; + public EmmServlet() { + availableCategories = new HashSet<>(); + availableCategories.add("all"); + whelk = Whelk.createLoadedCoreWhelk(); + } + + public void init() { + } + + public void destroy() { + } + + public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOException { + try { + logger.info("Received request with from {}:{}.", req.getRemoteAddr(), req.getRemotePort()); + + String category = req.getParameter("category"); + String until = req.getParameter("until"); + + System.err.println("category: " + category); + System.err.println("until: " + until); + + if (!availableCategories.contains(category)) { + res.sendError(400); // temp + return; + } + + if (until == null) { // Send EntryPoint reply + res.sendError(400); // temp + return; + } + + // Send ChangeSet reply + sendChangeSet(whelk, res, category, until); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, String until) throws IOException { + final int targetHitsPerPage = 100; + + // Internally 'until' is "milliseconds since epoch". 
+ long untilNumerical = Long.parseLong(until); + Timestamp untilTimeStamp = new Timestamp(untilNumerical); + + Timestamp earliestSeenTimeStamp = new Timestamp(System.currentTimeMillis()); // This is how far back in time this page stretches + + try (Connection connection = whelk.getStorage().getOuterConnection()) { + + // Get a page of items + { + String sql = + "SELECT data#>>'{@graph,0,@id}', GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) " + + " FROM lddb WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) <= ? " + + " ORDER BY GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) DESC LIMIT ? ".stripIndent(); + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setTimestamp(1, untilTimeStamp); + preparedStatement.setInt(2, targetHitsPerPage); + preparedStatement.setFetchSize(targetHitsPerPage + 1); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + String uri = resultSet.getString(1); + Timestamp modificationTime = resultSet.getTimestamp(2); + + System.err.println("at " + modificationTime + " -> " + uri); + + if (modificationTime.before(earliestSeenTimeStamp)) + earliestSeenTimeStamp = modificationTime; + } + } + } + + // Get any extra records that share an exact modification time with the earliest seen time on this page (above) + { + String sql = "SELECT data#>>'{@graph,0,@id}' FROM lddb WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) = ?"; + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setTimestamp(1, untilTimeStamp); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + String uri = resultSet.getString(1); + + System.err.println("at same time! " + uri); + } + } + } + + System.err.println("Starting point for next page: " + earliestSeenTimeStamp); + + } catch (SQLException se) { + res.sendError(500); + } + } +} diff --git a/emm/src/main/resources/log4j2.xml b/emm/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..57aa8d42aa --- /dev/null +++ b/emm/src/main/resources/log4j2.xml @@ -0,0 +1,28 @@ + + + + ./logs + + + + + + + + + + + + + + + + + + + + + + + diff --git a/settings.gradle b/settings.gradle index f155603454..5b6e56f8d5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -14,4 +14,5 @@ include( 'trld-java', 'whelk-core', 'whelktool', + 'emm', ) From 2807edda00b303b850a5416686c00945492ed61f Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Thu, 31 Oct 2024 11:35:31 +0100 Subject: [PATCH 02/23] Progress. 
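
Reads the feed from lddb__versions rather than lddb, so that deletes and
older versions are visible too, and classifies every row as a
CREATE/UPDATE/DELETE via the new EmmActivity class: DELETE when the
deleted flag is set, CREATE when the creation and modification times
coincide, UPDATE otherwise. Also adds a migration that indexes the
relevant timestamp expression on lddb__versions.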
--- emm/src/main/java/whelk/EmmActivity.java | 32 ++++++++ emm/src/main/java/whelk/EmmServlet.java | 75 +++++++++++++------ ...000023-index-generationDate-versions.plsql | 31 ++++++++ 3 files changed, 117 insertions(+), 21 deletions(-) create mode 100644 emm/src/main/java/whelk/EmmActivity.java create mode 100644 librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql diff --git a/emm/src/main/java/whelk/EmmActivity.java b/emm/src/main/java/whelk/EmmActivity.java new file mode 100644 index 0000000000..eaac1005cc --- /dev/null +++ b/emm/src/main/java/whelk/EmmActivity.java @@ -0,0 +1,32 @@ +package whelk; + +import java.sql.Timestamp; + +public class EmmActivity { + public enum Type { + CREATE, + UPDATE, + DELETE, + } + + public final String uri; + public final String entityType; + public final Type activityType; + public final Timestamp modificationTime; + + public EmmActivity(String uri, String type, Timestamp creationTime, Timestamp modificationTime, boolean deleted) { + this.uri = uri; + this.entityType = type; + this.modificationTime = modificationTime; + if (deleted) + this.activityType = Type.DELETE; + else if (creationTime.equals(modificationTime)) + this.activityType = Type.CREATE; + else + this.activityType = Type.UPDATE; + } + + public String toString() { + return activityType.toString() + " of " + uri + " (" + entityType + ") at " + modificationTime; + } +} diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java index 6b6eadedf9..116581a06d 100644 --- a/emm/src/main/java/whelk/EmmServlet.java +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -1,18 +1,19 @@ package whelk; -import whelk.Whelk; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.sql.*; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; public class EmmServlet extends HttpServlet { - + private static final int TARGET_HITS_PER_PAGE = 100; private final Logger logger = LogManager.getLogger(this.getClass()); private final HashSet availableCategories; private final Whelk whelk; @@ -57,57 +58,89 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOExce } private void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, String until) throws IOException { - final int targetHitsPerPage = 100; + + List activitiesOnPage = new ArrayList<>(TARGET_HITS_PER_PAGE+5); + Timestamp nextTimeStamp = getPage(whelk, category, until, activitiesOnPage); + if (nextTimeStamp == null) { + res.sendError(500); + return; + } + + for (EmmActivity a : activitiesOnPage) { + System.err.println(a); + } + } + + /** + * Get a page's worth of items. The results will be added to the 'result'-list. The return value will be "the next timestamp" + * to start getting the next page at, or null on failure. + */ + private Timestamp getPage(Whelk whelk, String category, String until, List result) { // Internally 'until' is "milliseconds since epoch". 
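+        // The timestamp is the paging cursor: a page covers activities at or before 'until', and
+        // the earliest timestamp seen on the page becomes the next page's 'until'.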
long untilNumerical = Long.parseLong(until); Timestamp untilTimeStamp = new Timestamp(untilNumerical); - Timestamp earliestSeenTimeStamp = new Timestamp(System.currentTimeMillis()); // This is how far back in time this page stretches + Timestamp earliestSeenTimeStamp = new Timestamp(System.currentTimeMillis()); // This is (will be set to) how far back in time this page stretches try (Connection connection = whelk.getStorage().getOuterConnection()) { // Get a page of items { - String sql = - "SELECT data#>>'{@graph,0,@id}', GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) " + - " FROM lddb WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) <= ? " + - " ORDER BY GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) DESC LIMIT ? ".stripIndent(); + String sql = "SELECT" + + " data#>>'{@graph,1,@id}'," + + " GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz)," + + " deleted," + + " created," + + " data#>>'{@graph,1,@type}'" + + " FROM" + + " lddb__versions" + + " WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) <= ? " + + " ORDER BY GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) DESC LIMIT ? ".stripIndent(); PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setTimestamp(1, untilTimeStamp); - preparedStatement.setInt(2, targetHitsPerPage); - preparedStatement.setFetchSize(targetHitsPerPage + 1); + preparedStatement.setInt(2, TARGET_HITS_PER_PAGE); + preparedStatement.setFetchSize(TARGET_HITS_PER_PAGE + 1); try (ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { String uri = resultSet.getString(1); Timestamp modificationTime = resultSet.getTimestamp(2); - - System.err.println("at " + modificationTime + " -> " + uri); - + boolean deleted = resultSet.getBoolean(3); + Timestamp creationTime = resultSet.getTimestamp(4); + String type = resultSet.getString(5); + result.add(new EmmActivity(uri, type, creationTime, modificationTime, deleted)); if (modificationTime.before(earliestSeenTimeStamp)) earliestSeenTimeStamp = modificationTime; } } } - // Get any extra records that share an exact modification time with the earliest seen time on this page (above) + // Get any extra records that share an exact modification time with the earliest seen time on this page { - String sql = "SELECT data#>>'{@graph,0,@id}' FROM lddb WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) = ?"; + String sql = "SELECT" + + " data#>>'{@graph,1,@id}'," + + " deleted," + + " created," + + " data#>>'{@graph,1,@type}'" + + " FROM lddb__versions WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) = ?".stripIndent(); PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setTimestamp(1, untilTimeStamp); try (ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { String uri = resultSet.getString(1); - - System.err.println("at same time! 
" + uri); + Timestamp modificationTime = resultSet.getTimestamp(2); + boolean deleted = resultSet.getBoolean(3); + Timestamp creationTime = resultSet.getTimestamp(4); + String type = resultSet.getString(5); + result.add(new EmmActivity(uri, type, creationTime, modificationTime, deleted)); } } } - - System.err.println("Starting point for next page: " + earliestSeenTimeStamp); - } catch (SQLException se) { - res.sendError(500); + logger.error(se); + return null; } + + return earliestSeenTimeStamp; } } diff --git a/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql b/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql new file mode 100644 index 0000000000..a15cd1aeb2 --- /dev/null +++ b/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql @@ -0,0 +1,31 @@ +BEGIN; + +DO $$DECLARE + -- THESE MUST BE CHANGED WHEN YOU COPY THE SCRIPT! + + -- The version you expect the database to have _before_ the migration + old_version numeric := 22; + -- The version the database should have _after_ the migration + new_version numeric := 23; + + -- hands off + existing_version numeric; + +BEGIN + + -- Check existing version + SELECT version from lddb__schema INTO existing_version; + IF ( existing_version <> old_version) THEN + RAISE EXCEPTION 'ASKED TO MIGRATE FROM INCORRECT EXISTING VERSION!'; + ROLLBACK; + END IF; + UPDATE lddb__schema SET version = new_version; + + -- ACTUAL SCHEMA CHANGES HERE: + + -- This is an ammendment of migration #18, which did this for the lddb-table (including the totstz-function). We now need it for versions as well. + CREATE INDEX IF NOT EXISTS idx_lddb__versions_generation_date ON lddb__versions (totstz(data#>>'{@graph,0,generationDate}')); + +END$$; + +COMMIT; From 782f0cbed67c114ba8fe98150de4a636ca43449f Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Thu, 31 Oct 2024 14:13:20 +0100 Subject: [PATCH 03/23] Starting to look a little like EMM. 
--- emm/src/main/java/whelk/EmmServlet.java | 92 +------------------------ 1 file changed, 3 insertions(+), 89 deletions(-) diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java index 116581a06d..063c176a52 100644 --- a/emm/src/main/java/whelk/EmmServlet.java +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.LogManager; public class EmmServlet extends HttpServlet { - private static final int TARGET_HITS_PER_PAGE = 100; private final Logger logger = LogManager.getLogger(this.getClass()); private final HashSet availableCategories; private final Whelk whelk; @@ -31,8 +30,6 @@ public void destroy() { public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOException { try { - logger.info("Received request with from {}:{}.", req.getRemoteAddr(), req.getRemotePort()); - String category = req.getParameter("category"); String until = req.getParameter("until"); @@ -50,97 +47,14 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOExce } // Send ChangeSet reply - sendChangeSet(whelk, res, category, until); + res.setCharacterEncoding("utf-8"); + res.setContentType("application/json"); + EmmChangeSet.sendChangeSet(whelk, res, category, until); } catch (Exception e) { e.printStackTrace(); } } - private void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, String until) throws IOException { - - List activitiesOnPage = new ArrayList<>(TARGET_HITS_PER_PAGE+5); - Timestamp nextTimeStamp = getPage(whelk, category, until, activitiesOnPage); - if (nextTimeStamp == null) { - res.sendError(500); - return; - } - - for (EmmActivity a : activitiesOnPage) { - System.err.println(a); - } - } - - /** - * Get a page's worth of items. The results will be added to the 'result'-list. The return value will be "the next timestamp" - * to start getting the next page at, or null on failure. - */ - private Timestamp getPage(Whelk whelk, String category, String until, List result) { - - // Internally 'until' is "milliseconds since epoch". - long untilNumerical = Long.parseLong(until); - Timestamp untilTimeStamp = new Timestamp(untilNumerical); - Timestamp earliestSeenTimeStamp = new Timestamp(System.currentTimeMillis()); // This is (will be set to) how far back in time this page stretches - - try (Connection connection = whelk.getStorage().getOuterConnection()) { - - // Get a page of items - { - String sql = "SELECT" + - " data#>>'{@graph,1,@id}'," + - " GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz)," + - " deleted," + - " created," + - " data#>>'{@graph,1,@type}'" + - " FROM" + - " lddb__versions" + - " WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) <= ? " + - " ORDER BY GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) DESC LIMIT ? 
".stripIndent(); - PreparedStatement preparedStatement = connection.prepareStatement(sql); - preparedStatement.setTimestamp(1, untilTimeStamp); - preparedStatement.setInt(2, TARGET_HITS_PER_PAGE); - preparedStatement.setFetchSize(TARGET_HITS_PER_PAGE + 1); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - while (resultSet.next()) { - String uri = resultSet.getString(1); - Timestamp modificationTime = resultSet.getTimestamp(2); - boolean deleted = resultSet.getBoolean(3); - Timestamp creationTime = resultSet.getTimestamp(4); - String type = resultSet.getString(5); - result.add(new EmmActivity(uri, type, creationTime, modificationTime, deleted)); - if (modificationTime.before(earliestSeenTimeStamp)) - earliestSeenTimeStamp = modificationTime; - } - } - } - - // Get any extra records that share an exact modification time with the earliest seen time on this page - { - String sql = "SELECT" + - " data#>>'{@graph,1,@id}'," + - " deleted," + - " created," + - " data#>>'{@graph,1,@type}'" + - " FROM lddb__versions WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) = ?".stripIndent(); - PreparedStatement preparedStatement = connection.prepareStatement(sql); - preparedStatement.setTimestamp(1, untilTimeStamp); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - while (resultSet.next()) { - String uri = resultSet.getString(1); - Timestamp modificationTime = resultSet.getTimestamp(2); - boolean deleted = resultSet.getBoolean(3); - Timestamp creationTime = resultSet.getTimestamp(4); - String type = resultSet.getString(5); - result.add(new EmmActivity(uri, type, creationTime, modificationTime, deleted)); - } - } - } - } catch (SQLException se) { - logger.error(se); - return null; - } - - return earliestSeenTimeStamp; - } } From 087ae95cc6ef181cad581fd82e76df64b78cf5fc Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Thu, 31 Oct 2024 14:42:15 +0100 Subject: [PATCH 04/23] Add a missing file. 
--- emm/src/main/java/whelk/EmmChangeSet.java | 143 ++++++++++++++++++++++ emm/src/main/java/whelk/EmmServlet.java | 4 +- 2 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 emm/src/main/java/whelk/EmmChangeSet.java diff --git a/emm/src/main/java/whelk/EmmChangeSet.java b/emm/src/main/java/whelk/EmmChangeSet.java new file mode 100644 index 0000000000..d017f9f8c6 --- /dev/null +++ b/emm/src/main/java/whelk/EmmChangeSet.java @@ -0,0 +1,143 @@ +package whelk; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.servlet.http.HttpServletResponse; +import java.io.BufferedWriter; +import java.io.IOException; +import java.sql.*; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static whelk.util.Jackson.mapper; + +public class EmmChangeSet { + private static final int TARGET_HITS_PER_PAGE = 100; + private static final Logger logger = LogManager.getLogger(EmmChangeSet.class); + + static void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, String until, String ApiBaseUrl) throws IOException { + + List activitiesOnPage = new ArrayList<>(TARGET_HITS_PER_PAGE+5); + Timestamp nextTimeStamp = getPage(whelk, category, until, activitiesOnPage); + if (nextTimeStamp == null) { + res.sendError(500); + return; + } + + // THIS SHIT is so painful in Java :( + HashMap responseObject = new HashMap(); + ArrayList contexts = new ArrayList(); + contexts.add("https://www.w3.org/ns/activitystreams"); + contexts.add("https://emm-spec.org/1.0/context.json"); + responseObject.put("@context", contexts); + responseObject.put("type", "OrderedCollectionPage"); + responseObject.put("id", ApiBaseUrl+"?category="+category+"&until="+until); + HashMap partOf = new HashMap(); + partOf.put("type", "OrderedCollectionPage"); + partOf.put("id", "TODO"); + responseObject.put("partOf", partOf); + responseObject.put("next", ApiBaseUrl+"?category="+category+"&until="+nextTimeStamp.getTime()); + List orderedItems = new ArrayList(); + responseObject.put("orderedItems", orderedItems); + + for (EmmActivity activityInList : activitiesOnPage) { + HashMap activityInStream = new HashMap(); + activityInStream.put("type", switch (activityInList.activityType) { + case CREATE -> "create"; + case UPDATE -> "update"; + case DELETE -> "delete"; + }); + activityInStream.put("published", ZonedDateTime.ofInstant(activityInList.modificationTime.toInstant(), ZoneOffset.UTC).toString()); + HashMap activityObject = new HashMap(); + activityInStream.put("object", activityObject); + activityObject.put("id", activityInList.uri); + activityObject.put("type", activityInList.entityType); + activityObject.put("updated", ZonedDateTime.ofInstant(activityInList.modificationTime.toInstant(), ZoneOffset.UTC).toString()); + orderedItems.add(activityInStream); + } + + String jsonResponse = mapper.writeValueAsString(responseObject); + BufferedWriter writer = new BufferedWriter( res.getWriter() ); + writer.write(jsonResponse); + writer.close(); + } + + /** + * Get a page's worth of items. The results will be added to the 'result'-list. The return value will be "the next timestamp" + * to start getting the next page at, or null on failure. + */ + private static Timestamp getPage(Whelk whelk, String category, String until, List result) { + + // Internally 'until' is "milliseconds since epoch". 
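+        // The first query below is LIMITed to TARGET_HITS_PER_PAGE rows; records sharing the
+        // boundary timestamp are collected by the second query, so a page may hold slightly more
+        // than the target number of items.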
+ long untilNumerical = Long.parseLong(until); + Timestamp untilTimeStamp = new Timestamp(untilNumerical); + + Timestamp earliestSeenTimeStamp = new Timestamp(System.currentTimeMillis()); // This is (will be set to) how far back in time this page stretches + + try (Connection connection = whelk.getStorage().getOuterConnection()) { + + // Get a page of items + { + String sql = "SELECT" + + " data#>>'{@graph,1,@id}'," + + " GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz)," + + " deleted," + + " created," + + " data#>>'{@graph,1,@type}'" + + " FROM" + + " lddb__versions" + + " WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) <= ? " + + " ORDER BY GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) DESC LIMIT ? ".stripIndent(); + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setTimestamp(1, untilTimeStamp); + preparedStatement.setInt(2, TARGET_HITS_PER_PAGE); + preparedStatement.setFetchSize(TARGET_HITS_PER_PAGE + 1); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + String uri = resultSet.getString(1); + Timestamp modificationTime = resultSet.getTimestamp(2); + boolean deleted = resultSet.getBoolean(3); + Timestamp creationTime = resultSet.getTimestamp(4); + String type = resultSet.getString(5); + result.add(new EmmActivity(uri, type, creationTime, modificationTime, deleted)); + if (modificationTime.before(earliestSeenTimeStamp)) + earliestSeenTimeStamp = modificationTime; + } + } + } + + // Get any extra records that share an exact modification time with the earliest seen time on this page + { + String sql = "SELECT" + + " data#>>'{@graph,1,@id}'," + + " GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz)," + + " deleted," + + " created," + + " data#>>'{@graph,1,@type}'" + + " FROM lddb__versions WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) = ?".stripIndent(); + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setTimestamp(1, untilTimeStamp); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + String uri = resultSet.getString(1); + Timestamp modificationTime = resultSet.getTimestamp(2); + boolean deleted = resultSet.getBoolean(3); + Timestamp creationTime = resultSet.getTimestamp(4); + String type = resultSet.getString(5); + result.add(new EmmActivity(uri, type, creationTime, modificationTime, deleted)); + } + } + } + } catch (SQLException se) { + logger.error(se); + se.printStackTrace(); + return null; + } + + return earliestSeenTimeStamp; + } +} diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java index 063c176a52..03e9f05e7e 100644 --- a/emm/src/main/java/whelk/EmmServlet.java +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -46,10 +46,12 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOExce return; } + String ApiBaseUrl = req.getRequestURL().toString(); + // Send ChangeSet reply res.setCharacterEncoding("utf-8"); res.setContentType("application/json"); - EmmChangeSet.sendChangeSet(whelk, res, category, until); + EmmChangeSet.sendChangeSet(whelk, res, category, until, ApiBaseUrl); } catch (Exception e) { e.printStackTrace(); From 2cfae284b7447030abc8f14214fb3c11e7e79e18 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Thu, 31 Oct 2024 15:05:28 +0100 Subject: [PATCH 05/23] Fix DB index usage. 
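
Postgres only uses an expression index when a query contains the indexed
expression verbatim, so the feed queries now use the same
GREATEST(modified, totstz(...)) form as the rewritten index instead of
casting with ::timestamptz.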
--- emm/src/main/java/whelk/EmmChangeSet.java          | 11 ++++++-----
 .../00000023-index-generationDate-versions.plsql      |  3 +--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/emm/src/main/java/whelk/EmmChangeSet.java b/emm/src/main/java/whelk/EmmChangeSet.java
index d017f9f8c6..c7832eb73b 100644
--- a/emm/src/main/java/whelk/EmmChangeSet.java
+++ b/emm/src/main/java/whelk/EmmChangeSet.java
@@ -84,18 +84,19 @@ private static Timestamp getPage(Whelk whelk, String category, String until, Lis
         {
             String sql = "SELECT" +
                     " data#>>'{@graph,1,@id}'," +
-                    " GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz)," +
+                    " GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}'))," +
                     " deleted," +
                     " created," +
                     " data#>>'{@graph,1,@type}'" +
                     " FROM" +
                     " lddb__versions" +
-                    " WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) <= ? " +
-                    " ORDER BY GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) DESC LIMIT ? ".stripIndent();
+                    " WHERE GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) <= ? " +
+                    " ORDER BY GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) DESC LIMIT ? ".stripIndent();
             PreparedStatement preparedStatement = connection.prepareStatement(sql);
             preparedStatement.setTimestamp(1, untilTimeStamp);
             preparedStatement.setInt(2, TARGET_HITS_PER_PAGE);
             preparedStatement.setFetchSize(TARGET_HITS_PER_PAGE + 1);
+            System.err.println(preparedStatement);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 while (resultSet.next()) {
                     String uri = resultSet.getString(1);
@@ -114,11 +115,11 @@
         {
             String sql = "SELECT" +
                     " data#>>'{@graph,1,@id}'," +
-                    " GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz)," +
+                    " GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}'))," +
                     " deleted," +
                     " created," +
                     " data#>>'{@graph,1,@type}'" +
-                    " FROM lddb__versions WHERE GREATEST(modified, (data#>>'{@graph,0,generationDate}')::timestamptz) = ?".stripIndent();
+                    " FROM lddb__versions WHERE GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) = ?".stripIndent();
             PreparedStatement preparedStatement = connection.prepareStatement(sql);
             preparedStatement.setTimestamp(1, untilTimeStamp);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
diff --git a/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql b/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql
index a15cd1aeb2..bd21727136 100644
--- a/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql
+++ b/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql
@@ -23,8 +23,7 @@
 
     -- ACTUAL SCHEMA CHANGES HERE:
 
-    -- This is an amendment of migration #18, which did this for the lddb-table (including the totstz-function). We now need it for versions as well.
-    CREATE INDEX IF NOT EXISTS idx_lddb__versions_generation_date ON lddb__versions (totstz(data#>>'{@graph,0,generationDate}'));
+    CREATE INDEX IF NOT EXISTS idx_lddb__versions_generation_date ON lddb__versions (GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')));
 
 END$$;
 
 COMMIT;

From dfc0c8c8b1f5b13969dad1602faec7c66e91d126 Mon Sep 17 00:00:00 2001
From: Jannis Tsiroyannis
Date: Fri, 1 Nov 2024 12:41:00 +0100
Subject: [PATCH 06/23] Add the entry point response.
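
A request without 'until' now gets an OrderedCollection entry point,
roughly like this ('until' in the 'first' link is the server's current
time in epoch milliseconds; the host is made up for illustration):

    {
      "@context": ["https://www.w3.org/ns/activitystreams",
                   "https://emm-spec.org/1.0/context.json"],
      "type": "OrderedCollection",
      "id": "https://example.org/?category=all",
      "first": {
        "type": "OrderedCollectionPage",
        "id": "https://example.org/?category=all&until=1730851200000"
      }
    }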
--- emm/src/main/java/whelk/EmmChangeSet.java | 4 +-- emm/src/main/java/whelk/EmmServlet.java | 38 +++++++++++++++++------ 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/emm/src/main/java/whelk/EmmChangeSet.java b/emm/src/main/java/whelk/EmmChangeSet.java index c7832eb73b..d9180a56f3 100644 --- a/emm/src/main/java/whelk/EmmChangeSet.java +++ b/emm/src/main/java/whelk/EmmChangeSet.java @@ -37,8 +37,8 @@ static void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, responseObject.put("type", "OrderedCollectionPage"); responseObject.put("id", ApiBaseUrl+"?category="+category+"&until="+until); HashMap partOf = new HashMap(); - partOf.put("type", "OrderedCollectionPage"); - partOf.put("id", "TODO"); + partOf.put("type", "OrderedCollection"); + partOf.put("id", ApiBaseUrl+"?category="+category); responseObject.put("partOf", partOf); responseObject.put("next", ApiBaseUrl+"?category="+category+"&until="+nextTimeStamp.getTime()); List orderedItems = new ArrayList(); diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java index 03e9f05e7e..97ccfe88bd 100644 --- a/emm/src/main/java/whelk/EmmServlet.java +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -3,15 +3,19 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import java.io.BufferedWriter; import java.io.IOException; import java.sql.*; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import static whelk.util.Jackson.mapper; + public class EmmServlet extends HttpServlet { private final Logger logger = LogManager.getLogger(this.getClass()); private final HashSet availableCategories; @@ -32,25 +36,41 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOExce try { String category = req.getParameter("category"); String until = req.getParameter("until"); + String ApiBaseUrl = req.getRequestURL().toString(); - System.err.println("category: " + category); - System.err.println("until: " + until); + res.setCharacterEncoding("utf-8"); + res.setContentType("application/json"); if (!availableCategories.contains(category)) { - res.sendError(400); // temp + res.sendError(404); return; } - if (until == null) { // Send EntryPoint reply - res.sendError(400); // temp + // Send an Entry-Point reply + if (until == null) { + HashMap responseObject = new HashMap(); + ArrayList contexts = new ArrayList(); + contexts.add("https://www.w3.org/ns/activitystreams"); + contexts.add("https://emm-spec.org/1.0/context.json"); + responseObject.put("@context", contexts); + responseObject.put("type", "OrderedCollection"); + responseObject.put("id", ApiBaseUrl+"?category="+category); + + HashMap first = new HashMap(); + first.put("type", "OrderedCollectionPage"); + first.put("id", ApiBaseUrl+"?category="+category+"&until="+System.currentTimeMillis()); + + responseObject.put("first", first); + + + String jsonResponse = mapper.writeValueAsString(responseObject); + BufferedWriter writer = new BufferedWriter( res.getWriter() ); + writer.write(jsonResponse); + writer.close(); return; } - String ApiBaseUrl = req.getRequestURL().toString(); - // Send ChangeSet reply - res.setCharacterEncoding("utf-8"); - res.setContentType("application/json"); EmmChangeSet.sendChangeSet(whelk, res, category, until, ApiBaseUrl); } catch (Exception e) { From 
73f48b92438cf82eb3ee908664243b2d9167ab47 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Mon, 4 Nov 2024 15:01:20 +0100 Subject: [PATCH 07/23] Work in progress, dumps --- emm/src/main/java/whelk/Dump.java | 97 +++++++++++++++++++++++ emm/src/main/java/whelk/EmmChangeSet.java | 7 +- emm/src/main/java/whelk/EmmServlet.java | 12 ++- 3 files changed, 108 insertions(+), 8 deletions(-) create mode 100644 emm/src/main/java/whelk/Dump.java diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java new file mode 100644 index 0000000000..04bc5935b8 --- /dev/null +++ b/emm/src/main/java/whelk/Dump.java @@ -0,0 +1,97 @@ +package whelk; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.*; + +public class Dump { + private static final Logger logger = LogManager.getLogger(Dump.class); + private static final String DUMP_END_MARKER = "_DUMP_END_MARKER"; // Must be 16 bytes + + public static void sendDumpResponse(Whelk whelk, HttpServletRequest req, HttpServletResponse res) throws IOException, SQLException { + String dump = req.getParameter("dump"); + String offset = req.getParameter("offset"); + + String tmpDir = System.getProperty("java.io.tmpdir"); + Path dumpsPath = Paths.get(tmpDir, "dumps"); + Files.createDirectories(dumpsPath); + Path dumpFilePath = dumpsPath.resolve(dump+".dump"); + + generateDump(whelk, dumpFilePath); + sendDumpPageResponse(dumpFilePath, offset, res); + } + + private static void sendDumpPageResponse(Path dumpFilePath, String offset, HttpServletResponse res) { + // Determine the state of this dump + try { + // Not begun writing yet ? + while (!Files.exists(dumpFilePath)) { + System.err.println("no dump yet.."); + Thread.sleep(10); + } + + try (RandomAccessFile file = new RandomAccessFile(dumpFilePath.toFile(), "r")) { + + // Is the dump generation finished ? + file.seek(file.length()-17); + boolean dumpFinished = file.readLine().equals(DUMP_END_MARKER); + + // Not data enough for a full page yet ? + while (!dumpFinished && file.length() < 17 * EmmChangeSet.TARGET_HITS_PER_PAGE) { + System.err.println("not enough lines.."); + Thread.sleep(10); + } + + //file.seek(offset); + System.err.println("PAGE AVAILABLE!!\n"); + + } + } catch (IOException | InterruptedException e) { + logger.error("Failed reading dumpfile: " + dumpFilePath, e); + } + } + + private static void invalidateIfOld(Path dumpFilePath) { + // TODO + } + + private static void generateDump(Whelk whelk, Path dumpFilePath) { + new Thread(() -> { + try (BufferedWriter dumpFileWriter = new BufferedWriter(new FileWriter(dumpFilePath.toFile())); + Connection connection = whelk.getStorage().getOuterConnection()) { + + connection.setAutoCommit(false); + String library = "https://libris.kb.se/library/Li"; // TEMP + String sql = " SELECT " + + " id" + + " FROM" + + " lddb" + + " WHERE" + + " data#>>'{@graph,1,heldBy,@id}' = ?"; + PreparedStatement preparedStatement = connection.prepareStatement(sql); + + preparedStatement.setString(1, library); + preparedStatement.setFetchSize(EmmChangeSet.TARGET_HITS_PER_PAGE); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + + // Each line must be exactly 17 bytes long, including the (unix-)line break. 
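+                        // (16 bytes of space-padded id plus a newline. Fixed-width records mean
+                        // that record N always starts at byte 17*N, so a reader can locate any
+                        // page with a single seek.)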
+ String id = String.format("%-16s\n", resultSet.getString(1)); + dumpFileWriter.write(id); + } + } + + dumpFileWriter.write( String.format("%-16s\n", DUMP_END_MARKER) ); + } catch (IOException | SQLException e) { + logger.error("Failed dump generation", e); + } + }).start(); + } +} diff --git a/emm/src/main/java/whelk/EmmChangeSet.java b/emm/src/main/java/whelk/EmmChangeSet.java index d9180a56f3..4fbe10da62 100644 --- a/emm/src/main/java/whelk/EmmChangeSet.java +++ b/emm/src/main/java/whelk/EmmChangeSet.java @@ -16,7 +16,7 @@ import static whelk.util.Jackson.mapper; public class EmmChangeSet { - private static final int TARGET_HITS_PER_PAGE = 100; + public static final int TARGET_HITS_PER_PAGE = 100; private static final Logger logger = LogManager.getLogger(EmmChangeSet.class); static void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, String until, String ApiBaseUrl) throws IOException { @@ -91,12 +91,11 @@ private static Timestamp getPage(Whelk whelk, String category, String until, Lis " FROM" + " lddb__versions" + " WHERE GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) <= ? " + - " ORDER BY GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) DESC LIMIT ? ".stripIndent(); + " ORDER BY GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) DESC LIMIT ? "; PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setTimestamp(1, untilTimeStamp); preparedStatement.setInt(2, TARGET_HITS_PER_PAGE); preparedStatement.setFetchSize(TARGET_HITS_PER_PAGE + 1); - System.err.println(preparedStatement); try (ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { String uri = resultSet.getString(1); @@ -119,7 +118,7 @@ private static Timestamp getPage(Whelk whelk, String category, String until, Lis " deleted," + " created," + " data#>>'{@graph,1,@type}'" + - " FROM lddb__versions WHERE GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) = ?".stripIndent(); + " FROM lddb__versions WHERE GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) = ?"; PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setTimestamp(1, untilTimeStamp); try (ResultSet resultSet = preparedStatement.executeQuery()) { diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java index 97ccfe88bd..3a73615183 100644 --- a/emm/src/main/java/whelk/EmmServlet.java +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -32,8 +32,9 @@ public void init() { public void destroy() { } - public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOException { + public void doGet(HttpServletRequest req, HttpServletResponse res) { try { + String dump = req.getParameter("dump"); String category = req.getParameter("category"); String until = req.getParameter("until"); String ApiBaseUrl = req.getRequestURL().toString(); @@ -41,6 +42,11 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOExce res.setCharacterEncoding("utf-8"); res.setContentType("application/json"); + if (dump != null) { + Dump.sendDumpResponse(whelk, req, res); + return; + } + if (!availableCategories.contains(category)) { res.sendError(404); return; @@ -55,14 +61,12 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) throws IOExce responseObject.put("@context", contexts); responseObject.put("type", "OrderedCollection"); responseObject.put("id", ApiBaseUrl+"?category="+category); - + responseObject.put("url", 
ApiBaseUrl+"?dump=index"); HashMap first = new HashMap(); first.put("type", "OrderedCollectionPage"); first.put("id", ApiBaseUrl+"?category="+category+"&until="+System.currentTimeMillis()); - responseObject.put("first", first); - String jsonResponse = mapper.writeValueAsString(responseObject); BufferedWriter writer = new BufferedWriter( res.getWriter() ); writer.write(jsonResponse); From 48b7b3fc01864cd873f10fb8312589a737fc7384 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Tue, 5 Nov 2024 13:27:03 +0100 Subject: [PATCH 08/23] Read dumps, but don't send em yet. --- emm/src/main/java/whelk/Dump.java | 62 +++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java index 04bc5935b8..2be37ba15e 100644 --- a/emm/src/main/java/whelk/Dump.java +++ b/emm/src/main/java/whelk/Dump.java @@ -6,56 +6,88 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.*; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.sql.*; +import java.util.ArrayList; public class Dump { private static final Logger logger = LogManager.getLogger(Dump.class); - private static final String DUMP_END_MARKER = "_DUMP_END_MARKER"; // Must be 16 bytes + private static final String DUMP_END_MARKER = "_DUMP_END_MARKER\n"; // Must be 17 bytes public static void sendDumpResponse(Whelk whelk, HttpServletRequest req, HttpServletResponse res) throws IOException, SQLException { String dump = req.getParameter("dump"); String offset = req.getParameter("offset"); + long offsetNumeric = Long.parseLong(offset); + System.err.println("offset: " + offsetNumeric); String tmpDir = System.getProperty("java.io.tmpdir"); Path dumpsPath = Paths.get(tmpDir, "dumps"); Files.createDirectories(dumpsPath); Path dumpFilePath = dumpsPath.resolve(dump+".dump"); - generateDump(whelk, dumpFilePath); - sendDumpPageResponse(dumpFilePath, offset, res); + invalidateIfOld(dumpFilePath); + if (!Files.exists(dumpFilePath)) + generateDump(whelk, dumpFilePath); + sendDumpPageResponse(dumpFilePath, offsetNumeric, res); } - private static void sendDumpPageResponse(Path dumpFilePath, String offset, HttpServletResponse res) { - // Determine the state of this dump + private static void sendDumpPageResponse(Path dumpFilePath, long offset, HttpServletResponse res) { + ArrayList recordIdsOnPage = new ArrayList<>(EmmChangeSet.TARGET_HITS_PER_PAGE); + try { - // Not begun writing yet ? + // Has the dump not begun being written yet ? while (!Files.exists(dumpFilePath)) { - System.err.println("no dump yet.."); Thread.sleep(10); } try (RandomAccessFile file = new RandomAccessFile(dumpFilePath.toFile(), "r")) { + byte[] lineBuffer = new byte[17]; // Is the dump generation finished ? - file.seek(file.length()-17); - boolean dumpFinished = file.readLine().equals(DUMP_END_MARKER); + boolean dumpFinished = false; + if (file.length() >= 17) { + file.seek(file.length() - 17); + dumpFinished = ( 17 == file.read(lineBuffer) && new String(lineBuffer, StandardCharsets.UTF_8).equals(DUMP_END_MARKER) ); + } - // Not data enough for a full page yet ? - while (!dumpFinished && file.length() < 17 * EmmChangeSet.TARGET_HITS_PER_PAGE) { - System.err.println("not enough lines.."); + // Is there not enough data for a full page yet ? 
+ long startOffsetBytes = 17 * offset; + while (!dumpFinished && file.length() < startOffsetBytes + (17 * (long)EmmChangeSet.TARGET_HITS_PER_PAGE)) { Thread.sleep(10); + + if (file.length() >= 17) { + file.seek(file.length() - 17); + dumpFinished = ( 17 == file.read(lineBuffer) && new String(lineBuffer, StandardCharsets.UTF_8).equals(DUMP_END_MARKER) ); + } } - //file.seek(offset); - System.err.println("PAGE AVAILABLE!!\n"); + // We're ok to send a full page, or the end of the dump (which may be a partial page). + file.seek(startOffsetBytes); + int recordsToSend = Integer.min(EmmChangeSet.TARGET_HITS_PER_PAGE, (int)((file.length() - startOffsetBytes) / 17) - 1); + for (int i = 0; i < recordsToSend; ++i) { + if (17 == file.read(lineBuffer)) { + recordIdsOnPage.add(new String(lineBuffer, StandardCharsets.UTF_8)); + } else { + logger.error("Suspected corrupt dump (non-17-byte line detected): " + dumpFilePath); + res.sendError(500); + return; + } + } } } catch (IOException | InterruptedException e) { logger.error("Failed reading dumpfile: " + dumpFilePath, e); } + + sendFormattedResponse(recordIdsOnPage, res); + } + + private static void sendFormattedResponse(ArrayList recordIdsOnPage, HttpServletResponse res) { + for (String s : recordIdsOnPage) + System.err.println(s.trim()); } private static void invalidateIfOld(Path dumpFilePath) { @@ -88,7 +120,7 @@ private static void generateDump(Whelk whelk, Path dumpFilePath) { } } - dumpFileWriter.write( String.format("%-16s\n", DUMP_END_MARKER) ); + dumpFileWriter.write( String.format("%-17s", DUMP_END_MARKER) ); } catch (IOException | SQLException e) { logger.error("Failed dump generation", e); } From 6c99a105978cd8ffb6f82ab3a23cfe0892b3cdad Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Tue, 5 Nov 2024 14:53:46 +0100 Subject: [PATCH 09/23] Looks like somewhat working dumps --- emm/src/main/java/whelk/Dump.java | 56 +++++++++++++++++------ emm/src/main/java/whelk/EmmChangeSet.java | 8 ++-- emm/src/main/java/whelk/EmmServlet.java | 12 ++--- 3 files changed, 53 insertions(+), 23 deletions(-) diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java index 2be37ba15e..b09b42af6f 100644 --- a/emm/src/main/java/whelk/Dump.java +++ b/emm/src/main/java/whelk/Dump.java @@ -12,16 +12,19 @@ import java.nio.file.Paths; import java.sql.*; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static whelk.util.Jackson.mapper; public class Dump { private static final Logger logger = LogManager.getLogger(Dump.class); private static final String DUMP_END_MARKER = "_DUMP_END_MARKER\n"; // Must be 17 bytes - public static void sendDumpResponse(Whelk whelk, HttpServletRequest req, HttpServletResponse res) throws IOException, SQLException { + public static void sendDumpResponse(Whelk whelk, String apiBaseUrl, HttpServletRequest req, HttpServletResponse res) throws IOException, SQLException { String dump = req.getParameter("dump"); String offset = req.getParameter("offset"); long offsetNumeric = Long.parseLong(offset); - System.err.println("offset: " + offsetNumeric); String tmpDir = System.getProperty("java.io.tmpdir"); Path dumpsPath = Paths.get(tmpDir, "dumps"); @@ -31,11 +34,12 @@ public static void sendDumpResponse(Whelk whelk, HttpServletRequest req, HttpSer invalidateIfOld(dumpFilePath); if (!Files.exists(dumpFilePath)) generateDump(whelk, dumpFilePath); - sendDumpPageResponse(dumpFilePath, offsetNumeric, res); + sendDumpPageResponse(whelk, apiBaseUrl, dump, dumpFilePath, offsetNumeric, res); } - 
private static void sendDumpPageResponse(Path dumpFilePath, long offset, HttpServletResponse res) {
+    private static void sendDumpPageResponse(Whelk whelk, String apiBaseUrl, String dump, Path dumpFilePath, long offsetLines, HttpServletResponse res) throws IOException {
         ArrayList recordIdsOnPage = new ArrayList<>(EmmChangeSet.TARGET_HITS_PER_PAGE);
+        Long totalEntityCount = null;
 
         try {
             // Has the dump not begun being written yet ?
             while (!Files.exists(dumpFilePath)) {
                 Thread.sleep(10);
             }
 
             // Is there not enough data for a full page yet ?
-            long startOffsetBytes = 17 * offset;
-            while (!dumpFinished && file.length() < startOffsetBytes + (17 * (long)EmmChangeSet.TARGET_HITS_PER_PAGE)) {
+            long offsetBytes = 17 * offsetLines;
+            while (!dumpFinished && file.length() < offsetBytes + (17 * (long)EmmChangeSet.TARGET_HITS_PER_PAGE)) {
                 Thread.sleep(10);
 
                 if (file.length() >= 17) {
                     file.seek(file.length() - 17);
                     dumpFinished = ( 17 == file.read(lineBuffer) && new String(lineBuffer, StandardCharsets.UTF_8).equals(DUMP_END_MARKER) );
                 }
             }
 
+            if (dumpFinished) {
+                totalEntityCount = file.length() / 17 - 1;
+            }
+
             // We're ok to send a full page, or the end of the dump (which may be a partial page).
-            file.seek(startOffsetBytes);
-            int recordsToSend = Integer.min(EmmChangeSet.TARGET_HITS_PER_PAGE, (int)((file.length() - startOffsetBytes) / 17) - 1);
+            file.seek(offsetBytes);
+            int recordsToSend = Integer.min(EmmChangeSet.TARGET_HITS_PER_PAGE, (int)((file.length() - offsetBytes) / 17) - 1);
             for (int i = 0; i < recordsToSend; ++i) {
                 if (17 == file.read(lineBuffer)) {
-                    recordIdsOnPage.add(new String(lineBuffer, StandardCharsets.UTF_8));
+                    recordIdsOnPage.add(new String(lineBuffer, StandardCharsets.UTF_8).trim());
                 } else {
                     logger.error("Suspected corrupt dump (non-17-byte line detected): " + dumpFilePath);
                     res.sendError(500);
                     return;
                 }
             }
         } catch (IOException | InterruptedException e) {
             logger.error("Failed reading dumpfile: " + dumpFilePath, e);
         }
 
-        sendFormattedResponse(recordIdsOnPage, res);
+        sendFormattedResponse(whelk, apiBaseUrl, dump, recordIdsOnPage, res, offsetLines + EmmChangeSet.TARGET_HITS_PER_PAGE, totalEntityCount);
     }
 
-    private static void sendFormattedResponse(ArrayList recordIdsOnPage, HttpServletResponse res) {
-        for (String s : recordIdsOnPage)
-            System.err.println(s.trim());
+    private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String dump, ArrayList recordIdsOnPage, HttpServletResponse res, long nextLineOffset, Long totalEntityCount) throws IOException {
+        HashMap responseObject = new HashMap();
+
+        if (totalEntityCount == null)
+            responseObject.put("status", "generating");
+        else {
+            responseObject.put("status", "done");
+            responseObject.put("totalEntityCount", totalEntityCount);
+        }
+
+        // totalEntityCount is null while the dump is still being generated; keep paging in that case.
+        if (totalEntityCount == null || nextLineOffset < totalEntityCount) {
+            responseObject.put("next", apiBaseUrl+"?dump="+dump+"&offset="+nextLineOffset);
+        }
+
+        ArrayList entitiesList = new ArrayList<>(EmmChangeSet.TARGET_HITS_PER_PAGE);
+        responseObject.put("entities", entitiesList);
+        Map idsAndRecords = whelk.bulkLoad(recordIdsOnPage);
+        for (Document doc : idsAndRecords.values()) {
+            entitiesList.add(doc.getThing());
+        }
+
+        String jsonResponse = mapper.writeValueAsString(responseObject);
+        BufferedWriter writer = new BufferedWriter( res.getWriter() );
+        writer.write(jsonResponse);
+        writer.close();
     }
 
     private static void invalidateIfOld(Path dumpFilePath) {
diff --git a/emm/src/main/java/whelk/EmmChangeSet.java b/emm/src/main/java/whelk/EmmChangeSet.java
index 4fbe10da62..2a93d6e61c
100644 --- a/emm/src/main/java/whelk/EmmChangeSet.java +++ b/emm/src/main/java/whelk/EmmChangeSet.java @@ -19,7 +19,7 @@ public class EmmChangeSet { public static final int TARGET_HITS_PER_PAGE = 100; private static final Logger logger = LogManager.getLogger(EmmChangeSet.class); - static void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, String until, String ApiBaseUrl) throws IOException { + static void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, String until, String apiBaseUrl) throws IOException { List activitiesOnPage = new ArrayList<>(TARGET_HITS_PER_PAGE+5); Timestamp nextTimeStamp = getPage(whelk, category, until, activitiesOnPage); @@ -35,12 +35,12 @@ static void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, contexts.add("https://emm-spec.org/1.0/context.json"); responseObject.put("@context", contexts); responseObject.put("type", "OrderedCollectionPage"); - responseObject.put("id", ApiBaseUrl+"?category="+category+"&until="+until); + responseObject.put("id", apiBaseUrl+"?category="+category+"&until="+until); HashMap partOf = new HashMap(); partOf.put("type", "OrderedCollection"); - partOf.put("id", ApiBaseUrl+"?category="+category); + partOf.put("id", apiBaseUrl+"?category="+category); responseObject.put("partOf", partOf); - responseObject.put("next", ApiBaseUrl+"?category="+category+"&until="+nextTimeStamp.getTime()); + responseObject.put("next", apiBaseUrl+"?category="+category+"&until="+nextTimeStamp.getTime()); List orderedItems = new ArrayList(); responseObject.put("orderedItems", orderedItems); diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java index 3a73615183..c4183ae792 100644 --- a/emm/src/main/java/whelk/EmmServlet.java +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -37,13 +37,13 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) { String dump = req.getParameter("dump"); String category = req.getParameter("category"); String until = req.getParameter("until"); - String ApiBaseUrl = req.getRequestURL().toString(); + String apiBaseUrl = req.getRequestURL().toString(); res.setCharacterEncoding("utf-8"); res.setContentType("application/json"); if (dump != null) { - Dump.sendDumpResponse(whelk, req, res); + Dump.sendDumpResponse(whelk, apiBaseUrl, req, res); return; } @@ -60,11 +60,11 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) { contexts.add("https://emm-spec.org/1.0/context.json"); responseObject.put("@context", contexts); responseObject.put("type", "OrderedCollection"); - responseObject.put("id", ApiBaseUrl+"?category="+category); - responseObject.put("url", ApiBaseUrl+"?dump=index"); + responseObject.put("id", apiBaseUrl+"?category="+category); + responseObject.put("url", apiBaseUrl+"?dump=index"); HashMap first = new HashMap(); first.put("type", "OrderedCollectionPage"); - first.put("id", ApiBaseUrl+"?category="+category+"&until="+System.currentTimeMillis()); + first.put("id", apiBaseUrl+"?category="+category+"&until="+System.currentTimeMillis()); responseObject.put("first", first); String jsonResponse = mapper.writeValueAsString(responseObject); @@ -75,7 +75,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) { } // Send ChangeSet reply - EmmChangeSet.sendChangeSet(whelk, res, category, until, ApiBaseUrl); + EmmChangeSet.sendChangeSet(whelk, res, category, until, apiBaseUrl); } catch (Exception e) { e.printStackTrace(); From 675055e781bb1a5a4df4ce71550fb887bfc59cd4 Mon Sep 17 00:00:00 2001 
From: Jannis Tsiroyannis Date: Mon, 11 Nov 2024 11:32:58 +0100 Subject: [PATCH 10/23] No categories for feed. --- emm/src/main/java/whelk/EmmChangeSet.java | 12 ++++++------ emm/src/main/java/whelk/EmmServlet.java | 15 +++------------ 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/emm/src/main/java/whelk/EmmChangeSet.java b/emm/src/main/java/whelk/EmmChangeSet.java index 2a93d6e61c..e9d2824808 100644 --- a/emm/src/main/java/whelk/EmmChangeSet.java +++ b/emm/src/main/java/whelk/EmmChangeSet.java @@ -19,10 +19,10 @@ public class EmmChangeSet { public static final int TARGET_HITS_PER_PAGE = 100; private static final Logger logger = LogManager.getLogger(EmmChangeSet.class); - static void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, String until, String apiBaseUrl) throws IOException { + static void sendChangeSet(Whelk whelk, HttpServletResponse res, String until, String apiBaseUrl) throws IOException { List activitiesOnPage = new ArrayList<>(TARGET_HITS_PER_PAGE+5); - Timestamp nextTimeStamp = getPage(whelk, category, until, activitiesOnPage); + Timestamp nextTimeStamp = getPage(whelk, until, activitiesOnPage); if (nextTimeStamp == null) { res.sendError(500); return; @@ -35,12 +35,12 @@ static void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, contexts.add("https://emm-spec.org/1.0/context.json"); responseObject.put("@context", contexts); responseObject.put("type", "OrderedCollectionPage"); - responseObject.put("id", apiBaseUrl+"?category="+category+"&until="+until); + responseObject.put("id", apiBaseUrl+"?until="+until); HashMap partOf = new HashMap(); partOf.put("type", "OrderedCollection"); - partOf.put("id", apiBaseUrl+"?category="+category); + partOf.put("id", apiBaseUrl); responseObject.put("partOf", partOf); - responseObject.put("next", apiBaseUrl+"?category="+category+"&until="+nextTimeStamp.getTime()); + responseObject.put("next", apiBaseUrl+"?until="+nextTimeStamp.getTime()); List orderedItems = new ArrayList(); responseObject.put("orderedItems", orderedItems); @@ -70,7 +70,7 @@ static void sendChangeSet(Whelk whelk, HttpServletResponse res, String category, * Get a page's worth of items. The results will be added to the 'result'-list. The return value will be "the next timestamp" * to start getting the next page at, or null on failure. */ - private static Timestamp getPage(Whelk whelk, String category, String until, List result) { + private static Timestamp getPage(Whelk whelk, String until, List result) { // Internally 'until' is "milliseconds since epoch". 
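        // NB: a non-numeric 'until' makes parseLong throw; as the code stands that is only
        // caught by the catch-all in the servlet's doGet.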
long untilNumerical = Long.parseLong(until); diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java index c4183ae792..345b9c79c0 100644 --- a/emm/src/main/java/whelk/EmmServlet.java +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -18,11 +18,8 @@ public class EmmServlet extends HttpServlet { private final Logger logger = LogManager.getLogger(this.getClass()); - private final HashSet availableCategories; private final Whelk whelk; public EmmServlet() { - availableCategories = new HashSet<>(); - availableCategories.add("all"); whelk = Whelk.createLoadedCoreWhelk(); } @@ -35,7 +32,6 @@ public void destroy() { public void doGet(HttpServletRequest req, HttpServletResponse res) { try { String dump = req.getParameter("dump"); - String category = req.getParameter("category"); String until = req.getParameter("until"); String apiBaseUrl = req.getRequestURL().toString(); @@ -47,11 +43,6 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) { return; } - if (!availableCategories.contains(category)) { - res.sendError(404); - return; - } - // Send an Entry-Point reply if (until == null) { HashMap responseObject = new HashMap(); @@ -60,11 +51,11 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) { contexts.add("https://emm-spec.org/1.0/context.json"); responseObject.put("@context", contexts); responseObject.put("type", "OrderedCollection"); - responseObject.put("id", apiBaseUrl+"?category="+category); + responseObject.put("id", apiBaseUrl); responseObject.put("url", apiBaseUrl+"?dump=index"); HashMap first = new HashMap(); first.put("type", "OrderedCollectionPage"); - first.put("id", apiBaseUrl+"?category="+category+"&until="+System.currentTimeMillis()); + first.put("id", apiBaseUrl+"?until="+System.currentTimeMillis()); responseObject.put("first", first); String jsonResponse = mapper.writeValueAsString(responseObject); @@ -75,7 +66,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) { } // Send ChangeSet reply - EmmChangeSet.sendChangeSet(whelk, res, category, until, apiBaseUrl); + EmmChangeSet.sendChangeSet(whelk, res, until, apiBaseUrl); } catch (Exception e) { e.printStackTrace(); From bd5882e6fdbf41619ca13c7ea09568d55331aaa1 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Mon, 11 Nov 2024 14:04:20 +0100 Subject: [PATCH 11/23] Add a few basic dump categories. 
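Before the dump-side patches below, it is worth pausing on the feed shape that patch 10 above leaves behind: with categories gone, the entry point is a single OrderedCollection whose "first" member points at the newest OrderedCollectionPage, and every page links to strictly older activities through "next". A minimal consumer sketch of that contract, using only the Python standard library (the base URL is an assumed development default, and error handling is omitted):

    import json
    import urllib.request

    EMM_BASE_URL = "http://localhost:8182/"  # assumption: a locally running EmmServer

    def walk_feed():
        # The entry-point reply is an OrderedCollection; "first" points at the
        # page of the newest activities (until = the server's current time).
        with urllib.request.urlopen(EMM_BASE_URL) as response:
            next_url = json.load(response)["first"]["id"]
        # Pages are addressed purely by the 'until' parameter, so the server keeps
        # no per-client state; we simply follow "next" backwards in time until we
        # reach activities we have already consumed.
        while next_url:
            with urllib.request.urlopen(next_url) as response:
                page = json.load(response)
            yield from page.get("orderedItems", [])
            next_url = page.get("next")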
--- emm/src/main/java/whelk/Dump.java | 107 +++++++++++++++++++++++++++--- 1 file changed, 96 insertions(+), 11 deletions(-) diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java index b09b42af6f..074d9fadc3 100644 --- a/emm/src/main/java/whelk/Dump.java +++ b/emm/src/main/java/whelk/Dump.java @@ -6,6 +6,8 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.*; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -23,6 +25,12 @@ public class Dump { public static void sendDumpResponse(Whelk whelk, String apiBaseUrl, HttpServletRequest req, HttpServletResponse res) throws IOException, SQLException { String dump = req.getParameter("dump"); + + if (dump.equals("index")) { + sendDumpIndexResponse(apiBaseUrl, res); + return; + } + String offset = req.getParameter("offset"); long offsetNumeric = Long.parseLong(offset); @@ -33,10 +41,41 @@ public static void sendDumpResponse(Whelk whelk, String apiBaseUrl, HttpServletR invalidateIfOld(dumpFilePath); if (!Files.exists(dumpFilePath)) - generateDump(whelk, dumpFilePath); + generateDump(whelk, dump, dumpFilePath); sendDumpPageResponse(whelk, apiBaseUrl, dump, dumpFilePath, offsetNumeric, res); } + private static void sendDumpIndexResponse(String apiBaseUrl, HttpServletResponse res) throws IOException { + HashMap responseObject = new HashMap(); + + // This could perhaps be better? + ArrayList categoriesList = new ArrayList<>(); + + HashMap allCategory = new HashMap(); + allCategory.put("url", apiBaseUrl+"?dump=all&offset=0"); + allCategory.put("description", "This category represents the whole collection, without reservations."); + categoriesList.add(allCategory); + + HashMap libraryCategory = new HashMap(); + libraryCategory.put("url", apiBaseUrl+"?dump=itemAndInstance-X&offset=0"); + libraryCategory.put("description", "These categories represent the Items and Instances held by a particular library. " + + "The relevant library-code (sigel) for which you want data must replace the X in the category url."); + categoriesList.add(libraryCategory); + + HashMap agentsCategory = new HashMap(); + agentsCategory.put("url", apiBaseUrl+"?dump=agents"); + agentsCategory.put("description", "This category represent agents (typically persons and organizations)."); + categoriesList.add(agentsCategory); + + // TODO: More categories! Subjects ? type=X ? 
etc + + responseObject.put("categories", categoriesList); + String jsonResponse = mapper.writeValueAsString(responseObject); + BufferedWriter writer = new BufferedWriter( res.getWriter() ); + writer.write(jsonResponse); + writer.close(); + } + private static void sendDumpPageResponse(Whelk whelk, String apiBaseUrl, String dump, Path dumpFilePath, long offsetLines, HttpServletResponse res) throws IOException { ArrayList recordIdsOnPage = new ArrayList<>(EmmChangeSet.TARGET_HITS_PER_PAGE); Long totalEntityCount = null; @@ -103,7 +142,7 @@ private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String responseObject.put("totalEntityCount", totalEntityCount); } - if (nextLineOffset < totalEntityCount) { + if (totalEntityCount == null || nextLineOffset < totalEntityCount) { responseObject.put("next", apiBaseUrl+"?dump="+dump+"&offset="+nextLineOffset); } @@ -124,12 +163,13 @@ private static void invalidateIfOld(Path dumpFilePath) { // TODO } - private static void generateDump(Whelk whelk, Path dumpFilePath) { + private static void generateDump(Whelk whelk, String dump, Path dumpFilePath) { new Thread(() -> { try (BufferedWriter dumpFileWriter = new BufferedWriter(new FileWriter(dumpFilePath.toFile())); Connection connection = whelk.getStorage().getOuterConnection()) { - connection.setAutoCommit(false); + + /* String library = "https://libris.kb.se/library/Li"; // TEMP String sql = " SELECT " + " id" + @@ -138,15 +178,30 @@ private static void generateDump(Whelk whelk, Path dumpFilePath) { " WHERE" + " data#>>'{@graph,1,heldBy,@id}' = ?"; PreparedStatement preparedStatement = connection.prepareStatement(sql); + */ - preparedStatement.setString(1, library); - preparedStatement.setFetchSize(EmmChangeSet.TARGET_HITS_PER_PAGE); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - while (resultSet.next()) { + PreparedStatement preparedStatement = null; - // Each line must be exactly 17 bytes long, including the (unix-)line break. - String id = String.format("%-16s\n", resultSet.getString(1)); - dumpFileWriter.write(id); + if (dump.equals("all")) { + preparedStatement = getAllDumpStatement(connection); + } else if (dump.startsWith("itemAndInstance-")) { + preparedStatement = getLibraryXDumpStatement(connection, dump.substring(16)); + } + + if (preparedStatement == null) { + logger.info("Dump request for unknown category: " + dump); + return; + } + + try (PreparedStatement p = preparedStatement) { + p.setFetchSize(EmmChangeSet.TARGET_HITS_PER_PAGE); + try (ResultSet resultSet = p.executeQuery()) { + while (resultSet.next()) { + + // Each line must be exactly 17 bytes long, including the (unix-)line break. + String id = String.format("%-16s\n", resultSet.getString(1)); + dumpFileWriter.write(id); + } } } @@ -156,4 +211,34 @@ private static void generateDump(Whelk whelk, Path dumpFilePath) { } }).start(); } + + private static PreparedStatement getAllDumpStatement(Connection connection) throws SQLException { + String sql = " SELECT " + + " id" + + " FROM" + + " lddb"; + PreparedStatement preparedStatement = connection.prepareStatement(sql); + return preparedStatement; + } + + private static PreparedStatement getLibraryXDumpStatement(Connection connection, String library) throws SQLException { + String sql = " SELECT " + + " id" + + " FROM" + + " lddb" + + " WHERE" + + " collection = 'hold' AND" + + " (data#>>'{@graph,1,heldBy,@id}' = ? 
OR data#>>'{@graph,1,heldBy,@id}' = ?)"; PreparedStatement preparedStatement = connection.prepareStatement(sql); + + preparedStatement.setString(1, Document.getBASE_URI().resolve("/library/"+library).toString()); + + // This is uncomfortable. Library URIs are "https://libris.kb.se/library/.." regardless of environment base-URI. + // To protect ourselves from the fact that this could change, check both these URIs and environment-specific ones. + URI defaultLibrisURI = null; + try { defaultLibrisURI = new URI("https://libris.kb.se"); } catch (URISyntaxException e) { /*ignore*/ } + preparedStatement.setString(2, defaultLibrisURI.resolve("/library/"+library).toString()); + + return preparedStatement; + } }
From ae381e5ded652a2117f474afa249b7132d270ef5 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Mon, 11 Nov 2024 14:29:22 +0100 Subject: [PATCH 12/23] Embed instances when dumping itemAndInstance-categories. --- emm/src/main/java/whelk/Dump.java | 38 +++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java index 074d9fadc3..e7dcabf652 100644 --- a/emm/src/main/java/whelk/Dump.java +++ b/emm/src/main/java/whelk/Dump.java @@ -150,7 +150,32 @@ private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String responseObject.put("entities", entitesList); Map idsAndRecords = whelk.bulkLoad(recordIdsOnPage); for (Document doc : idsAndRecords.values()) { - entitesList.add(doc.getThing()); + + // Here is a bit of SPECIALIZED treatment only for the itemAndInstance-categories. These should + // include not only the Item (which is the root node for this category), but also the linked Instance. + // Without this, a client must individually GET every single Instance in their dataset, which scales poorly. + if (dump.startsWith("itemAndInstance-")) { + String itemOf = doc.getHoldingFor(); + if (itemOf == null) { + logger.warn("Holding of nothing? " + doc.getId()); + continue; + } + Document instance = new Document( whelk.loadData(itemOf) ); + if (instance == null) { + logger.warn("Bad instance? " + itemOf); + continue; + } + ArrayList itemOfPath = new ArrayList(); + itemOfPath.add("@graph"); itemOfPath.add(1); itemOfPath.add("itemOf"); // unggh.. + doc._set(itemOfPath, instance.getThing(), doc.data); + entitesList.add(doc.getThing()); + } + + // For normal categories + else { + entitesList.add(doc.getThing()); + } + } String jsonResponse = mapper.writeValueAsString(responseObject); @@ -169,17 +194,6 @@ private static void generateDump(Whelk whelk, String dump, Path dumpFilePath) { Connection connection = whelk.getStorage().getOuterConnection()) { connection.setAutoCommit(false); - /* - String library = "https://libris.kb.se/library/Li"; // TEMP - String sql = " SELECT " + - " id" + - " FROM" + - " lddb" + - " WHERE" + - " data#>>'{@graph,1,heldBy,@id}' = ?"; - PreparedStatement preparedStatement = connection.prepareStatement(sql); - */ - PreparedStatement preparedStatement = null; if (dump.equals("all")) {
From db0280c489e34a56da1e3c12e2414e517bf5d60d Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Wed, 13 Nov 2024 12:31:16 +0100 Subject: [PATCH 13/23] Type categories for EMM-dumps.
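For consumers, the practical effect of the embedding in patch 12 above is that each Item on an itemAndInstance dump page arrives with its linked Instance copied in under "itemOf" instead of a bare reference, so no follow-up GET per record is needed. Roughly, a returned entity then looks like this (a sketch; both URIs are invented for illustration):

    item = {
        "@type": "Item",
        "@id": "https://libris.kb.se/<item-id>",          # hypothetical URI
        "itemOf": {                                       # embedded by the server
            "@type": "Instance",
            "@id": "https://libris.kb.se/<instance-id>",  # hypothetical URI
            # ...the full Instance data follows, not just a bare {"@id": ...} link
        },
    }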
--- emm/src/main/java/whelk/Dump.java | 39 +++++++++++++------ ...000023-index-generationDate-versions.plsql | 2 + 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java index e7dcabf652..ff93b0d59d 100644 --- a/emm/src/main/java/whelk/Dump.java +++ b/emm/src/main/java/whelk/Dump.java @@ -13,9 +13,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.sql.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import static whelk.util.Jackson.mapper; @@ -48,7 +46,6 @@ public static void sendDumpResponse(Whelk whelk, String apiBaseUrl, HttpServletR private static void sendDumpIndexResponse(String apiBaseUrl, HttpServletResponse res) throws IOException { HashMap responseObject = new HashMap(); - // This could perhaps be better? ArrayList categoriesList = new ArrayList<>(); HashMap allCategory = new HashMap(); @@ -59,17 +56,18 @@ private static void sendDumpIndexResponse(String apiBaseUrl, HttpServletResponse HashMap libraryCategory = new HashMap(); libraryCategory.put("url", apiBaseUrl+"?dump=itemAndInstance-X&offset=0"); libraryCategory.put("description", "These categories represent the Items and Instances held by a particular library. " + - "The relevant library-code (sigel) for which you want data must replace the X in the category url."); + "The relevant library-code (sigel) for which you want data must replace the X in the category URL."); categoriesList.add(libraryCategory); - HashMap agentsCategory = new HashMap(); - agentsCategory.put("url", apiBaseUrl+"?dump=agents"); - agentsCategory.put("description", "This category represent agents (typically persons and organizations)."); - categoriesList.add(agentsCategory); - - // TODO: More categories! Subjects ? type=X ? etc + HashMap typesCategory = new HashMap(); + typesCategory.put("url", apiBaseUrl+"?dump=type:X&offset=0"); + typesCategory.put("description", "These categories represent the set of a entities of a certain type, including subtypes. " + + "For example the type Agent would include both Persons and Organizations etc. The X in the URL must be replaced " + + "with the type you want."); + categoriesList.add(typesCategory); responseObject.put("categories", categoriesList); + String jsonResponse = mapper.writeValueAsString(responseObject); BufferedWriter writer = new BufferedWriter( res.getWriter() ); writer.write(jsonResponse); @@ -200,6 +198,8 @@ private static void generateDump(Whelk whelk, String dump, Path dumpFilePath) { preparedStatement = getAllDumpStatement(connection); } else if (dump.startsWith("itemAndInstance-")) { preparedStatement = getLibraryXDumpStatement(connection, dump.substring(16)); + } else if (dump.startsWith("type:")) { + preparedStatement = getTypeXStatement(connection, whelk, dump.substring(5)); } if (preparedStatement == null) { @@ -255,4 +255,21 @@ private static PreparedStatement getLibraryXDumpStatement(Connection connection, return preparedStatement; } + + private static PreparedStatement getTypeXStatement(Connection connection, Whelk whelk, String type) throws SQLException { + Set types = whelk.getJsonld().getSubClasses(type); + types.add(type); + + String sql = " SELECT " + + " id" + + " FROM" + + " lddb" + + " WHERE" + + " data#>>'{@graph,1,@type}' = ANY( ? 
)"; + + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setArray(1, connection.createArrayOf("TEXT", types.toArray() )); + + return preparedStatement; + } } diff --git a/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql b/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql index bd21727136..dab2a2d38d 100644 --- a/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql +++ b/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql @@ -24,6 +24,8 @@ BEGIN -- ACTUAL SCHEMA CHANGES HERE: CREATE INDEX IF NOT EXISTS idx_lddb__versions_generation_date ON lddb__versions (GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}'))); + + CREATE INDEX IF NOT EXISTS idx_lddb_thing_type ON lddb ((data#>>'{@graph,1,@type}')); END$$; From b72529df6cfe785e846f9e6a6dcf75f550d04d05 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Wed, 13 Nov 2024 13:13:20 +0100 Subject: [PATCH 14/23] Add dump IDs --- emm/src/main/java/whelk/Dump.java | 36 ++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java index ff93b0d59d..65ab01fd6c 100644 --- a/emm/src/main/java/whelk/Dump.java +++ b/emm/src/main/java/whelk/Dump.java @@ -12,7 +12,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributes; import java.sql.*; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.*; import static whelk.util.Jackson.mapper; @@ -54,7 +57,7 @@ private static void sendDumpIndexResponse(String apiBaseUrl, HttpServletResponse categoriesList.add(allCategory); HashMap libraryCategory = new HashMap(); - libraryCategory.put("url", apiBaseUrl+"?dump=itemAndInstance-X&offset=0"); + libraryCategory.put("url", apiBaseUrl+"?dump=itemAndInstance:X&offset=0"); libraryCategory.put("description", "These categories represent the Items and Instances held by a particular library. 
" + "The relevant library-code (sigel) for which you want data must replace the X in the category URL."); categoriesList.add(libraryCategory); @@ -127,12 +130,15 @@ private static void sendDumpPageResponse(Whelk whelk, String apiBaseUrl, String logger.error("Failed reading dumpfile: " + dumpFilePath, e); } - sendFormattedResponse(whelk, apiBaseUrl, dump, recordIdsOnPage, res, offsetLines + EmmChangeSet.TARGET_HITS_PER_PAGE, totalEntityCount); + BasicFileAttributes attributes = Files.readAttributes(dumpFilePath, BasicFileAttributes.class); + String dumpId = ""+(dumpFilePath.toString() + attributes.creationTime().toInstant().toEpochMilli()).hashCode(); + sendFormattedResponse(whelk, apiBaseUrl, dump, recordIdsOnPage, res, offsetLines + EmmChangeSet.TARGET_HITS_PER_PAGE, totalEntityCount, dumpId); } - private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String dump, ArrayList recordIdsOnPage, HttpServletResponse res, long nextLineOffset, Long totalEntityCount) throws IOException{ + private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String dump, ArrayList recordIdsOnPage, HttpServletResponse res, long nextLineOffset, Long totalEntityCount, String dumpId) throws IOException{ HashMap responseObject = new HashMap(); + responseObject.put("id", dumpId); if (totalEntityCount == null) responseObject.put("status", "generating"); else { @@ -149,10 +155,10 @@ private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String Map idsAndRecords = whelk.bulkLoad(recordIdsOnPage); for (Document doc : idsAndRecords.values()) { - // Here is a bit of SPECIALIZED treatment only for the itemAndInstance-categories. These should + // Here is a bit of SPECIALIZED treatment only for the itemAndInstance:categories. These should // include not only the Item (which is the root node for this category), but also the linked Instance. // Without this, a client must individually GET every single Instance in their dataset, which scales poorly. - if (dump.startsWith("itemAndInstance-")) { + if (dump.startsWith("itemAndInstance:")) { String itemOf = doc.getHoldingFor(); if (itemOf == null) { logger.warn("Holding of nothing? " + doc.getId()); @@ -183,7 +189,21 @@ private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String } private static void invalidateIfOld(Path dumpFilePath) { - // TODO + try { + if (!Files.exists(dumpFilePath)) + return; + + BasicFileAttributes attributes = Files.readAttributes(dumpFilePath, BasicFileAttributes.class); + if (attributes.creationTime().toInstant().isBefore(Instant.now().minus(5, ChronoUnit.DAYS))) { + Files.delete(dumpFilePath); + } + } catch (IOException e) { + // These exceptions are caught here due to the (theoretical) risk of files access race conditions. + // For example, it could be that a dump is being read by one thread, while passing the too-old-threshold + // and then while still being read, another thread sees the dump as too old and tries to delete it. + // Just log this sort of thing and carry on. 
+ logger.info("Failed to invalidate (delete) EMM dump: " + dumpFilePath, e); + } } private static void generateDump(Whelk whelk, String dump, Path dumpFilePath) { @@ -196,7 +216,7 @@ private static void generateDump(Whelk whelk, String dump, Path dumpFilePath) { if (dump.equals("all")) { preparedStatement = getAllDumpStatement(connection); - } else if (dump.startsWith("itemAndInstance-")) { + } else if (dump.startsWith("itemAndInstance:")) { preparedStatement = getLibraryXDumpStatement(connection, dump.substring(16)); } else if (dump.startsWith("type:")) { preparedStatement = getTypeXStatement(connection, whelk, dump.substring(5)); @@ -212,7 +232,7 @@ private static void generateDump(Whelk whelk, String dump, Path dumpFilePath) { try (ResultSet resultSet = p.executeQuery()) { while (resultSet.next()) { - // Each line must be exactly 17 bytes long, including the (unix-)line break. + // Each line must be exactly 17 bytes long, including the (unix) line break. String id = String.format("%-16s\n", resultSet.getString(1)); dumpFileWriter.write(id); } From 2d819793625ebd3740e6ae0737d43c9df278fa7c Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Mon, 18 Nov 2024 09:41:50 +0100 Subject: [PATCH 15/23] Beginnings of an EMM client. --- emm/exampleclient/client.py | 209 ++++++++++++++++++++++++++++++ emm/src/main/java/whelk/Dump.java | 12 +- 2 files changed, 216 insertions(+), 5 deletions(-) create mode 100644 emm/exampleclient/client.py diff --git a/emm/exampleclient/client.py b/emm/exampleclient/client.py new file mode 100644 index 0000000000..c34d486f9b --- /dev/null +++ b/emm/exampleclient/client.py @@ -0,0 +1,209 @@ +# Standard library only. 3rd party dependencies not permitted. +import sqlite3 +import urllib.request +from urllib.request import Request, urlopen +import json +import os + +# Configuration section: +cache_location = "./libris-cache.sqlite3" +libris_emm_base_url = "http://localhost:8182/" +local_library_code = "S" + +def collect_entity(data, uri): + return {} + + +def collect_uris_with_data(entity): + uris = [] + if isinstance(entity, dict): + for key in entity: + if key == "@id" and len(entity) > 1: + uris.append(entity[key]) + uris = uris + collect_uris_with_data(entity[key]) + elif isinstance(entity, list): + for item in entity: + uris = uris + collect_uris_with_data(item) + return uris + + +def ingest_entity(entity, connection): + uris = collect_uris_with_data(entity) + cursor = connection.cursor() + entity_id = cursor.execute( + """ + INSERT INTO + entities(entity) + VALUES + (?) + """, + (json.dumps(entity),) + ).lastrowid + + for uri in uris: + cursor.execute( + """ + INSERT INTO + uris(entity_id, uri) + VALUES + (?, ?) + """, + (entity_id, uri,) + ) + connection.commit() + + +def load_dump(connection): + next_url = f"{libris_emm_base_url}?dump=itemAndInstance:{local_library_code}&offset=0" + dumpCreationTime = None + while next_url: + with urllib.request.urlopen(next_url) as response: + data = json.load(response) + dumpCreationTimeOnPage = data["creationTime"] + if (dumpCreationTime and dumpCreationTime != dumpCreationTimeOnPage): + print(" DUMP INVALIDATED WHILE DOWNLOADING, TODO: DEAL WITH THIS ") + dumpCreationTime = dumpCreationTimeOnPage + if "next" in data: + next_url = data["next"] + else: + next_url = None + if "entities" in data: + for entity in data["entities"]: + ingest_entity(entity, connection) + cursor = connection.cursor() + cursor.execute( + """ + INSERT INTO + state(changes_consumed_until) + VALUES + (?) 
+ """, + (dumpCreationTime,) + ) + connection.commit() + + +def download_entity(url): + req = Request(url) + req.add_header('accept', 'application/json+ld') + return json.load(urlopen(req))["@graph"][1] + + +def replace_entity(node, replacement_entity): + #print(f" ** consider replacing {replacement_entity}\n\nin\n\n{node}\n\n") + print(f"Will now compare for replacement {replacement_entity['@id']} with {node['@id']}") + uri_to_replace = replacement_entity["@id"] + if isinstance(node, dict) and "@id" in node and node["@id"] == uri_to_replace: + print(" ROOT REPLACEMENT! ****\n\n") + return replacement_entity + else: + replace_subentity(node, replacement_entity) + return node + + +def replace_subentity(node, replacement_entity): + uri_to_replace = replacement_entity["@id"] + if isinstance(node, dict): + for key in node.keys: + if isinstance(node[key], dict) and "@id" in node[key] and node[key]["@id"] == uri_to_replace: + node[key] = replacement_entity + replace_subentity(node[key], replacement_entity) + elif isinstance(node, list): + for i in range(len(node)): + if isinstance(node[i], dict) and "@id" in node[i] and node[i]["@id"] == uri_to_replace: + node[i] = replacement_entity + replace_subentity(node[i], replacement_entity) + + +def handle_activity(connection, activity): + #print(f"Will now handle stuff: {json.dumps(activity)}") + + cursor = connection.cursor() + if activity["type"] == "create": + pass # TODO + elif activity["type"] == "delete": + pass # TODO + elif activity["type"] == "update": + + # Find all of our records that depend on this URI + rows = cursor.execute("SELECT entities.id, entities.entity FROM uris JOIN entities on uris.entity_id = entities.id WHERE uris.uri = ?", (activity["object"]["id"],)) + + updated_data = None + for row in rows: + if not updated_data: + updated_data = download_entity(activity["object"]["id"]) + #print( f"downloaded record: {json.dumps(download_entity(activity["object"]["id"]))}" ) + entity_id = row[0] + entity_data = json.loads(row[1]) + entity_data = replace_entity(entity_data, updated_data) + print(f"REPLACED part of entity that now looks like this: {entity_data}") + + + +def update(connection): + cursor = connection.cursor() + with urllib.request.urlopen(libris_emm_base_url) as response: + data = json.load(response) + next_url = data["first"]["id"] + + while next_url: + with urllib.request.urlopen(next_url) as response: + data = json.load(response) + if "next" in data: + next_url = data["next"] + else: + next_url = None + for item in data["orderedItems"]: + result = cursor.execute("SELECT julianday(changes_consumed_until) - julianday(?) 
FROM state", (item["published"],)) + diff = result.fetchone()[0] + if (float(diff) >= 0.0): + print(f"{item["published"]} is before our last taken update, stop here.") + next_url = None + break + handle_activity(connection, item) + cursor.execute("UPDATE state SET changes_consumed_until = ?", (item["published"],)) + connection.commit() + + +def main(): + cache_initialized = False + if os.path.exists(cache_location): + cache_initialized = True + connection = sqlite3.connect(cache_location) + cursor = connection.cursor() + cursor.execute("PRAGMA journal_mode=WAL") + cursor.execute("PRAGMA synchronous=OFF") + cursor.execute("PRAGMA foreign_keys=ON") + + # If the cache database does not exist, set up a new one + if not cache_initialized: + cursor.execute(""" +CREATE TABLE entities ( + id INTEGER PRIMARY KEY, + entity TEXT +); +""") + cursor.execute(""" +CREATE TABLE uris ( + id INTEGER PRIMARY KEY, + uri TEXT, + entity_id INTEGER, + UNIQUE(uri, entity_id) ON CONFLICT IGNORE, + FOREIGN KEY (entity_id) REFERENCES entities(id) +); +""") + cursor.execute(""" +CREATE TABLE state ( + id INTEGER PRIMARY KEY, + changes_consumed_until TEXT +); +""") + connection.commit() + load_dump(connection) + + update(connection) + + + +if __name__ == "__main__": + main() diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java index 65ab01fd6c..2da59db545 100644 --- a/emm/src/main/java/whelk/Dump.java +++ b/emm/src/main/java/whelk/Dump.java @@ -15,6 +15,8 @@ import java.nio.file.attribute.BasicFileAttributes; import java.sql.*; import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; import java.util.*; @@ -64,7 +66,7 @@ private static void sendDumpIndexResponse(String apiBaseUrl, HttpServletResponse HashMap typesCategory = new HashMap(); typesCategory.put("url", apiBaseUrl+"?dump=type:X&offset=0"); - typesCategory.put("description", "These categories represent the set of a entities of a certain type, including subtypes. " + + typesCategory.put("description", "These categories represent the set of entities of a certain type, including subtypes. " + "For example the type Agent would include both Persons and Organizations etc. 
The X in the URL must be replaced " + "with the type you want."); categoriesList.add(typesCategory); @@ -131,14 +133,14 @@ private static void sendDumpPageResponse(Whelk whelk, String apiBaseUrl, String } BasicFileAttributes attributes = Files.readAttributes(dumpFilePath, BasicFileAttributes.class); - String dumpId = ""+(dumpFilePath.toString() + attributes.creationTime().toInstant().toEpochMilli()).hashCode(); - sendFormattedResponse(whelk, apiBaseUrl, dump, recordIdsOnPage, res, offsetLines + EmmChangeSet.TARGET_HITS_PER_PAGE, totalEntityCount, dumpId); + Instant dumpCreationTime = attributes.creationTime().toInstant(); + sendFormattedResponse(whelk, apiBaseUrl, dump, recordIdsOnPage, res, offsetLines + EmmChangeSet.TARGET_HITS_PER_PAGE, totalEntityCount, dumpCreationTime); } - private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String dump, ArrayList recordIdsOnPage, HttpServletResponse res, long nextLineOffset, Long totalEntityCount, String dumpId) throws IOException{ + private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String dump, ArrayList recordIdsOnPage, HttpServletResponse res, long nextLineOffset, Long totalEntityCount, Instant dumpCreationTime) throws IOException{ HashMap responseObject = new HashMap(); - responseObject.put("id", dumpId); + responseObject.put("creationTime", ZonedDateTime.ofInstant(dumpCreationTime, ZoneOffset.UTC).toString()); if (totalEntityCount == null) responseObject.put("status", "generating"); else { From 71b40b79d5f13eb722c6d23349ea781faa1adee5 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Tue, 19 Nov 2024 09:59:09 +0100 Subject: [PATCH 16/23] emm client, write changes on update. --- emm/exampleclient/client.py | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/emm/exampleclient/client.py b/emm/exampleclient/client.py index c34d486f9b..8c3b3b8a11 100644 --- a/emm/exampleclient/client.py +++ b/emm/exampleclient/client.py @@ -89,18 +89,6 @@ def download_entity(url): return json.load(urlopen(req))["@graph"][1] -def replace_entity(node, replacement_entity): - #print(f" ** consider replacing {replacement_entity}\n\nin\n\n{node}\n\n") - print(f"Will now compare for replacement {replacement_entity['@id']} with {node['@id']}") - uri_to_replace = replacement_entity["@id"] - if isinstance(node, dict) and "@id" in node and node["@id"] == uri_to_replace: - print(" ROOT REPLACEMENT! 
****\n\n") - return replacement_entity - else: - replace_subentity(node, replacement_entity) - return node - - def replace_subentity(node, replacement_entity): uri_to_replace = replacement_entity["@id"] if isinstance(node, dict): @@ -115,9 +103,16 @@ def replace_subentity(node, replacement_entity): replace_subentity(node[i], replacement_entity) -def handle_activity(connection, activity): - #print(f"Will now handle stuff: {json.dumps(activity)}") +def replace_entity(node, replacement_entity): + uri_to_replace = replacement_entity["@id"] + if isinstance(node, dict) and "@id" in node and node["@id"] == uri_to_replace: # Root-replacement + return replacement_entity + else: # Embedded replacement + replace_subentity(node, replacement_entity) + return node + +def handle_activity(connection, activity): cursor = connection.cursor() if activity["type"] == "create": pass # TODO @@ -132,11 +127,11 @@ def handle_activity(connection, activity): for row in rows: if not updated_data: updated_data = download_entity(activity["object"]["id"]) - #print( f"downloaded record: {json.dumps(download_entity(activity["object"]["id"]))}" ) entity_id = row[0] entity_data = json.loads(row[1]) entity_data = replace_entity(entity_data, updated_data) - print(f"REPLACED part of entity that now looks like this: {entity_data}") + cursor.execute("UPDATE entities SET entity = ? WHERE id = ?", (json.dumps(entity_data),entity_id)) + connection.commit() @@ -161,8 +156,9 @@ def update(connection): next_url = None break handle_activity(connection, item) - cursor.execute("UPDATE state SET changes_consumed_until = ?", (item["published"],)) - connection.commit() + # Disable for dev, temporary + #cursor.execute("UPDATE state SET changes_consumed_until = ?", (item["published"],)) + #connection.commit() def main(): From 702d9a5a6cc69b0fbcf5868a81be8dff566d3c7c Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Tue, 19 Nov 2024 12:54:27 +0100 Subject: [PATCH 17/23] embellish in emm client --- emm/exampleclient/client.py | 60 ++++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/emm/exampleclient/client.py b/emm/exampleclient/client.py index 8c3b3b8a11..8c971676e5 100644 --- a/emm/exampleclient/client.py +++ b/emm/exampleclient/client.py @@ -1,4 +1,4 @@ -# Standard library only. 3rd party dependencies not permitted. +# Standard library only, 3rd party dependencies are not permitted. 
import sqlite3 import urllib.request from urllib.request import Request, urlopen import json import os @@ -9,6 +9,7 @@ cache_location = "./libris-cache.sqlite3" libris_emm_base_url = "http://localhost:8182/" local_library_code = "S" +properties_of_interest = ["itemOf", "instanceOf", "agent", "subject"] def collect_entity(data, uri): return {} @@ -27,6 +28,62 @@ def collect_uris_with_data(entity): return uris +def collect_entity_with_uri(entity, uri): + if isinstance(entity, dict): + if "@id" in entity and len(entity) > 1 and entity["@id"] == uri: + return entity + for key in entity: + result = collect_entity_with_uri(entity[key], uri) + if result: + return result + elif isinstance(entity, list): + for item in entity: + result = collect_entity_with_uri(item, uri) + if result: + return result + return None + + +def download_entity(url): + req = Request(url) + req.add_header('accept', 'application/json+ld') + return json.load(urlopen(req))["@graph"][1] + + +def embed_links(entity, connection): + if isinstance(entity, dict) and len(entity) == 1 and "@id" in entity: + # Find data for this ID somewhere, and replace the stuff in our entity + # First see if we have more of this data somewhere already + cursor = connection.cursor() + rows = cursor.execute("SELECT entities.entity FROM uris JOIN entities on uris.entity_id = entities.id WHERE uris.uri = ?", (entity["@id"],)) + row = rows.fetchone() + if row: + whole_other_record = json.loads(row[0]) + sought_entity = collect_entity_with_uri(whole_other_record, entity["@id"]) + entity.clear() + entity.update(sought_entity) + # Otherwise do a GET on the ID and try to get some data from there + else: + sought_entity = download_entity(entity["@id"]) + if sought_entity: + entity.clear() + entity.update(sought_entity) + elif isinstance(entity, list): + for item in entity: + embed_links(item, connection) + + +def embellish(entity, connection): + if isinstance(entity, dict): + for key in entity: + if key in properties_of_interest: + embed_links(entity[key], connection) + embellish(entity[key], connection) + elif isinstance(entity, list): + for item in entity: + embellish(item, connection) + + def ingest_entity(entity, connection): uris = collect_uris_with_data(entity) cursor = connection.cursor()
From 9b08017f06d2608d3e50171db8c7c1e441f5060d Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Tue, 19 Nov 2024 13:06:48 +0100 Subject: [PATCH 18/23] somewhat working emm client. --- emm/exampleclient/client.py | 31 +++++++++++++++++------------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/emm/exampleclient/client.py b/emm/exampleclient/client.py index 8c971676e5..f8fed570b1 100644 --- a/emm/exampleclient/client.py +++ b/emm/exampleclient/client.py @@ -84,19 +84,10 @@ def embellish(entity, connection): embellish(item, connection) -def ingest_entity(entity, connection): - uris = collect_uris_with_data(entity) +def update_uris_table(entity, entity_id, connection): cursor = connection.cursor() - entity_id = cursor.execute( - """ - INSERT INTO - entities(entity) - VALUES - (?) - """, - (json.dumps(entity),) - ).lastrowid - + uris = collect_uris_with_data(entity) + cursor.execute("DELETE FROM uris WHERE id = ?", (entity_id,)) for uri in uris: cursor.execute( """ INSERT INTO uris(entity_id, uri) VALUES (?, ?) """, (entity_id, uri,) ) connection.commit() +def ingest_entity(entity, connection): + cursor = connection.cursor() + entity_id = cursor.execute( + """ + INSERT INTO + entities(entity) + VALUES + (?) + """, + (json.dumps(entity),) + ).lastrowid + connection.commit() + update_uris_table(entity, entity_id, connection) + + def load_dump(connection):
From c1d9b18115b6ed9bd6509496333586301677856c Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Tue, 19 Nov 2024 14:59:53 +0100 Subject: [PATCH 19/23] Do a little cleanup of the EMM client. --- emm/exampleclient/client.py | 135 +++++++++++++++++++++++++++++++----- 1 file changed, 119 insertions(+), 16 deletions(-) diff --git a/emm/exampleclient/client.py b/emm/exampleclient/client.py index f8fed570b1..316cc45012 100644 --- a/emm/exampleclient/client.py +++ b/emm/exampleclient/client.py @@ -1,20 +1,85 @@ -# Standard library only, 3rd party dependencies are not permitted. -import sqlite3 -import urllib.request -from urllib.request import Request, urlopen -import json -import os - -# Configuration section: +# +# This example Libris EMM client is intended to illustrate how one can use +# the EMM protocol to obtain a cache of some subset of libris data and keep +# that cache up to date over time. By default this example client caches +# data for one selected libris library (see the configuration section below). +# But the selected subset might as well be "all agents", "all everything" +# or something else. +# +# Beware: This client has not been written with defensive coding in mind, +# and is not intended for production use. +# +# Key to using the EMM protocol (at least as seen here) is to define what +# you consider to be the root set of entities to cache. For us this is +# going to mean: +# Entities (I) of type 'Item' with a library code (I->heldBy->@id) identifying +# our selected library. +# +# Knowing this, we will consume a dump of this set of entities (the initial +# load). +# We are then going to consume/monitor the stream of all changes in Libris, +# and for every activity in that stream, we will consider the following: +# 1. Is this a creation or a deletion of an entity that fits our root entity +# definition? +# If so, we must add or remove said entity from our collection. +# 2. Is this an update of an entity that we have (be it as a root, or +# embedded within another entity, or somewhere else)? +# If so, we must update that entity wherever we keep it. +# +# This is all we need to do to keep our cache up to date in perpetuity. +# +# For this example client we are going to keep knowledge graphs in an SQLite +# table called 'entities'. These knowledge graphs will always consist of an +# entity of type 'Item' with various other entities that are being linked to +# embedded inside that root entity. +# +# We will also have a table called 'uris' which will provide a mapping from +# entity IDs (URIs) to all of our knowledge graphs that reference that URI +# (which means they also keep an embedded copy of the referenced entity). +# +# Finally we will have a single-value table called 'state', which will hold +# the latest update time from the server that we have already consumed. +# + + +# +# Configuration section +# + +# This parameter tells the client where on disk to store its cache. cache_location = "./libris-cache.sqlite3" + +# This parameter tells the client which EMM server to use. libris_emm_base_url = "http://localhost:8182/" + +# This parameter tells the client which library we are downloading data for. local_library_code = "S" + +# This parameter tells the client which properties we would like to follow +# links for and download additional data to keep with our entities. properties_of_interest = ["itemOf", "instanceOf", "agent", "subject"] -def collect_entity(data, uri): - return {} +# +# Code section +# +# This client was written for Python 3.13.0, but it is believed that many +# Python 3 implementations can run this code. +# +# It uses only the Python standard library. There are no 3rd party +# dependencies. +# +import sqlite3 +import urllib.request +from urllib.request import Request, urlopen +import json +import os + +# +# For a given entity, return all URIs within that entity (embedded or not) +# for which we have any data. +# def collect_uris_with_data(entity): uris = [] if isinstance(entity, dict): @@ -28,6 +93,10 @@ def collect_uris_with_data(entity): return uris +# +# For a given entity and uri, return any embedded entities identified by the +# uri. +# def collect_entity_with_uri(entity, uri): if isinstance(entity, dict): @@ -44,12 +113,20 @@ def collect_entity_with_uri(entity, uri): return None +# +# Attempt to download and return data for a given URI. +# def download_entity(url): req = Request(url) req.add_header('accept', 'application/json+ld') return json.load(urlopen(req))["@graph"][1] +# +# For a given entity, iff that entity is a link (a lone @id-property and +# nothing else) attempt to find data on that URI, and embed that data into +# this entity. +# def embed_links(entity, connection): if isinstance(entity, dict) and len(entity) == 1 and "@id" in entity: # Find data for this ID somewhere, and replace the stuff in our entity # First see if we have more of this data somewhere already cursor = connection.cursor() @@ -73,6 +150,11 @@ def embed_links(entity, connection): embed_links(item, connection) +# +# For a given entity, check if we can get more data on the things it links +# to (that we are interested in - see the configuration section), and embed +# copies of that data. +# def embellish(entity, connection): if isinstance(entity, dict): for key in entity: @@ -84,6 +166,10 @@ def embellish(entity, connection): embellish(item, connection) +# +# Given an entity (that's been changed), update the mappings of URIs to +# this entity. +# def update_uris_table(entity, entity_id, connection): cursor = connection.cursor() uris = collect_uris_with_data(entity) @@ -101,6 +187,9 @@ def update_uris_table(entity, entity_id, connection): connection.commit() +# +# Ingest a root entity from a dump. +# def ingest_entity(entity, connection): cursor = connection.cursor() entity_id = cursor.execute( @@ -116,6 +205,9 @@ def ingest_entity(entity, connection): update_uris_table(entity, entity_id, connection) +# +# Load an initial dump of the configured data set. +# def load_dump(connection): next_url = f"{libris_emm_base_url}?dump=itemAndInstance:{local_library_code}&offset=0" dumpCreationTime = None @@ -146,12 +238,10 @@ def load_dump(connection): connection.commit() -def download_entity(url): - req = Request(url) - req.add_header('accept', 'application/json+ld') - return json.load(urlopen(req))["@graph"][1] - - +# +# Given a root entity 'r', and a replacement/update of some embedded entity 'u', +# update the data of 'u' wherever it is copied/embedded into 'r'. +# def replace_subentity(node, replacement_entity): uri_to_replace = replacement_entity["@id"] if isinstance(node, dict): @@ -166,6 +256,10 @@ def replace_subentity(node, replacement_entity): replace_subentity(node[i], replacement_entity) +# +# Given a root entity 'r', and a replacement/update of some embedded entity 'u', +# update the data of 'u' wherever it is copied/embedded into 'r'. +# def replace_entity(node, replacement_entity): uri_to_replace = replacement_entity["@id"] if isinstance(node, dict) and "@id" in node and node["@id"] == uri_to_replace: # Root-replacement @@ -175,6 +269,10 @@ def replace_entity(node, replacement_entity): return node +# +# This is the heart of the update mechanism. Consume an update activity, and take +# whatever action is necessary to keep our cache up to date. +# def handle_activity(connection, activity): cursor = connection.cursor() if activity["type"] == "create": @@ -200,6 +298,9 @@ def handle_activity(connection, activity): +# +# Scan for new updates to consume. +# def update(connection): cursor = connection.cursor() with urllib.request.urlopen(libris_emm_base_url) as response: @@ -259,6 +360,8 @@ def main(): changes_consumed_until TEXT ); """) + cursor.execute("CREATE INDEX idx_uris_uri ON uris(uri);") + cursor.execute("CREATE INDEX idx_uris_entity_id ON uris(entity_id);") connection.commit() load_dump(connection)
From cda8b8f679334dfe48f5d5af9fc93d3a856da87f Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Wed, 20 Nov 2024 10:46:49 +0100 Subject: [PATCH 20/23] Fix EMM client embellish. --- emm/exampleclient/client.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/emm/exampleclient/client.py b/emm/exampleclient/client.py index 316cc45012..4264013a1c 100644 --- a/emm/exampleclient/client.py +++ b/emm/exampleclient/client.py @@ -117,9 +117,13 @@ def collect_entity_with_uri(entity, uri): # Attempt to download and return data for a given URI. # def download_entity(url): + #print(f"Requesting external resource: {url}") req = Request(url) req.add_header('accept', 'application/json+ld') - return json.load(urlopen(req))["@graph"][1] + try: + return json.load(urlopen(req))["@graph"][1] + except: + return None # @@ -160,7 +164,8 @@ def embellish(entity, connection): for key in entity: if key in properties_of_interest: embed_links(entity[key], connection) - embellish(entity[key], connection) + if not key == "@reverse": + embellish(entity[key], connection) elif isinstance(entity, list): for item in entity: embellish(item, connection)
From b087eb90df5b55a112fcf2b0933135a043ee386d Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Wed, 20 Nov 2024 14:48:50 +0100 Subject: [PATCH 21/23] Add create/delete handling in EMM client.
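An aside on the stop condition in the client's update() loop above: rather than parsing ISO-8601 timestamps in Python, it hands the comparison to SQLite. A standalone illustration of the julianday() arithmetic it relies on (the timestamps are invented examples):

    import sqlite3

    con = sqlite3.connect(":memory:")
    # julianday() converts an ISO-8601 timestamp to a fractional day count,
    # so subtracting two of them gives their distance in days.
    diff = con.execute(
        "SELECT julianday(?) - julianday(?)",
        ("2024-11-20T10:00:00Z", "2024-11-20T09:00:00Z"),
    ).fetchone()[0]
    print(diff)  # ~0.0417 days, i.e. one hour

A non-negative difference between changes_consumed_until and an activity's published time means that activity was already consumed, which is the client's cue to stop paging.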
--- emm/exampleclient/client.py | 28 ++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/emm/exampleclient/client.py b/emm/exampleclient/client.py index 4264013a1c..a6e012ae7d 100644 --- a/emm/exampleclient/client.py +++ b/emm/exampleclient/client.py @@ -275,18 +275,38 @@ def replace_entity(node, replacement_entity): return node +# +# Return True if 'entity' matches our root-entity criteria, otherwise False +# +def is_root_entity(entity): + if entity == None: + return False + if "@type" in entity and entity["@type"] == 'Item': + if "heldBy" in entity and isinstance(entity["heldBy"], dict) and "@id" in entity["heldBy"]: + return entity["heldBy"]["@id"].endswith(local_library_code) + return False + + # # This is the heart of the update mechanism. Consume an update activity, and take # whatever action is necessary to keep our cache up to date. # def handle_activity(connection, activity): cursor = connection.cursor() + if activity["type"] == "create": - pass # TODO + created_data = download_entity(activity["object"]["id"]) + if is_root_entity(created_data): + embellish(created_data, connection) + ingest_entity(created_data, connection) + elif activity["type"] == "delete": - pass # TODO + # This is a "cascading delete", but doing so is safe as long as libris + # maintains its principle that linked records cannot be deleted. + cursor.execute("DELETE FROM uris WHERE uri = ?", (activity["object"]["id"],)) + connection.commit() + elif activity["type"] == "update": - # Find all of our records that depend on this URI rows = cursor.execute("SELECT entities.id, entities.entity FROM uris JOIN entities on uris.entity_id = entities.id WHERE uris.uri = ?", (activity["object"]["id"],)) @@ -357,7 +377,7 @@ def main(): uri TEXT, entity_id INTEGER, UNIQUE(uri, entity_id) ON CONFLICT IGNORE, - FOREIGN KEY (entity_id) REFERENCES entities(id) + FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE ); """) cursor.execute("""
From 3ea4abedf2a41ab70bfb23421538b6bf2155bac0 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Wed, 20 Nov 2024 14:58:49 +0100 Subject: [PATCH 22/23] Remove temporary hack from EMM client. --- emm/exampleclient/client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/emm/exampleclient/client.py b/emm/exampleclient/client.py index a6e012ae7d..cfffd5f3b3 100644 --- a/emm/exampleclient/client.py +++ b/emm/exampleclient/client.py @@ -348,9 +348,8 @@ def update(connection): next_url = None break handle_activity(connection, item) - # Disable for dev, temporary - #cursor.execute("UPDATE state SET changes_consumed_until = ?", (item["published"],)) - #connection.commit() + cursor.execute("UPDATE state SET changes_consumed_until = ?", (item["published"],)) + connection.commit()
From 43c8e318ab723f60ac8aab1319994f7e60c4e106 Mon Sep 17 00:00:00 2001 From: Jannis Tsiroyannis Date: Thu, 21 Nov 2024 11:44:16 +0100 Subject: [PATCH 23/23] Minor EMM cleanups. --- emm/src/main/java/whelk/Dump.java | 28 ++++++++++++++++++++++--- emm/src/main/java/whelk/EmmServlet.java | 2 +- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java index 2da59db545..6e5bca6f2e 100644 --- a/emm/src/main/java/whelk/Dump.java +++ b/emm/src/main/java/whelk/Dump.java @@ -23,6 +23,27 @@ import static whelk.util.Jackson.mapper; public class Dump { + /* Here is how these dumps work: + * When someone asks for a dump of a particular category, we check if we have one already. + * If we do not, we start generating one and delay our response a little bit, until enough + * of the new dump has been generated to fill the first page. + * + * The generated dumps on disk are text files containing only XL system IDs of the records + * that make up the dump. The actual data for each record is not stored in these files, but + * only added into the response stream at the last moment. + * + * The dumps on disk have a rather particular format. They consist of a number of lines + * that are all exactly 17 bytes long, with one ID on each line. The number 17 is chosen + * merely because it is the smallest number that can hold any internal libris ID followed + * by a line break, but there is another reason why *all* lines must be exactly 17 bytes + * (even though some IDs are shorter). The point of this is to be able to serve any part + * (page) of the response, without having to scan through the whole dump. If someone asks + * for a page starting at offset one million, we can simply start reading the dump file + * at byte offset 17 million, and not have to read through the millions of bytes before that. + * + * The last line of a finished dump holds a marker (also 17 bytes), to separate the finished + * dump from one that is still being generated but just hasn't gotten any further yet. + */ private static final Logger logger = LogManager.getLogger(Dump.class); private static final String DUMP_END_MARKER = "_DUMP_END_MARKER\n"; // Must be 17 bytes @@ -72,6 +93,7 @@ private static void sendDumpIndexResponse(String apiBaseUrl, HttpServletResponse categoriesList.add(typesCategory); responseObject.put("categories", categoriesList); + responseObject.put("warning", "This description of the available dump categories is a temporary one which will NOT look like this for long. Be careful not to rely on the format or even existence of this particular page."); String jsonResponse = mapper.writeValueAsString(responseObject); BufferedWriter writer = new BufferedWriter( res.getWriter() ); @@ -200,7 +222,7 @@ private static void invalidateIfOld(Path dumpFilePath) { Files.delete(dumpFilePath); } } catch (IOException e) { - // These exceptions are caught here due to the (theoretical) risk of files access race conditions. + // These exceptions are caught here due to the (theoretical) risk of file access race conditions. // For example, it could be that a dump is being read by one thread, while passing the too-old-threshold // and then while still being read, another thread sees the dump as too old and tries to delete it. // Just log this sort of thing and carry on.
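To make the offset arithmetic described in the large comment above concrete: serving a page is a single seek to byte 17 * offset, never a scan. A small reader sketch (Python, not part of the patch; the page size mirrors TARGET_HITS_PER_PAGE = 100):

    RECORD_WIDTH = 17  # 16 bytes of space-padded ID plus a unix line break

    def read_page(dump_path, offset, page_size=100):
        ids = []
        with open(dump_path, "rb") as f:
            f.seek(offset * RECORD_WIDTH)  # constant-time jump to the page
            for _ in range(page_size):
                line = f.read(RECORD_WIDTH)
                if len(line) < RECORD_WIDTH:
                    break  # no marker yet: the dump is still being generated
                record_id = line.decode("ascii").rstrip()
                if record_id == "_DUMP_END_MARKER":
                    break  # a finished dump ends with this 17-byte marker
                ids.append(record_id)
        return ids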
@@ -221,7 +243,7 @@ private static void generateDump(Whelk whelk, String dump, Path dumpFilePath) { } else if (dump.startsWith("itemAndInstance:")) { preparedStatement = getLibraryXDumpStatement(connection, dump.substring(16)); } else if (dump.startsWith("type:")) { - preparedStatement = getTypeXStatement(connection, whelk, dump.substring(5)); + preparedStatement = getTypeXDumpStatement(connection, whelk, dump.substring(5)); } if (preparedStatement == null) { @@ -278,7 +300,7 @@ private static PreparedStatement getLibraryXDumpStatement(Connection connection, return preparedStatement; } - private static PreparedStatement getTypeXStatement(Connection connection, Whelk whelk, String type) throws SQLException { + private static PreparedStatement getTypeXDumpStatement(Connection connection, Whelk whelk, String type) throws SQLException { Set types = whelk.getJsonld().getSubClasses(type); types.add(type); diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java index 345b9c79c0..03a180b916 100644 --- a/emm/src/main/java/whelk/EmmServlet.java +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -36,7 +36,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse res) { String apiBaseUrl = req.getRequestURL().toString(); res.setCharacterEncoding("utf-8"); - res.setContentType("application/json"); + res.setContentType("application/activity+json"); if (dump != null) { Dump.sendDumpResponse(whelk, apiBaseUrl, req, res);