From 34dbce12e43b113fd23be7b9fb44918bccd52b0c Mon Sep 17 00:00:00 2001 From: Daniel Schmidt Date: Tue, 14 Nov 2023 12:02:06 -0800 Subject: [PATCH] DP-827: Adds the watcher CLI command --- .gitignore | 9 ++- Gemfile | 2 + Gemfile.lock | 11 +++ README.md | 20 ++++- data/.keep | 0 data/gingr/.DS_Store | Bin 0 -> 6148 bytes data/gingr/.keep | 0 data/gingr/failed/.keep | 0 data/gingr/processed/.keep | 0 data/gingr/ready/.keep | 0 docker-compose.yml | 17 +--- lib/gingr.rb | 4 +- lib/gingr/cli.rb | 18 +++++ lib/gingr/geoserver_publisher.rb | 6 +- lib/gingr/import_util.rb | 3 +- lib/gingr/watcher.rb | 117 +++++++++++++++++++++++++++ lib/monkeypatch/geoserver/publish.rb | 35 -------- spec/fixture/.DS_Store | Bin 0 -> 6148 bytes spec/fixture/zipfile/.DS_Store | Bin 0 -> 6148 bytes spec/watcher_spec.rb | 116 ++++++++++++++++++++++++++ tmp/.keep | 0 21 files changed, 298 insertions(+), 60 deletions(-) create mode 100644 data/.keep create mode 100644 data/gingr/.DS_Store create mode 100644 data/gingr/.keep create mode 100644 data/gingr/failed/.keep create mode 100644 data/gingr/processed/.keep create mode 100644 data/gingr/ready/.keep create mode 100644 lib/gingr/watcher.rb delete mode 100644 lib/monkeypatch/geoserver/publish.rb create mode 100644 spec/fixture/.DS_Store create mode 100644 spec/fixture/zipfile/.DS_Store create mode 100644 spec/watcher_spec.rb create mode 100644 tmp/.keep diff --git a/.gitignore b/.gitignore index bb0196c..1cd1499 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,5 @@ /log/* /zold_app/* -/data/* /tmp/* /import/* .irb_history @@ -8,7 +7,15 @@ .bundle/* ./local/* +/data/* +!/data/gingr +/data/gingr/ready/* +/data/gingr/processed/* +/data/gingr/failed/* + /solr/geodata-test/data /solr/geodata-test/data /solr/geodata/data /solr/geodata/data + +!**/.keep diff --git a/Gemfile b/Gemfile index 459eba8..8a024e2 100644 --- a/Gemfile +++ b/Gemfile @@ -13,3 +13,5 @@ gem 'uri' group :test do gem 'rspec', '~> 3.12' end + +gem "listen", "~> 3.8" diff --git a/Gemfile.lock b/Gemfile.lock index 6481549..d523591 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -54,6 +54,7 @@ GEM faraday-net_http_persistent (2.1.0) faraday (~> 2.5) net-http-persistent (~> 4.0) + ffi (1.16.3) geo_combine (0.8.0) activesupport faraday-net_http_persistent (~> 2.0) @@ -76,6 +77,9 @@ GEM reline (>= 0.3.8) json-schema (4.1.1) addressable (>= 2.8) + listen (3.8.0) + rb-fsevent (~> 0.10, >= 0.10.3) + rb-inotify (~> 0.9, >= 0.9.10) lograge (0.14.0) actionpack (>= 4) activesupport (>= 4) @@ -88,6 +92,8 @@ GEM mutex_m (0.1.2) net-http-persistent (4.0.2) connection_pool (~> 2.2) + nokogiri (1.15.4-aarch64-linux) + racc (~> 1.4) nokogiri (1.15.4-x86_64-darwin) racc (~> 1.4) nokogiri (1.15.4-x86_64-linux) @@ -123,6 +129,9 @@ GEM thor (~> 1.0, >= 1.2.2) zeitwerk (~> 2.6) rake (13.1.0) + rb-fsevent (0.11.2) + rb-inotify (0.10.1) + ffi (~> 1.0) rchardet (1.8.0) rdoc (6.5.0) psych (>= 4.0.0) @@ -160,6 +169,7 @@ GEM zeitwerk (2.6.12) PLATFORMS + aarch64-linux x86_64-darwin-22 x86_64-linux @@ -168,6 +178,7 @@ DEPENDENCIES faraday-net_http_persistent (~> 2.0) geo_combine geoserver-publish (~> 0.7.0) + listen (~> 3.8) rsolr rspec (~> 3.12) rubyzip diff --git a/README.md b/README.md index 0574642..cc4b415 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Command-line tool for ingesting GeoData (Solr, GeoServer, and file servers). -## Quick Start +## Quick start The app is Dockerized with an image designed to just run the `gingr` executable: @@ -23,5 +23,21 @@ docker compose exec app bash # You can also run commands from your host, either via `run` or `exec`: docker compose exec app gingr help docker compose run --rm app gingr help -docker compose run --rm app gingr all /opt/app/spec/fixture/zipfile/test_public.zip +docker compose run --rm app gingr all spec/fixture/zipfile/vector.zip +``` + +## Watching a directory for new files + +Gingr's `watch` command monitors a directory for new `.zip` files and automatically ingest them. It accepts the path to a gingr root directory (which should contain `ready`, `processed`, and `failed` subdirectories) and the same arguments as `gingr all`: + +``` +gingr watch /path/to/directory [args to gingr all] +``` + +Gingr will error if the directory doesn't exist or if it doesn't contain the expected subdirectories: + +``` +./ready +./processed +./failed ``` diff --git a/data/.keep b/data/.keep new file mode 100644 index 0000000..e69de29 diff --git a/data/gingr/.DS_Store b/data/gingr/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0 diff --git a/lib/gingr.rb b/lib/gingr.rb index bb26ca8..7ec4660 100644 --- a/lib/gingr.rb +++ b/lib/gingr.rb @@ -1,14 +1,12 @@ # frozen_string_literal: true -# monkey-patch first -require_relative 'monkeypatch/geoserver/publish' - require_relative 'gingr/cli' require_relative 'gingr/config' require_relative 'gingr/data_handler' require_relative 'gingr/geoserver_publisher' require_relative 'gingr/import_util' require_relative 'gingr/solr_indexer' +require_relative 'gingr/watcher' module Gingr # diff --git a/lib/gingr/cli.rb b/lib/gingr/cli.rb index 5ca6260..cd5de05 100644 --- a/lib/gingr/cli.rb +++ b/lib/gingr/cli.rb @@ -2,6 +2,7 @@ require 'thor' require_relative 'config' require_relative 'import_util' +require_relative 'watcher' module Gingr class Cli < Thor @@ -10,6 +11,23 @@ class Cli < Thor Thor.check_unknown_options! + desc 'watch', 'Watches a Gingr directory for files ready to be processed' + long_desc <<-TEXT, wrapping: false + EXAMPLES + gingr watch data/gingr --solr-url=https://foo:bar@solr.lib.berkeley.edu:8983/solr/geodata ... + TEXT + option :solr_url + option :update_reference_field, type: :boolean, default: false + option :spatial_root + option :geoserver_root + option :geoserver_url + option :geoserver_secure_url + def watch(root_dir = nil) + root_dir ||= ENV['GINGR_WATCH_DIRECTORY'] || '/opt/app/data/gingr' + watcher = Gingr::Watcher.new(root_dir, *options) + watcher.start! + end + desc 'solr', 'Giving a directory path, it will index all json files from the directory/sub-directory to solr' long_desc <<-TEXT, wrapping: false diff --git a/lib/gingr/geoserver_publisher.rb b/lib/gingr/geoserver_publisher.rb index c4cc199..1d2da1d 100644 --- a/lib/gingr/geoserver_publisher.rb +++ b/lib/gingr/geoserver_publisher.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true -require_relative '../monkeypatch/geoserver/publish' +require 'geoserver/publish' require 'uri' require_relative 'config' @@ -17,8 +17,8 @@ def update(filename) name = File.basename(filename, '.*') filepath = "file:///srv/geofiles/berkeley-#{name}/#{filename}" File.extname(filename).downcase == '.shp' ? publish_shapefile(filepath, name) : pulsih_geotiff(filepath, name) - rescue Geoserver::Publish::Error - Config.logger.error("Publish Geoserver error: #{filename}") + rescue Geoserver::Publish::Error => e + Config.logger.error("Publish Geoserver error: #{filename} -- #{e.inspect}") raise end diff --git a/lib/gingr/import_util.rb b/lib/gingr/import_util.rb index 93aa1bb..11b7f94 100644 --- a/lib/gingr/import_util.rb +++ b/lib/gingr/import_util.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true +require 'find' require 'uri' require_relative 'config' require_relative 'geoserver_publisher' @@ -33,7 +34,7 @@ def get_reference_urls(options) hash = {} Config.reference_urls.each_key do |key| - url = options[key] || ENV.fetch(key.upcase) + url = options[key] || ENV.fetch(key.to_s.upcase) hash[key] = reference_url(url) if url end hash diff --git a/lib/gingr/watcher.rb b/lib/gingr/watcher.rb new file mode 100644 index 0000000..c0d8af5 --- /dev/null +++ b/lib/gingr/watcher.rb @@ -0,0 +1,117 @@ +# frozen_string_literal: true +require 'listen' +require 'open3' +require_relative 'config' + +module Gingr + class Watcher + include Config + + # Only watch for files in WATCHED_DIRECTORIES[:READY] that match this pattern + WATCH_FILTER = Regexp.compile(/\.zip$/) + + WATCHED_DIRECTORIES = { + # Directory into which new files are dropped when ready for processing. + # .watch! monitors this for new .zip's. + READY: 'ready'.freeze, + # Directory into which successfully processed files are moved post-processing. + PROCESSED: 'processed'.freeze, + # Directory into which failed files are moved post-processing. + FAILED: 'failed'.freeze, + } + + attr_reader :options + attr_reader :root_dir + + def initialize(root_dir, *options) + # This is the Gingr root directory, not the directory to be watched. + # Watcher watches the ./ready directory under this one. + @root_dir = root_dir + + # Options are passed as-is to `gingr all`, so they should match the + # arguments you'd otherwise pass to that command + @options = options + + validate_directories! + end + + def start! + start + sleep + end + + def start + Config.logger.info("Monitoring directory for new zipfiles: #{ready_dir}") + listener.start unless listener.processing? + end + + def listener + @listener ||= begin + Listen.to(ready_dir, only: WATCH_FILTER, force_polling: true) do |_, added, _| + added.each do |zipfile| + Config.logger.info("Processing zipfile: #{zipfile}") + + begin + exec_gingr_with_file!(zipfile) + rescue => e + Config.logger.error("Error processing #{zipfile}, moving to #{failed_dir}: #{e.inspect}") + end + end + end + end + end + + def exec_gingr_with_file!(zipfile) + begin + command = ['gingr', 'all', zipfile, *options] + Config.logger.debug("Running command: #{command}") + + stdout, stderr, status = Open3.capture3(*command) + if !status.success? + raise SubprocessError, "Call to `gingr all` failed: #{status}" + end + + Config.logger.debug("Processed #{zipfile}, moving to #{processed_dir}") + FileUtils.mv(zipfile, processed_dir) + rescue => e + FileUtils.mv(zipfile, failed_dir) + File.write(path_to_logfile(zipfile), "#{stdout}\n#{stderr}\n") + raise + end + end + + def ready_dir + @ready_dir ||= File.join(@root_dir, WATCHED_DIRECTORIES[:READY]) + end + + def processed_dir + @processed_dir ||= File.join(@root_dir, WATCHED_DIRECTORIES[:PROCESSED]) + end + + def failed_dir + @failed_dir ||= File.join(@root_dir, WATCHED_DIRECTORIES[:FAILED]) + end + + private + + def path_to_logfile(zipfile) + File.join(failed_dir, "#{File.basename(zipfile, '.*')}.log") + end + + def validate_directories! + validate_directory!('Ready', ready_dir) + validate_directory!('Processed', processed_dir) + validate_directory!('Failed', failed_dir) + end + + def validate_directory!(name, filepath) + if !File.writable?(filepath) + raise DirectoryError, "#{name} directory is not writable: #{filepath}" + end + end + + # Typed errors to help with test writing / figuring out what exactly failed + class DirectoryError < StandardError; end + class SubprocessError < StandardError; end + end +end diff --git a/lib/monkeypatch/geoserver/publish.rb b/lib/monkeypatch/geoserver/publish.rb deleted file mode 100644 index 4c7169d..0000000 --- a/lib/monkeypatch/geoserver/publish.rb +++ /dev/null @@ -1,35 +0,0 @@ -# frozen_string_literal: true -require 'geoserver/publish' - -# Monkey-patch geoserver-publish gem to prefix everything with berkeley_ -# @note Is this really necessary? -module Geoserver - module Publish - def self.delete_geotiff(workspace_name:, id:, connection: nil) - coverage_store_name = "berkeley_#{id}" - CoverageStore.new(connection).delete(workspace_name:, coverage_store_name:) - end - - def self.delete_shapefile(workspace_name:, id:, connection: nil) - data_store_name = "berkeley_#{id}" - DataStore.new(connection).delete(workspace_name:, data_store_name:) - end - - def self.geotiff(workspace_name:, file_path:, id:, title: nil, connection: nil) - coverage_store_name = "berkeley_#{id}" - create_workspace(workspace_name:, connection:) - create_coverage_store(workspace_name:, coverage_store_name:, url: file_path, - connection:) - create_coverage(workspace_name:, coverage_store_name:, coverage_name: id, title:, - connection:) - end - - def self.shapefile(workspace_name:, file_path:, id:, title: nil, connection: nil) - data_store_name = "berkeley_#{id}" - create_workspace(workspace_name:, connection:) - create_data_store(workspace_name:, data_store_name:, url: file_path, connection:) - create_feature_type(workspace_name:, data_store_name:, feature_type_name: id, title:, - connection:) - end - end -end diff --git a/spec/fixture/.DS_Store b/spec/fixture/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..24d04bb56f5edcc55348d25d098a3d225a75d4cf GIT binary patch literal 6148 zcmeHK%}T>S5Z-O8O({YT3VK`cTChJ-5icRu7cim+m70*M!I&*gYQR#+U0=u-@p+ut z-3ZkjJc-yD*!^bbXE*af_J=XXomtRjti>3!pdoTpN(9ZNu8IjpvgMcHLYv+US@9UrL)BGrk6N6 zlQQxP-Sf}GaW-mg9?LlO!gxGY31KjXkgJO@4rJ!YSsWxP*VPWIVKqjro%y`q>$OGq zpgUN!#r&|>X^Z{-V6kXeTf2Klr=y4HF_tfyOAfRv*)UkZD=1&edUB_6B;z~q6j?%r)wC#?{I&k6F2Ryl}NT*o6vb+}21vF+dC~Gf>sT z7M}kn@XM4w@|RO+L<|rE|BL}%8@fXWiZW;GxAO3;70_;=pxKC-Qx d+AmOtINM;Z5obZWN(ZEifFguCV&E4T_yDW%Ow9lQ literal 0 HcmV?d00001 diff --git a/spec/fixture/zipfile/.DS_Store b/spec/fixture/zipfile/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..bb8dcbc0c541e49f74d68f4ca1d359e598c73576 GIT binary patch literal 6148 zcmeHK!AitH41Li>7WUvCJDC5fw^F29kkfAQ?yoevSd2 z*^=eTF>NxC3?u^|3}}BSRACc1I@Ya&rnUe?U8Aee*JW8bC9nw`9l1g=PbGS)MT=pc z&iN8?P2lM0>9A-%ES~({yjXoY>z5i1YaG)i1IfUYfm0u@b^c%Rmzizyrztr~29kk) z#(*uhH`@(AD$dqVpQp37Q17S`#f@?(^e2}94z!P4)J0z}@)_3zj*hCL`I=75i$EnL KO)~Hc47>qeyf~8p literal 0 HcmV?d00001 diff --git a/spec/watcher_spec.rb b/spec/watcher_spec.rb new file mode 100644 index 0000000..21a3610 --- /dev/null +++ b/spec/watcher_spec.rb @@ -0,0 +1,116 @@ +# frozen_string_literal: true +require 'open3' +require 'spec_helper' +require 'fileutils' + +# Class for mocking returned Process::Status objects +class MockStatus + def initialize(success) + @success = success + end + + def success? + @success + end + + def self.successful + new(true) + end + + def self.failed + new(false) + end +end + +RSpec.describe Gingr::Watcher do + before(:each) { FileUtils.rm_rf Dir.glob('/opt/app/data/gingr/*/*') } + + context 'a valid watcher' do + subject :watcher do + Gingr::Watcher.new( + '/opt/app/data/gingr', + '--solr-url=http://solr:8983/solr/geodata-test', + '--geoserver-url=http://admin:geoserver@geoserver:8080/geoserver/rest/', + '--geoserver-root=/opt/app/data/geoserver', + '--geoserver-secure-root=/opt/app/data/geoserver', + '--spatial-root=/opt/app/data/spatial', + '--update-reference-field', + ) + end + + it 'passes arguments to `gingr all`' do + expect(Open3).to receive(:capture3).with('gingr', 'all', + '/opt/app/data/gingr/ready/vector.zip', + '--solr-url=http://solr:8983/solr/geodata-test', + '--geoserver-url=http://admin:geoserver@geoserver:8080/geoserver/rest/', + '--geoserver-root=/opt/app/data/geoserver', + '--geoserver-secure-root=/opt/app/data/geoserver', + '--spatial-root=/opt/app/data/spatial', + '--update-reference-field', + ).and_return(['', '', MockStatus.successful]) + + copy_zipfile_to_ready('vector.zip') + watcher.exec_gingr_with_file!('/opt/app/data/gingr/ready/vector.zip') + end + + it 'moves successfully processed files to the processed directory' do + expect(Open3).to receive(:capture3).and_return(['', '', MockStatus.successful]) + + copy_zipfile_to_ready('vector.zip') + watcher.exec_gingr_with_file!('/opt/app/data/gingr/ready/vector.zip') + expect(File).to exist('/opt/app/data/gingr/processed/vector.zip') + end + + it 'processes newly added files and keeps processing on error' do + watcher.start + + (1..5).each do |i| + expection = expect(watcher) + .to receive(:exec_gingr_with_file!) + .with "/opt/app/data/gingr/ready/vector#{i}.zip" + expection.and_raise if i.odd? + + copy_zipfile_to_ready('vector.zip', "vector#{i}.zip") + sleep 3 + end + end + end + + context 'a watcher with invalid arguments' do + subject :watcher do + Gingr::Watcher.new( + '/opt/app/data/gingr', + '--solr-url=http://solr:8983/solr/geodata-test', + '--geoserver-url=http://admin:geoserver@geoserver:8081/geoserver/rest/', + '--invalid-argument' + ) + end + + it 'moves failed files and the logs to the failed directory' do + copy_zipfile_to_ready('vector.zip') + + expect(Open3).to receive(:capture3).and_return(['', 'Unknown switches', MockStatus.failed]) + expect { watcher.exec_gingr_with_file!('/opt/app/data/gingr/ready/vector.zip') }.to raise_error Gingr::Watcher::SubprocessError + expect(File).to exist('/opt/app/data/gingr/failed/vector.zip') + expect(File).to exist('/opt/app/data/gingr/failed/vector.log') + expect(File.read('/opt/app/data/gingr/failed/vector.log')).to match(/Unknown switches/) + end + end + + context 'a watcher that cannot write to its watch dirs' do + subject :watcher do + Gingr::Watcher.new('/opt/app/data/gingr-does-not-exist') + end + + it 'fails to initialize' do + expect { watcher }.to raise_error(Gingr::Watcher::DirectoryError) + end + end + + private + + def copy_zipfile_to_ready(filename, newname = nil) + newname ||= filename + FileUtils.cp("/opt/app/spec/fixture/zipfile/#{filename}", "/opt/app/data/gingr/ready/#{newname}") + end +end diff --git a/tmp/.keep b/tmp/.keep new file mode 100644 index 0000000..e69de29