Skip to content

Commit

Permalink
Addresses #52 Profile full monthly generation
Browse files Browse the repository at this point in the history
- Experiment with removing one-by-one rights retrieval in favor of Sequel batch query for multiple htids.
  • Loading branch information
moseshll committed Jul 18, 2024
1 parent e13ac01 commit 7e3ef9d
Show file tree
Hide file tree
Showing 20 changed files with 162 additions and 412 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ gem "canister"
gem "date_named_file"
gem "dotenv"
gem "ettin"
gem "fast_jsonparser"
gem "httpclient"
gem "marc"
gem "milemarker"
Expand Down
2 changes: 2 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ GEM
faraday-net_http (>= 2.0, < 3.1)
ruby2_keywords (>= 0.0.4)
faraday-net_http (3.0.2)
fast_jsonparser (0.6.0)
ffi (1.16.3)
ffi-compiler (1.0.1)
ffi (>= 1.0.0)
Expand Down Expand Up @@ -238,6 +239,7 @@ DEPENDENCIES
ettin
factory_bot
faraday
fast_jsonparser
filter!
hathifiles_database!
httpclient
Expand Down
67 changes: 61 additions & 6 deletions jobs/generate_hathifile.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
require "settings"
require "services"
require "push_metrics"
require "sequel"
require "zephir_files"

class GenerateHathifile
attr_reader :tracker

def initialize
@tracker = PushMetrics.new(batch_size: 10_000, job_name: "generate_hathifiles")
@tracker = PushMetrics.new(batch_size: 10_000, job_name: "generate_hathifiles",
logger: Services["logger"])
end

def run
Expand Down Expand Up @@ -42,14 +44,24 @@ def run_file(zephir_file)

Tempfile.create("hathifiles") do |fout|
Services[:logger].info "writing to tempfile #{fout.path}"
fin.each_with_index do |line, i|
if i % 100_000 == 0
Services[:logger].info "writing line #{i}"
fin.each_slice(1000) do |lines|
recs = []
lines.each do |line|
recs += BibRecord.new(line).hathifile_records.to_a
end
BibRecord.new(line).hathifile_records.each do |rec|
htids = recs.map { |rec| rec[:htid] }
htids_to_rights = batch_extract_rights(htids)
last_bib_key = nil
recs.each do |rec|
rights = htids_to_rights[rec[:htid]] || {}
rec[:rights_timestamp] = rights[:rights_timestamp]
rec[:access_profile] = rights[:access_profile]
fout.puts record_from_bib_record(rec).join("\t")
if last_bib_key != rec[:ht_bib_key]
tracker.increment_and_log_batch_line
last_bib_key = rec[:ht_bib_key]
end
end
tracker.increment_and_log_batch_line
end
fout.flush
Services[:logger].info "Gzipping: #{fout.path}"
Expand Down Expand Up @@ -94,6 +106,49 @@ def record_from_bib_record(rec)
(rec[:author].join(", ") || "")
]
end

# Map htid -> rights for this batch
def batch_extract_rights(htids)
htids_to_rights = {}
Services.db[:rights_current]
.join(:access_profiles, id: Sequel[:rights_current][:access_profile])
.select(
Sequel.as(qualified_rights_current_htid, :htid),
Sequel.as(qualified_rights_current_time, :rights_timestamp),
Sequel.as(Sequel.qualify(:access_profiles, :name), :access_profile)
)
.where(qualified_rights_current_htid => htids)
.each do |record|
htids_to_rights[record[:htid]] = {
rights_timestamp: record[:rights_timestamp],
access_profile: record[:access_profile]
}
end
htids_to_rights
end

private

def qualified_rights_current_namespace
@qualified_rights_current_namespace ||= Sequel.qualify(:rights_current, :namespace)
end

def qualified_rights_current_id
@qualified_rights_current_id ||= Sequel.qualify(:rights_current, :id)
end

def qualified_rights_current_htid
@qualified_rights_current_htid = Sequel.function(
:concat,
qualified_rights_current_namespace,
".",
qualified_rights_current_id
)
end

def qualified_rights_current_time
@qualified_rights_current_time ||= Sequel.qualify(:rights_current, :time)
end
end

# Force logger to flush STDOUT on write so we can see what out Argo Workflows are doing.
Expand Down
3 changes: 2 additions & 1 deletion lib/bib_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "marc"
require "traject"
require "traject/macros/marc21_semantics"
require "fast_jsonparser"
require "json"
require "place_of_publication"
require "us_fed_doc"
Expand Down Expand Up @@ -39,7 +40,7 @@ def self.bib_fmt(rec_type:, bib_level:)
end

