Commit
Merge branch 'feature/cxz' into develop
Showing 12 changed files with 929 additions and 7 deletions.
@@ -0,0 +1,61 @@
apply plugin: 'war'
apply plugin: 'groovy'
apply from: '../gretty.plugin'

repositories {
    mavenCentral()
}

// Don't blame me for this travesty. It is necessary because of xml-apis versioning: the 2.0.2 that Gradle would otherwise choose is OLDER (and broken), despite its version number.
configurations.all {
    resolutionStrategy {
        force "xml-apis:xml-apis:1.4.01"
    }
}

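// Joint-compile Java and Groovy sources with the Groovy compiler.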
sourceSets {
    main {
        java { srcDirs = [] }
        groovy { srcDirs = ['src/main/java', 'src/main/groovy'] }
    }
    test {
        groovy { srcDir 'src/test/groovy/' }
    }
}

dependencies {
    // XL dependencies
    implementation(project(':whelk-core'))

    // Logging
    implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: "${log4jVersion}"
    implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: "${log4jVersion}"

    // metrics
    implementation "io.prometheus:simpleclient:${prometheusVersion}"
    implementation "io.prometheus:simpleclient_servlet:${prometheusVersion}"

    //implementation 'org.apache.commons:commons-lang3:3.3.2'
    implementation "org.codehaus.groovy:groovy:${groovyVersion}"

    implementation group: 'org.simplejavamail', name: 'simple-java-mail', version: '8.2.0'

    // Cron
    implementation group: 'it.sauronsoftware.cron4j', name: 'cron4j', version: '2.2.5'
}

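// Gretty settings for running the service locally.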
gretty {
    jvmArgs = ['-XX:+UseParallelGC']
    systemProperties = ['xl.secret.properties': System.getProperty("xl.secret.properties")]
    httpPort = 8589
    scanInterval = 0
    afterEvaluate {
        appRunDebug {
            debugPort = 5006
            debugSuspend = false
        }
    }
}
housekeeping/src/main/groovy/whelk/housekeeping/NotificationGenerator.groovy (300 additions, 0 deletions)
@@ -0,0 +1,300 @@
package whelk.housekeeping

import whelk.Document
import whelk.IdGenerator
import whelk.JsonLd
import whelk.Whelk
import groovy.transform.CompileStatic
import groovy.util.logging.Log4j2 as Log

import java.sql.Array
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.Timestamp
import java.time.Instant
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

import static whelk.util.Jackson.mapper

@CompileStatic
@Log
class NotificationGenerator extends HouseKeeper {

    public static final String STATE_KEY = "CXZ notification generator"
    private static final int MAX_OBSERVATIONS_PER_CHANGE = 20
    private String status = "OK"
    private final Whelk whelk

    public NotificationGenerator(Whelk whelk) {
        this.whelk = whelk
    }

    public String getName() {
        return "Notifications generator"
    }

    public String getStatusDescription() {
        return status
    }

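    // Run once every minute.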
    public String getCronSchedule() {
        return "* * * * *"
    }

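    // Finds every record changed since the last run, maps those changes onto the affected instances
    // and writes ChangeObservation records for the differences we generate notifications for.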
    public void trigger() {
        // Determine the time interval of changes for which to generate notifications.
        Instant now = Instant.now()
        Timestamp from = Timestamp.from(now) // First run? Default to now (=do nothing but set the timestamp for next run)
        Map state = whelk.getStorage().getState(STATE_KEY)
        if (state && state.lastGenerationTime)
            from = Timestamp.from( ZonedDateTime.parse( (String) state.lastGenerationTime, DateTimeFormatter.ISO_OFFSET_DATE_TIME).toInstant() )
        Timestamp until = Timestamp.from(now)

        Connection connection
        PreparedStatement statement
        ResultSet resultSet

        Map<String, List<String>> changedInstanceIDsWithComments = [:]

        connection = whelk.getStorage().getOuterConnection()
        connection.setAutoCommit(false)
        try {
            // Fetch all changed IDs within the interval
            String sql = "SELECT id, ARRAY_AGG(data#>>'{@graph,0,hasChangeNote}') as changeNotes FROM lddb__versions WHERE collection IN ('bib', 'auth') AND ( modified > ? AND modified <= ? ) group by id;"
            connection.setAutoCommit(false)
            statement = connection.prepareStatement(sql)
            statement.setTimestamp(1, from)
            statement.setTimestamp(2, until)
            statement.setFetchSize(512)
            resultSet = statement.executeQuery()
            while (resultSet.next()) {
                String id = resultSet.getString("id")

                Array changeNotesArray = resultSet.getArray("changeNotes")
                List changeNotes = []
                for (Object o : changeNotesArray.getArray()) {
                    if (o != null)
                        changeNotes.add(o)
                }

                List<Tuple2<String, String>> dependers = whelk.getStorage().followDependers(id, ["itemOf"])
                dependers.add(new Tuple2(id, null)) // This ID too, not _only_ the dependers!
                dependers.each {
                    String dependerID = it[0]
                    String dependerMainEntityType = whelk.getStorage().getMainEntityTypeBySystemID(dependerID)
                    if (whelk.getJsonld().isSubClassOf(dependerMainEntityType, "Instance")) {
                        if (!changedInstanceIDsWithComments.containsKey(dependerID)) {
                            changedInstanceIDsWithComments.put(dependerID, [])
                        }
                        changedInstanceIDsWithComments[dependerID].addAll(changeNotes)
                    }
                }
            }

            for (String instanceId : changedInstanceIDsWithComments.keySet()) {
                List<Document> resultingChangeObservations = generateObservationsForAffectedInstance(
                        instanceId, changedInstanceIDsWithComments[instanceId], from.toInstant(), until.toInstant())

                if (resultingChangeObservations.size() <= MAX_OBSERVATIONS_PER_CHANGE) {
                    for (Document observation : resultingChangeObservations) {
                        if (!whelk.createDocument(observation, "NotificationGenerator", "SEK", "none", false)) {
                            log.error("Failed to create ChangeObservation:\n${observation.getDataAsString()}")
                        }
                    }
                }
            }

        } catch (Throwable e) {
            status = "Failed with:\n" + e + "\nat:\n" + e.getStackTrace().toString()
            throw e
        } finally {
            connection.close()
            Map newState = new HashMap()
            newState.lastGenerationTime = until.toInstant().atOffset(ZoneOffset.UTC).toString()
            whelk.getStorage().putState(STATE_KEY, newState)
        }
    }

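    // Loads the instance as it looked at 'before' and at 'after' (each embellished with linked data as of
    // that point in time) and returns ChangeObservations for the differences we watch for; currently only
    // changes to the primary contribution are observed.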
    private List<Document> generateObservationsForAffectedInstance(String instanceId, List changeNotes, Instant before, Instant after) {
        List<Document> generatedObservations = []
        List<String> propertiesToEmbellish = [
                "mainEntity",
                "instanceOf",
                "contribution",
                "hasTitle",
                "intendedAudience",
                "classification",
                "precededBy",
                "succeededBy",
                "contribution",
                "agent",
        ]
        Document instanceAfterChange = whelk.getStorage().loadAsOf(instanceId, Timestamp.from(after))
        historicEmbellish(instanceAfterChange, propertiesToEmbellish, after)
        Document instanceBeforeChange = whelk.getStorage().loadAsOf(instanceId, Timestamp.from(before))
        if (instanceBeforeChange == null) { // This instance is new, and did not exist at 'before'.
            return generatedObservations
        }
        historicEmbellish(instanceBeforeChange, propertiesToEmbellish, before)

        Tuple comparisonResult = primaryContributionChanged(instanceBeforeChange, instanceAfterChange)
        if (comparisonResult[0]) {
            generatedObservations.add(
                    makeChangeObservation(
                            instanceId, changeNotes, "https://id.kb.se/changecategory/primarycontribution",
                            (Map) comparisonResult[1], (Map) comparisonResult[2])
            )
        }

        return generatedObservations
    }

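    // Returns a Tuple of (changed, agentBefore, agentAfter). A change is registered when the family name,
    // given name or life span of a PrimaryContribution agent differs between the two versions.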
    private static Tuple primaryContributionChanged(Document instanceBeforeChange, Document instanceAfterChange) {
        Object contributionsAfter = Document._get(["mainEntity", "instanceOf", "contribution"], instanceAfterChange.data)
        Object contributionsBefore = Document._get(["mainEntity", "instanceOf", "contribution"], instanceBeforeChange.data)
        if (contributionsBefore != null && contributionsAfter != null && contributionsBefore instanceof List && contributionsAfter instanceof List) {
            for (Object contrBefore : contributionsBefore) {
                for (Object contrAfter : contributionsAfter) {
                    if (contrBefore["@type"].equals("PrimaryContribution") && contrAfter["@type"].equals("PrimaryContribution")) {
                        if (contrBefore["agent"] != null && contrAfter["agent"] != null) {
                            if (
                                    contrBefore["agent"]["familyName"] != contrAfter["agent"]["familyName"] ||
                                    contrBefore["agent"]["givenName"] != contrAfter["agent"]["givenName"] ||
                                    contrBefore["agent"]["lifeSpan"] != contrAfter["agent"]["lifeSpan"]
                            )
                                return new Tuple(true, contrBefore["agent"], contrAfter["agent"])
                        }
                    }
                }
            }
        }
        return new Tuple(false, null, null)
    }

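    // Builds a new ChangeObservation record for the given instance and change category, embedding the
    // before/after representations and any comments carried by the change notes.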
    private Document makeChangeObservation(String instanceId, List changeNotes, String categoryUri, Map oldValue, Map newValue) {
        String newId = IdGenerator.generate()
        String metadataUri = Document.BASE_URI.toString() + newId
        String mainEntityUri = metadataUri + "#it"

        // If the @id is left, the object is considered a link, and the actual data (which we want) is removed when storing this as a record.
        Map oldValueEmbedded = new HashMap(oldValue)
        oldValueEmbedded.remove("@id")
        Map newValueEmbedded = new HashMap(newValue)
        newValueEmbedded.remove("@id")

        Map observationData = [ "@graph":[
                [
                        "@id" : metadataUri,
                        "@type" : "Record",
                        "mainEntity" : ["@id" : mainEntityUri],
                ],
                [
                        "@id" : mainEntityUri,
                        "@type" : "ChangeObservation",
                        "concerning" : ["@id" : Document.BASE_URI.toString() + instanceId],
                        "representationBefore" : oldValueEmbedded,
                        "representationAfter" : newValueEmbedded,
                        "category" : ["@id" : categoryUri],
                ]
        ]]

        List<String> comments = extractComments(changeNotes)
        if (comments) {
            Map mainEntity = (Map) observationData["@graph"][1]
            mainEntity.put("comment", comments)
        }

        return new Document(observationData)
    }

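    // Change notes are stored as JSON strings; parse them and collect their 'comment' values.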
    private List<String> extractComments(List changeNotes) {
        List<String> comments = []
        for (Object changeNote : changeNotes) {
            if ( ! (changeNote instanceof String) )
                continue
            Map changeNoteMap = mapper.readValue( (String) changeNote, Map)
            comments.addAll( asList(changeNoteMap["comment"]) )
        }
        return comments
    }

    private List asList(Object o) {
        if (o == null)
            return []
        if (o instanceof List)
            return o
        return [o]
    }

    /**
     * This is a simplified/specialized form of 'embellish', for historic data and using only select properties.
     * The full general embellish code cannot help us here, because it is based on the idea of cached cards,
     * which can (and must!) only cache the latest/current data for each card, which isn't what we need here
     * (we need to embellish older historic data).
     *
     * This function mutates docToEmbellish.
     */
    private void historicEmbellish(Document docToEmbellish, List<String> properties, Instant asOf) {
        List graphListToEmbellish = (List) docToEmbellish.data["@graph"]
        Set alreadyLoadedURIs = []

        for (int i = 0; i < properties.size(); ++i) {
            Set uris = findLinkedURIs(graphListToEmbellish, properties)
            uris.removeAll(alreadyLoadedURIs)
            if (uris.isEmpty())
                break

            Map<String, Document> linkedDocumentsByUri = whelk.bulkLoad(uris, asOf)
            linkedDocumentsByUri.each {
                List linkedGraphList = (List) it.value.data["@graph"]
                if (linkedGraphList.size() > 1)
                    graphListToEmbellish.add(linkedGraphList[1])
            }
            alreadyLoadedURIs.addAll(uris)
        }

        docToEmbellish.data = JsonLd.frame(docToEmbellish.getCompleteId(), docToEmbellish.data)
    }

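    // Recursively collects the URIs linked (via @id) under any of the given properties in the node tree.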
    private Set<String> findLinkedURIs(Object node, List<String> properties) {
        Set<String> uris = []
        if (node instanceof List) {
            for (Object element : node) {
                uris.addAll(findLinkedURIs(element, properties))
            }
        }
        else if (node instanceof Map) {
            for (String key : node.keySet()) {
                if (properties.contains(key)) {
                    uris.addAll(getLinkIfAny(node[key]))
                }
                uris.addAll(findLinkedURIs(node[key], properties))
            }
        }
        return uris
    }

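    // Returns the @id of the node if it is a link, or the @id of each linked element if it is a list;
    // otherwise an empty list.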
    private List<String> getLinkIfAny(Object node) {
        List<String> uris = []
        if (node instanceof Map) {
            if (node.containsKey("@id")) {
                uris.add((String) node["@id"])
            }
        }
        if (node instanceof List) {
            for (Object element : node) {
                if (element instanceof Map) {
                    if (element.containsKey("@id")) {
                        uris.add((String) element["@id"])
                    }
                }
            }
        }
        return uris
    }

}