diff --git a/.gitignore b/.gitignore index bb0196c..1cd1499 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,5 @@ /log/* /zold_app/* -/data/* /tmp/* /import/* .irb_history @@ -8,7 +7,15 @@ .bundle/* ./local/* +/data/* +!/data/gingr +/data/gingr/ready/* +/data/gingr/processed/* +/data/gingr/failed/* + /solr/geodata-test/data /solr/geodata-test/data /solr/geodata/data /solr/geodata/data + +!**/.keep diff --git a/Gemfile b/Gemfile index 459eba8..8a024e2 100644 --- a/Gemfile +++ b/Gemfile @@ -13,3 +13,5 @@ gem 'uri' group :test do gem 'rspec', '~> 3.12' end + +gem "listen", "~> 3.8" diff --git a/Gemfile.lock b/Gemfile.lock index 6481549..d523591 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -54,6 +54,7 @@ GEM faraday-net_http_persistent (2.1.0) faraday (~> 2.5) net-http-persistent (~> 4.0) + ffi (1.16.3) geo_combine (0.8.0) activesupport faraday-net_http_persistent (~> 2.0) @@ -76,6 +77,9 @@ GEM reline (>= 0.3.8) json-schema (4.1.1) addressable (>= 2.8) + listen (3.8.0) + rb-fsevent (~> 0.10, >= 0.10.3) + rb-inotify (~> 0.9, >= 0.9.10) lograge (0.14.0) actionpack (>= 4) activesupport (>= 4) @@ -88,6 +92,8 @@ GEM mutex_m (0.1.2) net-http-persistent (4.0.2) connection_pool (~> 2.2) + nokogiri (1.15.4-aarch64-linux) + racc (~> 1.4) nokogiri (1.15.4-x86_64-darwin) racc (~> 1.4) nokogiri (1.15.4-x86_64-linux) @@ -123,6 +129,9 @@ GEM thor (~> 1.0, >= 1.2.2) zeitwerk (~> 2.6) rake (13.1.0) + rb-fsevent (0.11.2) + rb-inotify (0.10.1) + ffi (~> 1.0) rchardet (1.8.0) rdoc (6.5.0) psych (>= 4.0.0) @@ -160,6 +169,7 @@ GEM zeitwerk (2.6.12) PLATFORMS + aarch64-linux x86_64-darwin-22 x86_64-linux @@ -168,6 +178,7 @@ DEPENDENCIES faraday-net_http_persistent (~> 2.0) geo_combine geoserver-publish (~> 0.7.0) + listen (~> 3.8) rsolr rspec (~> 3.12) rubyzip diff --git a/README.md b/README.md index 0574642..cc4b415 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Command-line tool for ingesting GeoData (Solr, GeoServer, and file servers). 
-## Quick Start +## Quick start The app is Dockerized with an image designed to just run the `gingr` executable: @@ -23,5 +23,21 @@ docker compose exec app bash # You can also run commands from your host, either via `run` or `exec`: docker compose exec app gingr help docker compose run --rm app gingr help -docker compose run --rm app gingr all /opt/app/spec/fixture/zipfile/test_public.zip +docker compose run --rm app gingr all spec/fixture/zipfile/vector.zip +``` + +## Watching a directory for new files + +Gingr's `watch` command monitors a directory for new `.zip` files and automatically ingests them. It accepts the path to a gingr root directory (which should contain `ready`, `processed`, and `failed` subdirectories) and the same arguments as `gingr all`: + +``` +gingr watch /path/to/directory [args to gingr all] +``` + +Gingr will error if the directory doesn't exist or if it doesn't contain the expected subdirectories: + +``` +./ready +./processed +./failed ``` diff --git a/data/.keep b/data/.keep new file mode 100644 index 0000000..e69de29 diff --git a/data/gingr/.DS_Store b/data/gingr/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/data/gingr/.DS_Store differ diff --git a/data/gingr/.keep b/data/gingr/.keep new file mode 100644 index 0000000..e69de29 diff --git a/data/gingr/failed/.keep b/data/gingr/failed/.keep new file mode 100644 index 0000000..e69de29 diff --git a/data/gingr/processed/.keep b/data/gingr/processed/.keep new file mode 100644 index 0000000..e69de29 diff --git a/data/gingr/ready/.keep b/data/gingr/ready/.keep new file mode 100644 index 0000000..e69de29 diff --git a/docker-compose.yml b/docker-compose.yml index c326906..2cebbd7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,21 +9,6 @@ services: - tail - -f - /dev/null - develop: - watch: - - path: . 
- target: /opt/app - ignore: - - .ruby-version - - Dockerfile - - Gemfile* - action: sync - - path: .ruby-version - action: rebuild - - path: Dockerfile - action: rebuild - - path: Gemfile* - action: rebuild environment: DOWNLOAD_URL: https://spatial.lib.berkeley.edu/ GEOSERVER_ROOT: data/geoserver/ @@ -32,6 +17,8 @@ services: SOLR_URL: http://solr:8983/solr/geodata-test SPATIAL_ROOT: data/spatial/ restart: no + volumes: + - ./:/opt/app:delegated solr: command: > diff --git a/lib/gingr.rb b/lib/gingr.rb index bb26ca8..7ec4660 100644 --- a/lib/gingr.rb +++ b/lib/gingr.rb @@ -1,14 +1,12 @@ # frozen_string_literal: true -# monkey-patch first -require_relative 'monkeypatch/geoserver/publish' - require_relative 'gingr/cli' require_relative 'gingr/config' require_relative 'gingr/data_handler' require_relative 'gingr/geoserver_publisher' require_relative 'gingr/import_util' require_relative 'gingr/solr_indexer' +require_relative 'gingr/watcher' module Gingr # diff --git a/lib/gingr/cli.rb b/lib/gingr/cli.rb index 5ca6260..cd5de05 100644 --- a/lib/gingr/cli.rb +++ b/lib/gingr/cli.rb @@ -2,6 +2,7 @@ require 'thor' require_relative 'config' require_relative 'import_util' +require_relative 'watcher' module Gingr class Cli < Thor @@ -10,6 +11,23 @@ class Cli < Thor Thor.check_unknown_options! + desc 'watch', 'Watches a Gingr directory for files ready to be processed' + long_desc <<-TEXT, wrapping: false + EXAMPLES + gingr watch data/gingr --solr-url=https://foo:bar@solr.lib.berkeley.edu:8983/solr/geodata ... + TEXT + option :solr_url + option :update_reference_field, type: :boolean, default: false + option :spatial_root + option :geoserver_root + option :geoserver_url + option :geoserver_secure_url + def watch(root_dir = nil) + root_dir ||= ENV['GINGR_WATCH_DIRECTORY'] || '/opt/app/data/gingr' + watcher = Gingr::Watcher.new(root_dir, *options.map { |name, value| "--#{name.to_s.tr('_', '-')}=#{value}" }) + watcher.start! 
+ end + desc 'solr', 'Giving a directory path, it will index all json files from the directory/sub-directory to solr' long_desc <<-TEXT, wrapping: false diff --git a/lib/gingr/geoserver_publisher.rb b/lib/gingr/geoserver_publisher.rb index c4cc199..1d2da1d 100644 --- a/lib/gingr/geoserver_publisher.rb +++ b/lib/gingr/geoserver_publisher.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true -require_relative '../monkeypatch/geoserver/publish' +require 'geoserver/publish' require 'uri' require_relative 'config' @@ -17,8 +17,8 @@ def update(filename) name = File.basename(filename, '.*') filepath = "file:///srv/geofiles/berkeley-#{name}/#{filename}" File.extname(filename).downcase == '.shp' ? publish_shapefile(filepath, name) : pulsih_geotiff(filepath, name) - rescue Geoserver::Publish::Error - Config.logger.error("Publish Geoserver error: #{filename}") + rescue Geoserver::Publish::Error => e + Config.logger.error("Publish Geoserver error: #{filename} -- #{e.inspect}") raise end diff --git a/lib/gingr/import_util.rb b/lib/gingr/import_util.rb index 93aa1bb..11b7f94 100644 --- a/lib/gingr/import_util.rb +++ b/lib/gingr/import_util.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true +require 'find' require 'uri' require_relative 'config' require_relative 'geoserver_publisher' @@ -33,7 +34,7 @@ def get_reference_urls(options) hash = {} Config.reference_urls.each_key do |key| - url = options[key] || ENV.fetch(key.upcase) + url = options[key] || ENV.fetch(key.to_s.upcase) hash[key] = reference_url(url) if url end hash diff --git a/lib/gingr/watcher.rb b/lib/gingr/watcher.rb new file mode 100644 index 0000000..071b54a --- /dev/null +++ b/lib/gingr/watcher.rb @@ -0,0 +1,122 @@ +# frozen_string_literal: true +require 'listen' +require 'open3' +require_relative 'config' + +module Gingr + class Watcher + include Config + + # Only watch for files in WATCHED_DIRECTORIES[:READY] that match this pattern + WATCH_FILTER = Regexp.compile(/\.zip$/) + + WATCHED_DIRECTORIES = { + # Directory into 
which new files are dropped when ready for processing. + # Watcher#start monitors this for new .zip files. + READY: 'ready'.freeze, + # Directory into which successfully processed files are moved post-processing. + PROCESSED: 'processed'.freeze, + # Directory into which failed files are moved post-processing. + FAILED: 'failed'.freeze, + } + + attr_reader :options + attr_reader :root_dir + + def initialize(root_dir, *options) + # This is the Gingr root directory, not the directory to be watched. + # Watcher watches the ./ready directory under this one. + @root_dir = root_dir + + # Options are passed as-is to `gingr all`, so they should match the + # arguments you'd otherwise pass to that command + @options = options + + validate_directories! + end + + def start! + start + sleep + end + + def start + Config.logger.info("Monitoring directory for new zipfiles: #{ready_dir}") + listener.start unless listener.processing? + end + + def listener + @listener ||= begin + Listen.to(ready_dir, only: WATCH_FILTER, force_polling: true) do |_, added, _| + added.each do |zipfile| + Config.logger.info("Processing zipfile: #{zipfile}") + + begin + exec_gingr_all!(zipfile) + rescue => e + Config.logger.error("Error processing #{zipfile}, moving to #{failed_dir}: #{e.inspect}") + end + end + end + end + end + + def exec_gingr_all!(zipfile) + begin + command = ['gingr', 'all', zipfile, *options] + Config.logger.debug("Running command: #{command}") + + stdout, stderr, status = Open3.capture3(*command) + if !status.success? 
+ raise SubprocessError, "Call to `gingr all` failed: #{status}" + end + + Config.logger.debug("Processed #{zipfile}, moving to #{processed_dir}") + FileUtils.mv(zipfile, processed_dir) + rescue => e + FileUtils.mv(zipfile, failed_dir) + File.write(error_log_for(zipfile), collate_logs(stdout, stderr)) + raise + end + end + + def ready_dir + @ready_dir ||= File.join(@root_dir, WATCHED_DIRECTORIES[:READY]) + end + + def processed_dir + @processed_dir ||= File.join(@root_dir, WATCHED_DIRECTORIES[:PROCESSED]) + end + + def failed_dir + @failed_dir ||= File.join(@root_dir, WATCHED_DIRECTORIES[:FAILED]) + end + + private + + def collate_logs(stdout, stderr) + "#{stdout}\n#{stderr}\n" + end + + def error_log_for(zipfile) + File.join(failed_dir, "#{File.basename(zipfile, '.*')}.log") + end + + def validate_directories! + WATCHED_DIRECTORIES.values + .collect { |dirname| public_send("#{dirname}_dir") } + .each &method(:validate_directory!) + end + + def validate_directory!(directory) + if !File.writable?(directory) + raise DirectoryError, "Directory is not writable: #{directory}" + end + end + + # Typed errors to help with tests / figuring out what exactly failed + class WatcherError < StandardError; end + class DirectoryError < WatcherError; end + class SubprocessError < WatcherError; end + end +end diff --git a/lib/monkeypatch/geoserver/publish.rb b/lib/monkeypatch/geoserver/publish.rb deleted file mode 100644 index 4c7169d..0000000 --- a/lib/monkeypatch/geoserver/publish.rb +++ /dev/null @@ -1,35 +0,0 @@ -# frozen_string_literal: true -require 'geoserver/publish' - -# Monkey-patch geoserver-publish gem to prefix everything with berkeley_ -# @note Is this really necessary? 
-module Geoserver - module Publish - def self.delete_geotiff(workspace_name:, id:, connection: nil) - coverage_store_name = "berkeley_#{id}" - CoverageStore.new(connection).delete(workspace_name:, coverage_store_name:) - end - - def self.delete_shapefile(workspace_name:, id:, connection: nil) - data_store_name = "berkeley_#{id}" - DataStore.new(connection).delete(workspace_name:, data_store_name:) - end - - def self.geotiff(workspace_name:, file_path:, id:, title: nil, connection: nil) - coverage_store_name = "berkeley_#{id}" - create_workspace(workspace_name:, connection:) - create_coverage_store(workspace_name:, coverage_store_name:, url: file_path, - connection:) - create_coverage(workspace_name:, coverage_store_name:, coverage_name: id, title:, - connection:) - end - - def self.shapefile(workspace_name:, file_path:, id:, title: nil, connection: nil) - data_store_name = "berkeley_#{id}" - create_workspace(workspace_name:, connection:) - create_data_store(workspace_name:, data_store_name:, url: file_path, connection:) - create_feature_type(workspace_name:, data_store_name:, feature_type_name: id, title:, - connection:) - end - end -end diff --git a/spec/fixture/.DS_Store b/spec/fixture/.DS_Store new file mode 100644 index 0000000..24d04bb Binary files /dev/null and b/spec/fixture/.DS_Store differ diff --git a/spec/fixture/zipfile/.DS_Store b/spec/fixture/zipfile/.DS_Store new file mode 100644 index 0000000..bb8dcbc Binary files /dev/null and b/spec/fixture/zipfile/.DS_Store differ diff --git a/spec/watcher_spec.rb b/spec/watcher_spec.rb new file mode 100644 index 0000000..cd30c09 --- /dev/null +++ b/spec/watcher_spec.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true +require 'open3' +require 'spec_helper' +require 'fileutils' + +# Class for mocking returned Process::Status objects +class MockStatus + class << self + def successful + new(true) + end + + def failed + new(false) + end + end + + def initialize(success) + @success = success + end + + def success? 
+ @success + end +end + +RSpec.describe Gingr::Watcher do + before(:each) { FileUtils.rm_rf Dir.glob('/opt/app/data/gingr/*/*') } + + context 'a valid watcher' do + subject :watcher do + Gingr::Watcher.new( + '/opt/app/data/gingr', + '--solr-url=http://solr:8983/solr/geodata-test', + '--geoserver-url=http://admin:geoserver@geoserver:8080/geoserver/rest/', + '--geoserver-root=/opt/app/data/geoserver', + '--geoserver-secure-root=/opt/app/data/geoserver', + '--spatial-root=/opt/app/data/spatial', + '--update-reference-field', + ) + end + + it 'passes arguments to `gingr all`' do + expect(Open3).to receive(:capture3).with('gingr', 'all', + '/opt/app/data/gingr/ready/vector.zip', + '--solr-url=http://solr:8983/solr/geodata-test', + '--geoserver-url=http://admin:geoserver@geoserver:8080/geoserver/rest/', + '--geoserver-root=/opt/app/data/geoserver', + '--geoserver-secure-root=/opt/app/data/geoserver', + '--spatial-root=/opt/app/data/spatial', + '--update-reference-field', + ).and_return(['', '', MockStatus.successful]) + + copy_zipfile_to_ready('vector.zip') + watcher.exec_gingr_all!('/opt/app/data/gingr/ready/vector.zip') + end + + it 'moves successfully processed files to the processed directory' do + expect(Open3).to receive(:capture3).and_return(['', '', MockStatus.successful]) + + copy_zipfile_to_ready('vector.zip') + watcher.exec_gingr_all!('/opt/app/data/gingr/ready/vector.zip') + expect(File).to exist('/opt/app/data/gingr/processed/vector.zip') + end + + it 'processes newly added files and keeps processing on error' do + watcher.start + + (1..5).each do |i| + expection = expect(watcher) + .to receive(:exec_gingr_all!) + .with "/opt/app/data/gingr/ready/vector#{i}.zip" + expection.and_raise if i.odd? 
+ + copy_zipfile_to_ready('vector.zip', "vector#{i}.zip") + sleep 3 + end + end + end + + context 'a watcher with invalid arguments' do + subject :watcher do + Gingr::Watcher.new( + '/opt/app/data/gingr', + '--solr-url=http://solr:8983/solr/geodata-test', + '--geoserver-url=http://admin:geoserver@geoserver:8081/geoserver/rest/', + '--invalid-argument' + ) + end + + it 'moves failed files and the logs to the failed directory' do + copy_zipfile_to_ready('vector.zip') + + expect(Open3).to receive(:capture3).and_return(['', 'Unknown switches', MockStatus.failed]) + expect { watcher.exec_gingr_all!('/opt/app/data/gingr/ready/vector.zip') }.to raise_error Gingr::Watcher::SubprocessError + expect(File).to exist('/opt/app/data/gingr/failed/vector.zip') + expect(File).to exist('/opt/app/data/gingr/failed/vector.log') + expect(File.read('/opt/app/data/gingr/failed/vector.log')).to match(/Unknown switches/) + end + end + + context 'a watcher that cannot write to its watch dirs' do + subject :watcher do + Gingr::Watcher.new('/opt/app/data/gingr-does-not-exist') + end + + it 'fails to initialize' do + expect { watcher }.to raise_error(Gingr::Watcher::DirectoryError) + end + end + + private + + def copy_zipfile_to_ready(filename, newname = nil) + newname ||= filename + FileUtils.cp("/opt/app/spec/fixture/zipfile/#{filename}", "/opt/app/data/gingr/ready/#{newname}") + end +end diff --git a/tmp/.keep b/tmp/.keep new file mode 100644 index 0000000..e69de29