def initialize(marc_in_json)
@marc = MARC::Record.new_from_hash(JSON.parse(marc_in_json))
@marc = MARC::Record.new_from_hash FastJsonparser.parse(marc_in_json, symbolize_keys: false)
end

def ht_bib_key
Expand Down
5 changes: 3 additions & 2 deletions lib/collections_database/collections.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def initialize(collections = load_from_db)
end

def load_from_db
Services.collections_db[:ht_collections]
Services.db[:ht_collections]
.select(:collection,
:content_provider_cluster,
:responsible_entity,
Expand All @@ -50,7 +50,8 @@ def [](collection)
if @collections.key?(collection)
@collections[collection]
else
raise KeyError, "No collection data for collection:#{collection}"
nil
# raise KeyError, "No collection data for collection:#{collection}"
end
end

Expand Down
64 changes: 0 additions & 64 deletions lib/collections_database/collections_db.rb

This file was deleted.

63 changes: 63 additions & 0 deletions lib/database.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# frozen_string_literal: true

require "dotenv"
Dotenv.load(".env")

require "delegate"
require "mysql2"
require "sequel"

# Backend for connection to MySQL database for production information about
# rights
class Database < SimpleDelegator
attr_reader :rawdb
attr_accessor :connection_string

def initialize(connection_string = ENV["DB_CONNECTION_STRING"], **)
@rawdb = self.class.connection(connection_string, **)
super(@rawdb)
end

# #connection will take
# * a full connection string (passed here OR in the environment
# variable MYSQL_CONNECTION_STRING)
# * a set of named arguments, drawn from those passed in and the
# environment. Arguments are those supported by Sequel.
#
# Environment variables are mapped as follows:
#
# user: DB_USER
# password: DB_PASSWORD
# host: DB_HOST
# port: DB_PORT
# database: DB_DATABASE
# adapter: DB_ADAPTER
def self.connection(connection_string = ENV["DB_CONNECTION_STRING"],
**kwargs)

if connection_string.nil?
db_args = gather_db_args(kwargs).merge(
config_local_infile: true
)
Sequel.connect(**db_args, logger: Logger.new($stdout, level: Logger::WARN))
else
Sequel.connect(connection_string, logger: Logger.new($stdout, level: Logger::WARN))
end
end

class << self
private

def gather_db_args(args)
%i[user password host
port database adapter].each do |db_arg|
args[db_arg] ||= ENV["DB_#{db_arg.to_s.upcase}"]
end

args[:host] ||= "localhost"
args[:adapter] ||= :mysql2
args[:database] ||= "ht_rights"
args
end
end
end
2 changes: 1 addition & 1 deletion lib/hathifiles.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# frozen_string_literal: true

require_relative "bib_record"
require_relative "collections_database/collections_db"
require_relative "collections_database/collections"
require_relative "database"
require_relative "hathifile_history/records"
require_relative "hathifile_history/record"
require_relative "hathifile_history/htid_history_entry"
Expand Down
31 changes: 6 additions & 25 deletions lib/item_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ def update_date
@update_date ||= marc["d"]
end

def rights_timestamp
@rights_timestamp ||= rights_from_db.time
end

def rights_determination_note
@rights_determination_note ||= marc["t"]
end
Expand All @@ -77,27 +73,18 @@ def collection_code
end

def responsible_entity_code
@responsible_entity_code ||= Services.collections[collection_code].responsible_entity
@responsible_entity_code ||= Services.collections[collection_code]&.responsible_entity
end

def digitization_agent_code
@digitization_agent_code ||= marc["s"]
end

# From the rights database
# From the database
def content_provider_code
@content_provider_code ||= Services.collections[collection_code]&.content_provider_cluster || collection_code
end

# From the rights database
def access_profile_code
rights_from_db.access_profile
end

def access_profile
Services.access_profiles[access_profile_code]&.name || access_profile_code
end

def to_h
{htid: htid,
access: access,
Expand All @@ -106,20 +93,14 @@ def to_h
source: source,
source_bib_num: source_bib_num,
rights_reason_code: rights_reason_code,
rights_timestamp: rights_timestamp,
rights_date_used: rights_date_used,
collection_code: collection_code,
content_provider_code: content_provider_code,
responsible_entity_code: responsible_entity_code,
digitization_agent_code: digitization_agent_code,
access_profile_code: access_profile_code,
access_profile: access_profile,
update_date: update_date}
end

private

def rights_from_db
@rights_from_db ||= Services.rights.new(item_id: htid)
update_date: update_date,
# rights_timestamp and access_profile must be filled in the caller
rights_timestamp: nil,
access_profile: nil}
end
end
Loading

0 comments on commit 7e3ef9d

Please sign in to comment.