diff --git a/emm/build.gradle b/emm/build.gradle new file mode 100644 index 0000000000..f670e896c2 --- /dev/null +++ b/emm/build.gradle @@ -0,0 +1,73 @@ +apply plugin: 'java-library' +apply plugin: 'com.github.johnrengelman.shadow' + +def mainClassName = "whelk.EmmServer" + +repositories { + mavenCentral() +} + +// Don't blame me for this TRAVESTY. It is a necessity because of the versioning of xml-apis (2.0.2 which gradle otherwise chooses is OLDER (and broken) despite the version.) +configurations.all { + resolutionStrategy { + force "xml-apis:xml-apis:1.4.01" + } +} + +dependencies { + // XL dependencies + implementation(project(':whelk-core')) + implementation(project(':server-common')) + + // Servlet + api "jakarta.servlet:jakarta.servlet-api:${servletApiVersion}" + implementation "org.eclipse.jetty:jetty-server:${jettyVersion}" + implementation "org.eclipse.jetty.ee8:jetty-ee8-servlet:${jettyVersion}" + + // Logging + implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: "${log4jVersion}" + implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4jVersion}" + + // metrics + implementation "io.prometheus:simpleclient:${prometheusVersion}" + implementation "io.prometheus:simpleclient_servlet:${prometheusVersion}" +} + +shadowJar { + archiveClassifier = null // removes `-all` in the filename of the created .jar +} + +jar { + dependsOn ':server-common:jar' + dependsOn(shadowJar) + + manifest { + attributes "Main-Class": mainClassName, + // log4j uses multi-release to ship different stack walking implementations for different java + // versions. Since we repackage everything as a fat jar, that jar must also be multi-release. + "Multi-Release": true + } + + archiveClassifier = "nonfat" + enabled = false +} + +task(appRun, dependsOn: "classes", type: JavaExec) { + classpath = sourceSets.test.runtimeClasspath + mainClass = mainClassName + minHeapSize = "1g" + maxHeapSize = "4g" + systemProperties( + 'xl.secret.properties': System.getProperty("xl.secret.properties"), + 'xl.logRoot': System.getProperty("xl.logRoot", "./logs"), + 'xl.http.port': System.getProperty("xl.http.port", "8182") + ) + args(System.getProperty("args", "").split() as String[]) + + debugOptions { + enabled = true + port = 6007 + server = true + suspend = false + } +} diff --git a/emm/exampleclient/client.py b/emm/exampleclient/client.py new file mode 100644 index 0000000000..cfffd5f3b3 --- /dev/null +++ b/emm/exampleclient/client.py @@ -0,0 +1,398 @@ +# +# This example Libris EMM client is intended to illustrate how one can use +# the EMM protocol to obtain a cache of some subset of libris data and keep +# that cache up to date over time. By default this example client caches +# data for one selected libris library (see the configuration section below). +# But the selected subset might as well be "all agents", "all everything" +# or something else. +# +# Beware: This client has not been written with defensive coding in mind, +# and is not intended for production use. +# +# Key to using the EMM protocol (at least as seen here), is to define what +# you consider to be the root set of entities to cache. For us this is +# going to mean: +# Entities (I) of type 'Item' with a library code (I->helBy->@id) identifying +# our selected library. +# +# Knowing this, we will consume a dump of this set of entities (the initial +# load). 
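+#
+# For illustration only (an assumption, not an exact Libris record), such a
+# root entity might look roughly like this:
+#
+#   {
+#     "@id": "https://libris.kb.se/<some-item-id>",
+#     "@type": "Item",
+#     "heldBy": {"@id": "https://libris.kb.se/library/S"},
+#     "itemOf": {"@id": "https://libris.kb.se/<some-instance-id>"}
+#   }
+#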
+# We are then going to consume/monitor the stream of all changes in Libris,
+# and for every activity in that stream, we will consider the following:
+# 1. Is this a creation or a deletion of an entity that fits our root entity
+#    definition?
+#    If so, we must add or remove said entity from our collection.
+# 2. Is this an update of an entity that we have (be it as a root, or
+#    embedded within another entity, or somewhere else)?
+#    If so, we must update that entity wherever we keep it.
+#
+# This is all we need to do to keep our cache up to date in perpetuity.
+#
+# For this example client we are going to keep knowledge graphs in an SQLite
+# table called 'entities'. These knowledge graphs will always consist of an
+# entity of type 'Item' with various other entities that are being linked to
+# embedded inside that root entity.
+#
+# We will also have a table called 'uris' which will provide a mapping from
+# entity IDs (URIs) to all of our knowledge graphs that reference that URI
+# (which means they also keep an embedded copy of the referenced entity).
+#
+# Finally we will have a single-value table called 'state', which will hold
+# the latest update time from the server that we have already consumed.
+#
+
+
+#
+# Configuration section
+#
+
+# This parameter tells the client where on disk to store its cache.
+cache_location = "./libris-cache.sqlite3"
+
+# This parameter tells the client which EMM server to use.
+libris_emm_base_url = "http://localhost:8182/"
+
+# This parameter tells the client which library we are downloading data for.
+local_library_code = "S"
+
+# This parameter tells the client which properties we would like to follow
+# links for and download additional data to keep with our entities.
+properties_of_interest = ["itemOf", "instanceOf", "agent", "subject"]
+
+
+#
+# Code section
+#
+# This client was written for Python 3.13.0, but it is believed that many
+# Python 3 implementations can run this code.
+#
+# It uses only the Python standard library. There are no third-party
+# dependencies.
+#
+import sqlite3
+import urllib.request
+from urllib.request import Request, urlopen
+import json
+import os
+
+
+#
+# For a given entity, return all URIs within that entity (embedded or not)
+# for which we have any data.
+#
+def collect_uris_with_data(entity):
+    uris = []
+    if isinstance(entity, dict):
+        for key in entity:
+            if key == "@id" and len(entity) > 1:
+                uris.append(entity[key])
+            uris = uris + collect_uris_with_data(entity[key])
+    elif isinstance(entity, list):
+        for item in entity:
+            uris = uris + collect_uris_with_data(item)
+    return uris
+
+
+#
+# For a given entity and uri, return any embedded entities identified by the
+# uri.
+#
+def collect_entity_with_uri(entity, uri):
+    if isinstance(entity, dict):
+        if "@id" in entity and len(entity) > 1 and entity["@id"] == uri:
+            return entity
+        for key in entity:
+            result = collect_entity_with_uri(entity[key], uri)
+            if result:
+                return result
+    elif isinstance(entity, list):
+        for item in entity:
+            result = collect_entity_with_uri(item, uri)
+            if result:
+                return result
+    return None
+
+
+#
+# Attempt to download and return data for a given URI.
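+# (The code below assumes the JSON-LD answer is shaped the way Libris records
+# are handled elsewhere in this changeset: an "@graph" list where index 0
+# holds the administrative record and index 1 holds the entity itself - hence
+# the ["@graph"][1] lookup. The server side uses the same '{@graph,1,...}'
+# paths.)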
+#
+def download_entity(url):
+    #print(f"Requesting external resource: {url}")
+    req = Request(url)
+    req.add_header('accept', 'application/ld+json')
+    try:
+        return json.load(urlopen(req))["@graph"][1]
+    except Exception:
+        return None
+
+
+#
+# For a given entity, iff that entity is a link (a lone @id-property and
+# nothing else) attempt to find data on that URI, and embed that data into
+# this entity.
+#
+def embed_links(entity, connection):
+    if isinstance(entity, dict) and len(entity) == 1 and "@id" in entity:
+        # Find data for this ID somewhere, and replace the stuff in our entity.
+        # First see if we have more of this data somewhere already
+        cursor = connection.cursor()
+        rows = cursor.execute("SELECT entities.entity FROM uris JOIN entities on uris.entity_id = entities.id WHERE uris.uri = ?", (entity["@id"],))
+        row = rows.fetchone()
+        if row:
+            whole_other_record = json.loads(row[0])
+            sought_entity = collect_entity_with_uri(whole_other_record, entity["@id"])
+            entity.clear()
+            entity.update(sought_entity)
+        # Otherwise do a GET on the ID and try to get some data from there
+        else:
+            sought_entity = download_entity(entity["@id"])
+            if sought_entity:
+                entity.clear()
+                entity.update(sought_entity)
+    elif isinstance(entity, list):
+        for item in entity:
+            embed_links(item, connection)
+
+
+#
+# For a given entity, check if we can get more data on the things it links
+# to (that we are interested in - see the configuration section), and embed
+# copies of that data.
+#
+def embellish(entity, connection):
+    if isinstance(entity, dict):
+        for key in entity:
+            if key in properties_of_interest:
+                embed_links(entity[key], connection)
+            if key != "@reverse":
+                embellish(entity[key], connection)
+    elif isinstance(entity, list):
+        for item in entity:
+            embellish(item, connection)
+
+
+#
+# Given an entity (that's been changed), update the mappings of URIs to
+# this entity.
+#
+def update_uris_table(entity, entity_id, connection):
+    cursor = connection.cursor()
+    uris = collect_uris_with_data(entity)
+    cursor.execute("DELETE FROM uris WHERE entity_id = ?", (entity_id,))
+    for uri in uris:
+        cursor.execute(
+            """
+            INSERT INTO
+                uris(entity_id, uri)
+            VALUES
+                (?, ?)
+            """,
+            (entity_id, uri,)
+        )
+    connection.commit()
+
+
+#
+# Ingest a root entity from a dump.
+#
+def ingest_entity(entity, connection):
+    cursor = connection.cursor()
+    entity_id = cursor.execute(
+        """
+        INSERT INTO
+            entities(entity)
+        VALUES
+            (?)
+        """,
+        (json.dumps(entity),)
+    ).lastrowid
+    connection.commit()
+    update_uris_table(entity, entity_id, connection)
+
+
+#
+# Load an initial dump of the configured data set.
+#
+def load_dump(connection):
+    next_url = f"{libris_emm_base_url}?dump=itemAndInstance:{local_library_code}&offset=0"
+    dumpCreationTime = None
+    while next_url:
+        with urllib.request.urlopen(next_url) as response:
+            data = json.load(response)
+            dumpCreationTimeOnPage = data["creationTime"]
+            if (dumpCreationTime and dumpCreationTime != dumpCreationTimeOnPage):
+                print(" DUMP INVALIDATED WHILE DOWNLOADING, TODO: DEAL WITH THIS ")
+            dumpCreationTime = dumpCreationTimeOnPage
+            if "next" in data:
+                next_url = data["next"]
+            else:
+                next_url = None
+            if "entities" in data:
+                for entity in data["entities"]:
+                    embellish(entity, connection)
+                    ingest_entity(entity, connection)
+    cursor = connection.cursor()
+    cursor.execute(
+        """
+        INSERT INTO
+            state(changes_consumed_until)
+        VALUES
+            (?)
+ """, + (dumpCreationTime,) + ) + connection.commit() + + +# +# Given a root entity 'r', and a replacement/update of some embedded entity 'u', +# update the data of 'u' wherever it is copied/embedded into 'r'. +# +def replace_subentity(node, replacement_entity): + uri_to_replace = replacement_entity["@id"] + if isinstance(node, dict): + for key in node.keys: + if isinstance(node[key], dict) and "@id" in node[key] and node[key]["@id"] == uri_to_replace: + node[key] = replacement_entity + replace_subentity(node[key], replacement_entity) + elif isinstance(node, list): + for i in range(len(node)): + if isinstance(node[i], dict) and "@id" in node[i] and node[i]["@id"] == uri_to_replace: + node[i] = replacement_entity + replace_subentity(node[i], replacement_entity) + + +# +# Given a root entity 'r', and a replacement/update of some embedded entity 'u', +# update the data of 'u' wherever it is copied/embedded into 'r'. +# +def replace_entity(node, replacement_entity): + uri_to_replace = replacement_entity["@id"] + if isinstance(node, dict) and "@id" in node and node["@id"] == uri_to_replace: # Root-replacement + return replacement_entity + else: # Embedded replacement + replace_subentity(node, replacement_entity) + return node + + +# +# Return True if 'entity' matches our root-entity criteria, otherwise False +# +def is_root_entity(entity): + if entity == None: + return False + if "@type" in entity and entity["@type"] == 'Item': + if "heldBy" in entity and isinstance(entity["heldBy"], dict) and "@id" in entity["heldBy"]: + return entity["heldBy"]["@id"].endswith(local_library_code) + return False + + +# +# This is the heart of the update mechanism. Consume an update activity, and take +# whatever action is necesseray to keep our cache up to date. +# +def handle_activity(connection, activity): + cursor = connection.cursor() + + if activity["type"] == "create": + created_data = download_entity(activity["object"]["id"]) + if is_root_entity(created_data): + embellish(created_data, connection) + ingest_entity(created_data, connection) + + elif activity["type"] == "delete": + # This is a "cascading delete", but doing so is safe as long as libris + # maintains its principle that linked records cannot be deleted. + cursor.execute("DELETE FROM uris WHERE uri = ?", (activity["object"]["id"],)) + connection.commit() + + elif activity["type"] == "update": + # Find all of our records that depend on this URI + rows = cursor.execute("SELECT entities.id, entities.entity FROM uris JOIN entities on uris.entity_id = entities.id WHERE uris.uri = ?", (activity["object"]["id"],)) + + updated_data = None + for row in rows: + if not updated_data: + updated_data = download_entity(activity["object"]["id"]) + entity_id = row[0] + entity_data = json.loads(row[1]) + entity_data = replace_entity(entity_data, updated_data) + embellish(entity_data, connection) + cursor.execute("UPDATE entities SET entity = ? WHERE id = ?", (json.dumps(entity_data),entity_id)) + connection.commit() + update_uris_table(entity_data, entity_id, connection) + + + +# +# Scan for new updates to consume. 
+# +def update(connection): + cursor = connection.cursor() + with urllib.request.urlopen(libris_emm_base_url) as response: + data = json.load(response) + next_url = data["first"]["id"] + + while next_url: + with urllib.request.urlopen(next_url) as response: + data = json.load(response) + if "next" in data: + next_url = data["next"] + else: + next_url = None + for item in data["orderedItems"]: + result = cursor.execute("SELECT julianday(changes_consumed_until) - julianday(?) FROM state", (item["published"],)) + diff = result.fetchone()[0] + if (float(diff) >= 0.0): + print(f"{item["published"]} is before our last taken update, stop here.") + next_url = None + break + handle_activity(connection, item) + cursor.execute("UPDATE state SET changes_consumed_until = ?", (item["published"],)) + connection.commit() + + +def main(): + cache_initialized = False + if os.path.exists(cache_location): + cache_initialized = True + connection = sqlite3.connect(cache_location) + cursor = connection.cursor() + cursor.execute("PRAGMA journal_mode=WAL") + cursor.execute("PRAGMA synchronous=OFF") + cursor.execute("PRAGMA foreign_keys=ON") + + # If the cache database does not exist, set up a new one + if not cache_initialized: + cursor.execute(""" +CREATE TABLE entities ( + id INTEGER PRIMARY KEY, + entity TEXT +); +""") + cursor.execute(""" +CREATE TABLE uris ( + id INTEGER PRIMARY KEY, + uri TEXT, + entity_id INTEGER, + UNIQUE(uri, entity_id) ON CONFLICT IGNORE, + FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE +); +""") + cursor.execute(""" +CREATE TABLE state ( + id INTEGER PRIMARY KEY, + changes_consumed_until TEXT +); +""") + cursor.execute("CREATE INDEX idx_uris_uri ON uris(uri);") + cursor.execute("CREATE INDEX idx_uris_entity_id ON uris(entity_id);") + connection.commit() + load_dump(connection) + + update(connection) + + + +if __name__ == "__main__": + main() diff --git a/emm/src/main/java/whelk/Dump.java b/emm/src/main/java/whelk/Dump.java new file mode 100644 index 0000000000..6e5bca6f2e --- /dev/null +++ b/emm/src/main/java/whelk/Dump.java @@ -0,0 +1,319 @@ +package whelk; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.*; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributes; +import java.sql.*; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.util.*; + +import static whelk.util.Jackson.mapper; + +public class Dump { + /* Here is how these dumps work: + * When someone asks for a dump of particular category, we check if we have one already. + * If we do not, we start generating one and delay our response a little bit, until enough + * of the new dump has been generated to fill the first page. + * + * The generated dumps on disks are text files containing only XL system IDs of the records + * that make up the dump. The actual data for each record is not stored in these files, but + * only added into the response stream at the last moment. + * + * The dumps on disk have a rather particular format. They consist of a number of lines, + * that are all exactly 17 bytes long, with one ID on each line. 
The number 17 is chosen + * merely because it is the smallest number that can hold any internal libris ID followed + * by a line break, but there is another reason why *all* lines must be exactly 17 bytes + * (even though some IDs are shorter). The point of this, is to be able to serve any part + * (page) of the response, without having the scan through the whole dump. If someone asks + * for a page starting att offset one million, we can simply start reading the dump file + * at byte offset 17 million, and not have to read through the millions of bytes before that. + * + * The last line of a finished dump holds a (also 17 bytes) marker, to separate the finished + * dump from one that is still being generated but just haven't gotten any further yet. + */ + private static final Logger logger = LogManager.getLogger(Dump.class); + private static final String DUMP_END_MARKER = "_DUMP_END_MARKER\n"; // Must be 17 bytes + + public static void sendDumpResponse(Whelk whelk, String apiBaseUrl, HttpServletRequest req, HttpServletResponse res) throws IOException, SQLException { + String dump = req.getParameter("dump"); + + if (dump.equals("index")) { + sendDumpIndexResponse(apiBaseUrl, res); + return; + } + + String offset = req.getParameter("offset"); + long offsetNumeric = Long.parseLong(offset); + + String tmpDir = System.getProperty("java.io.tmpdir"); + Path dumpsPath = Paths.get(tmpDir, "dumps"); + Files.createDirectories(dumpsPath); + Path dumpFilePath = dumpsPath.resolve(dump+".dump"); + + invalidateIfOld(dumpFilePath); + if (!Files.exists(dumpFilePath)) + generateDump(whelk, dump, dumpFilePath); + sendDumpPageResponse(whelk, apiBaseUrl, dump, dumpFilePath, offsetNumeric, res); + } + + private static void sendDumpIndexResponse(String apiBaseUrl, HttpServletResponse res) throws IOException { + HashMap responseObject = new HashMap(); + + ArrayList categoriesList = new ArrayList<>(); + + HashMap allCategory = new HashMap(); + allCategory.put("url", apiBaseUrl+"?dump=all&offset=0"); + allCategory.put("description", "This category represents the whole collection, without reservations."); + categoriesList.add(allCategory); + + HashMap libraryCategory = new HashMap(); + libraryCategory.put("url", apiBaseUrl+"?dump=itemAndInstance:X&offset=0"); + libraryCategory.put("description", "These categories represent the Items and Instances held by a particular library. " + + "The relevant library-code (sigel) for which you want data must replace the X in the category URL."); + categoriesList.add(libraryCategory); + + HashMap typesCategory = new HashMap(); + typesCategory.put("url", apiBaseUrl+"?dump=type:X&offset=0"); + typesCategory.put("description", "These categories represent the set of entities of a certain type, including subtypes. " + + "For example the type Agent would include both Persons and Organizations etc. The X in the URL must be replaced " + + "with the type you want."); + categoriesList.add(typesCategory); + + responseObject.put("categories", categoriesList); + responseObject.put("warning", "This description of the available dump categories is a temporary one which will NOT look like this for long. 
Be careful not to rely on the format or even existence of this particular page."); + + String jsonResponse = mapper.writeValueAsString(responseObject); + BufferedWriter writer = new BufferedWriter( res.getWriter() ); + writer.write(jsonResponse); + writer.close(); + } + + private static void sendDumpPageResponse(Whelk whelk, String apiBaseUrl, String dump, Path dumpFilePath, long offsetLines, HttpServletResponse res) throws IOException { + ArrayList recordIdsOnPage = new ArrayList<>(EmmChangeSet.TARGET_HITS_PER_PAGE); + Long totalEntityCount = null; + + try { + // Has the dump not begun being written yet ? + while (!Files.exists(dumpFilePath)) { + Thread.sleep(10); + } + + try (RandomAccessFile file = new RandomAccessFile(dumpFilePath.toFile(), "r")) { + byte[] lineBuffer = new byte[17]; + + // Is the dump generation finished ? + boolean dumpFinished = false; + if (file.length() >= 17) { + file.seek(file.length() - 17); + dumpFinished = ( 17 == file.read(lineBuffer) && new String(lineBuffer, StandardCharsets.UTF_8).equals(DUMP_END_MARKER) ); + } + + // Is there not enough data for a full page yet ? + long offsetBytes = 17 * offsetLines; + while (!dumpFinished && file.length() < offsetBytes + (17 * (long)EmmChangeSet.TARGET_HITS_PER_PAGE)) { + Thread.sleep(10); + + if (file.length() >= 17) { + file.seek(file.length() - 17); + dumpFinished = ( 17 == file.read(lineBuffer) && new String(lineBuffer, StandardCharsets.UTF_8).equals(DUMP_END_MARKER) ); + } + } + + if (dumpFinished) { + totalEntityCount = file.length() / 17 - 1; + } + + // We're ok to send a full page, or the end of the dump (which may be a partial page). + file.seek(offsetBytes); + int recordsToSend = Integer.min(EmmChangeSet.TARGET_HITS_PER_PAGE, (int)((file.length() - offsetBytes) / 17) - 1); + for (int i = 0; i < recordsToSend; ++i) { + if (17 == file.read(lineBuffer)) { + recordIdsOnPage.add(new String(lineBuffer, StandardCharsets.UTF_8).trim()); + } else { + logger.error("Suspected corrupt dump (non-17-byte line detected): " + dumpFilePath); + res.sendError(500); + return; + } + } + + } + } catch (IOException | InterruptedException e) { + logger.error("Failed reading dumpfile: " + dumpFilePath, e); + } + + BasicFileAttributes attributes = Files.readAttributes(dumpFilePath, BasicFileAttributes.class); + Instant dumpCreationTime = attributes.creationTime().toInstant(); + sendFormattedResponse(whelk, apiBaseUrl, dump, recordIdsOnPage, res, offsetLines + EmmChangeSet.TARGET_HITS_PER_PAGE, totalEntityCount, dumpCreationTime); + } + + private static void sendFormattedResponse(Whelk whelk, String apiBaseUrl, String dump, ArrayList recordIdsOnPage, HttpServletResponse res, long nextLineOffset, Long totalEntityCount, Instant dumpCreationTime) throws IOException{ + HashMap responseObject = new HashMap(); + + responseObject.put("creationTime", ZonedDateTime.ofInstant(dumpCreationTime, ZoneOffset.UTC).toString()); + if (totalEntityCount == null) + responseObject.put("status", "generating"); + else { + responseObject.put("status", "done"); + responseObject.put("totalEntityCount", totalEntityCount); + } + + if (totalEntityCount == null || nextLineOffset < totalEntityCount) { + responseObject.put("next", apiBaseUrl+"?dump="+dump+"&offset="+nextLineOffset); + } + + ArrayList entitesList = new ArrayList<>(EmmChangeSet.TARGET_HITS_PER_PAGE); + responseObject.put("entities", entitesList); + Map idsAndRecords = whelk.bulkLoad(recordIdsOnPage); + for (Document doc : idsAndRecords.values()) { + + // Here is a bit of SPECIALIZED treatment only for 
the itemAndInstance:categories. These should + // include not only the Item (which is the root node for this category), but also the linked Instance. + // Without this, a client must individually GET every single Instance in their dataset, which scales poorly. + if (dump.startsWith("itemAndInstance:")) { + String itemOf = doc.getHoldingFor(); + if (itemOf == null) { + logger.warn("Holding of nothing? " + doc.getId()); + continue; + } + Document instance = new Document( whelk.loadData(itemOf) ); + if (instance == null) { + logger.warn("Bad instance? " + itemOf); + continue; + } + ArrayList itemOfPath = new ArrayList(); + itemOfPath.add("@graph"); itemOfPath.add(1); itemOfPath.add("itemOf"); // unggh.. + doc._set(itemOfPath, instance.getThing(), doc.data); + entitesList.add(doc.getThing()); + } + + // For normal categories + else { + entitesList.add(doc.getThing()); + } + + } + + String jsonResponse = mapper.writeValueAsString(responseObject); + BufferedWriter writer = new BufferedWriter( res.getWriter() ); + writer.write(jsonResponse); + writer.close(); + } + + private static void invalidateIfOld(Path dumpFilePath) { + try { + if (!Files.exists(dumpFilePath)) + return; + + BasicFileAttributes attributes = Files.readAttributes(dumpFilePath, BasicFileAttributes.class); + if (attributes.creationTime().toInstant().isBefore(Instant.now().minus(5, ChronoUnit.DAYS))) { + Files.delete(dumpFilePath); + } + } catch (IOException e) { + // These exceptions are caught here due to the (theoretical) risk of file access race conditions. + // For example, it could be that a dump is being read by one thread, while passing the too-old-threshold + // and then while still being read, another thread sees the dump as too old and tries to delete it. + // Just log this sort of thing and carry on. + logger.info("Failed to invalidate (delete) EMM dump: " + dumpFilePath, e); + } + } + + private static void generateDump(Whelk whelk, String dump, Path dumpFilePath) { + new Thread(() -> { + try (BufferedWriter dumpFileWriter = new BufferedWriter(new FileWriter(dumpFilePath.toFile())); + Connection connection = whelk.getStorage().getOuterConnection()) { + connection.setAutoCommit(false); + + PreparedStatement preparedStatement = null; + + if (dump.equals("all")) { + preparedStatement = getAllDumpStatement(connection); + } else if (dump.startsWith("itemAndInstance:")) { + preparedStatement = getLibraryXDumpStatement(connection, dump.substring(16)); + } else if (dump.startsWith("type:")) { + preparedStatement = getTypeXDumpStatement(connection, whelk, dump.substring(5)); + } + + if (preparedStatement == null) { + logger.info("Dump request for unknown category: " + dump); + return; + } + + try (PreparedStatement p = preparedStatement) { + p.setFetchSize(EmmChangeSet.TARGET_HITS_PER_PAGE); + try (ResultSet resultSet = p.executeQuery()) { + while (resultSet.next()) { + + // Each line must be exactly 17 bytes long, including the (unix) line break. 
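+ // (IDs shorter than 16 characters are right-padded with spaces by the format
+ // string below, so id plus padding (16 bytes) plus newline (1 byte) = 17 bytes.
+ // This is what lets sendDumpPageResponse above serve any page by seeking
+ // straight to byte offset 17 * offsetLines, without scanning the file.)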
+ String id = String.format("%-16s\n", resultSet.getString(1)); + dumpFileWriter.write(id); + } + } + } + + dumpFileWriter.write( String.format("%-17s", DUMP_END_MARKER) ); + } catch (IOException | SQLException e) { + logger.error("Failed dump generation", e); + } + }).start(); + } + + private static PreparedStatement getAllDumpStatement(Connection connection) throws SQLException { + String sql = " SELECT " + + " id" + + " FROM" + + " lddb"; + PreparedStatement preparedStatement = connection.prepareStatement(sql); + return preparedStatement; + } + + private static PreparedStatement getLibraryXDumpStatement(Connection connection, String library) throws SQLException { + String sql = " SELECT " + + " id" + + " FROM" + + " lddb" + + " WHERE" + + " collection = 'hold' AND" + + " (data#>>'{@graph,1,heldBy,@id}' = ? OR data#>>'{@graph,1,heldBy,@id}' = ?)"; + PreparedStatement preparedStatement = connection.prepareStatement(sql); + + preparedStatement.setString(1, Document.getBASE_URI().resolve("/library/"+library).toString()); + + // This is uncomfortable. Library URIs are "https://libris.kb.se/library/.." regardless of environment base-URI. + // To protect ourselves from the fact that this could change, check both these URIs and environment-specific ones. + URI defaultLibrisURI = null; + try { defaultLibrisURI = new URI("https://libris.kb.se"); } catch (URISyntaxException e) { /*ignore*/ } + preparedStatement.setString(2, defaultLibrisURI.resolve("/library/"+library).toString()); + + return preparedStatement; + } + + private static PreparedStatement getTypeXDumpStatement(Connection connection, Whelk whelk, String type) throws SQLException { + Set types = whelk.getJsonld().getSubClasses(type); + types.add(type); + + String sql = " SELECT " + + " id" + + " FROM" + + " lddb" + + " WHERE" + + " data#>>'{@graph,1,@type}' = ANY( ? 
)"; + + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setArray(1, connection.createArrayOf("TEXT", types.toArray() )); + + return preparedStatement; + } +} diff --git a/emm/src/main/java/whelk/EmmActivity.java b/emm/src/main/java/whelk/EmmActivity.java new file mode 100644 index 0000000000..eaac1005cc --- /dev/null +++ b/emm/src/main/java/whelk/EmmActivity.java @@ -0,0 +1,32 @@ +package whelk; + +import java.sql.Timestamp; + +public class EmmActivity { + public enum Type { + CREATE, + UPDATE, + DELETE, + } + + public final String uri; + public final String entityType; + public final Type activityType; + public final Timestamp modificationTime; + + public EmmActivity(String uri, String type, Timestamp creationTime, Timestamp modificationTime, boolean deleted) { + this.uri = uri; + this.entityType = type; + this.modificationTime = modificationTime; + if (deleted) + this.activityType = Type.DELETE; + else if (creationTime.equals(modificationTime)) + this.activityType = Type.CREATE; + else + this.activityType = Type.UPDATE; + } + + public String toString() { + return activityType.toString() + " of " + uri + " (" + entityType + ") at " + modificationTime; + } +} diff --git a/emm/src/main/java/whelk/EmmChangeSet.java b/emm/src/main/java/whelk/EmmChangeSet.java new file mode 100644 index 0000000000..e9d2824808 --- /dev/null +++ b/emm/src/main/java/whelk/EmmChangeSet.java @@ -0,0 +1,143 @@ +package whelk; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.servlet.http.HttpServletResponse; +import java.io.BufferedWriter; +import java.io.IOException; +import java.sql.*; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static whelk.util.Jackson.mapper; + +public class EmmChangeSet { + public static final int TARGET_HITS_PER_PAGE = 100; + private static final Logger logger = LogManager.getLogger(EmmChangeSet.class); + + static void sendChangeSet(Whelk whelk, HttpServletResponse res, String until, String apiBaseUrl) throws IOException { + + List activitiesOnPage = new ArrayList<>(TARGET_HITS_PER_PAGE+5); + Timestamp nextTimeStamp = getPage(whelk, until, activitiesOnPage); + if (nextTimeStamp == null) { + res.sendError(500); + return; + } + + // THIS SHIT is so painful in Java :( + HashMap responseObject = new HashMap(); + ArrayList contexts = new ArrayList(); + contexts.add("https://www.w3.org/ns/activitystreams"); + contexts.add("https://emm-spec.org/1.0/context.json"); + responseObject.put("@context", contexts); + responseObject.put("type", "OrderedCollectionPage"); + responseObject.put("id", apiBaseUrl+"?until="+until); + HashMap partOf = new HashMap(); + partOf.put("type", "OrderedCollection"); + partOf.put("id", apiBaseUrl); + responseObject.put("partOf", partOf); + responseObject.put("next", apiBaseUrl+"?until="+nextTimeStamp.getTime()); + List orderedItems = new ArrayList(); + responseObject.put("orderedItems", orderedItems); + + for (EmmActivity activityInList : activitiesOnPage) { + HashMap activityInStream = new HashMap(); + activityInStream.put("type", switch (activityInList.activityType) { + case CREATE -> "create"; + case UPDATE -> "update"; + case DELETE -> "delete"; + }); + activityInStream.put("published", ZonedDateTime.ofInstant(activityInList.modificationTime.toInstant(), ZoneOffset.UTC).toString()); + HashMap activityObject = new HashMap(); + activityInStream.put("object", 
activityObject); + activityObject.put("id", activityInList.uri); + activityObject.put("type", activityInList.entityType); + activityObject.put("updated", ZonedDateTime.ofInstant(activityInList.modificationTime.toInstant(), ZoneOffset.UTC).toString()); + orderedItems.add(activityInStream); + } + + String jsonResponse = mapper.writeValueAsString(responseObject); + BufferedWriter writer = new BufferedWriter( res.getWriter() ); + writer.write(jsonResponse); + writer.close(); + } + + /** + * Get a page's worth of items. The results will be added to the 'result'-list. The return value will be "the next timestamp" + * to start getting the next page at, or null on failure. + */ + private static Timestamp getPage(Whelk whelk, String until, List result) { + + // Internally 'until' is "milliseconds since epoch". + long untilNumerical = Long.parseLong(until); + Timestamp untilTimeStamp = new Timestamp(untilNumerical); + + Timestamp earliestSeenTimeStamp = new Timestamp(System.currentTimeMillis()); // This is (will be set to) how far back in time this page stretches + + try (Connection connection = whelk.getStorage().getOuterConnection()) { + + // Get a page of items + { + String sql = "SELECT" + + " data#>>'{@graph,1,@id}'," + + " GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}'))," + + " deleted," + + " created," + + " data#>>'{@graph,1,@type}'" + + " FROM" + + " lddb__versions" + + " WHERE GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) <= ? " + + " ORDER BY GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) DESC LIMIT ? "; + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setTimestamp(1, untilTimeStamp); + preparedStatement.setInt(2, TARGET_HITS_PER_PAGE); + preparedStatement.setFetchSize(TARGET_HITS_PER_PAGE + 1); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + String uri = resultSet.getString(1); + Timestamp modificationTime = resultSet.getTimestamp(2); + boolean deleted = resultSet.getBoolean(3); + Timestamp creationTime = resultSet.getTimestamp(4); + String type = resultSet.getString(5); + result.add(new EmmActivity(uri, type, creationTime, modificationTime, deleted)); + if (modificationTime.before(earliestSeenTimeStamp)) + earliestSeenTimeStamp = modificationTime; + } + } + } + + // Get any extra records that share an exact modification time with the earliest seen time on this page + { + String sql = "SELECT" + + " data#>>'{@graph,1,@id}'," + + " GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}'))," + + " deleted," + + " created," + + " data#>>'{@graph,1,@type}'" + + " FROM lddb__versions WHERE GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}')) = ?"; + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setTimestamp(1, untilTimeStamp); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + while (resultSet.next()) { + String uri = resultSet.getString(1); + Timestamp modificationTime = resultSet.getTimestamp(2); + boolean deleted = resultSet.getBoolean(3); + Timestamp creationTime = resultSet.getTimestamp(4); + String type = resultSet.getString(5); + result.add(new EmmActivity(uri, type, creationTime, modificationTime, deleted)); + } + } + } + } catch (SQLException se) { + logger.error(se); + se.printStackTrace(); + return null; + } + + return earliestSeenTimeStamp; + } +} diff --git a/emm/src/main/java/whelk/EmmServer.java b/emm/src/main/java/whelk/EmmServer.java new file mode 100644 index 
0000000000..28f30cf53b --- /dev/null +++ b/emm/src/main/java/whelk/EmmServer.java @@ -0,0 +1,24 @@ +package whelk; + +import io.prometheus.client.exporter.MetricsServlet; +import org.eclipse.jetty.ee8.servlet.ServletContextHandler; +import org.eclipse.jetty.server.Server; + +public class EmmServer extends XlServer { + + @Override + protected void configureHandlers(Server server) { + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + + server.setHandler(context); + + context.addServlet(MetricsServlet.class, "/metrics"); + context.addServlet(EmmServlet.class, "/"); + serveStaticContent(context); + } + + public static void main(String[] args) throws Exception { + new EmmServer().run(); + } +} diff --git a/emm/src/main/java/whelk/EmmServlet.java b/emm/src/main/java/whelk/EmmServlet.java new file mode 100644 index 0000000000..03a180b916 --- /dev/null +++ b/emm/src/main/java/whelk/EmmServlet.java @@ -0,0 +1,77 @@ +package whelk; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.BufferedWriter; +import java.io.IOException; +import java.sql.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +import static whelk.util.Jackson.mapper; + +public class EmmServlet extends HttpServlet { + private final Logger logger = LogManager.getLogger(this.getClass()); + private final Whelk whelk; + public EmmServlet() { + whelk = Whelk.createLoadedCoreWhelk(); + } + + public void init() { + } + + public void destroy() { + } + + public void doGet(HttpServletRequest req, HttpServletResponse res) { + try { + String dump = req.getParameter("dump"); + String until = req.getParameter("until"); + String apiBaseUrl = req.getRequestURL().toString(); + + res.setCharacterEncoding("utf-8"); + res.setContentType("application/activity+json"); + + if (dump != null) { + Dump.sendDumpResponse(whelk, apiBaseUrl, req, res); + return; + } + + // Send an Entry-Point reply + if (until == null) { + HashMap responseObject = new HashMap(); + ArrayList contexts = new ArrayList(); + contexts.add("https://www.w3.org/ns/activitystreams"); + contexts.add("https://emm-spec.org/1.0/context.json"); + responseObject.put("@context", contexts); + responseObject.put("type", "OrderedCollection"); + responseObject.put("id", apiBaseUrl); + responseObject.put("url", apiBaseUrl+"?dump=index"); + HashMap first = new HashMap(); + first.put("type", "OrderedCollectionPage"); + first.put("id", apiBaseUrl+"?until="+System.currentTimeMillis()); + responseObject.put("first", first); + + String jsonResponse = mapper.writeValueAsString(responseObject); + BufferedWriter writer = new BufferedWriter( res.getWriter() ); + writer.write(jsonResponse); + writer.close(); + return; + } + + // Send ChangeSet reply + EmmChangeSet.sendChangeSet(whelk, res, until, apiBaseUrl); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + +} diff --git a/emm/src/main/resources/log4j2.xml b/emm/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..57aa8d42aa --- /dev/null +++ b/emm/src/main/resources/log4j2.xml @@ -0,0 +1,28 @@ + + + + ./logs + + + + + + + + + + + + + + + + + + + + + + + diff --git a/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql b/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql new file mode 
100644 index 0000000000..dab2a2d38d --- /dev/null +++ b/librisxl-tools/postgresql/migrations/00000023-index-generationDate-versions.plsql @@ -0,0 +1,32 @@ +BEGIN; + +DO $$DECLARE + -- THESE MUST BE CHANGED WHEN YOU COPY THE SCRIPT! + + -- The version you expect the database to have _before_ the migration + old_version numeric := 22; + -- The version the database should have _after_ the migration + new_version numeric := 23; + + -- hands off + existing_version numeric; + +BEGIN + + -- Check existing version + SELECT version from lddb__schema INTO existing_version; + IF ( existing_version <> old_version) THEN + RAISE EXCEPTION 'ASKED TO MIGRATE FROM INCORRECT EXISTING VERSION!'; + ROLLBACK; + END IF; + UPDATE lddb__schema SET version = new_version; + + -- ACTUAL SCHEMA CHANGES HERE: + + CREATE INDEX IF NOT EXISTS idx_lddb__versions_generation_date ON lddb__versions (GREATEST(modified, totstz(data#>>'{@graph,0,generationDate}'))); + + CREATE INDEX IF NOT EXISTS idx_lddb_thing_type ON lddb ((data#>>'{@graph,1,@type}')); + +END$$; + +COMMIT; diff --git a/settings.gradle b/settings.gradle index f155603454..5b6e56f8d5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -14,4 +14,5 @@ include( 'trld-java', 'whelk-core', 'whelktool', + 'emm', )