diff --git a/Dockerfile b/Dockerfile index 62ed0a9b..e7db7cf2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,6 @@ RUN lein uberjar FROM gcr.io/distroless/java17-debian12 COPY --from=builder /app/target/eduhub-rio-mapper.jar /eduhub-rio-mapper.jar -COPY --from=builder /app/test/test-clients.json /test-clients.json # Make sure there is an opentelemetry agent in the workdir in case docker-compose # starts up a process with -javaagent in the JAVA_TOOL_OPTIONS COPY --from=builder /app/vendor/opentelemetry-javaagent-2.9.0.jar /opentelemetry-javaagent.jar diff --git a/docker-compose.yml b/docker-compose.yml index fee239bd..3e7cd39a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,7 @@ services: env_file: - .envrc # read environment from local direnv settings environment: - CLIENTS_INFO_PATH: /test-clients.json + CLIENTS_INFO_PATH: /test/test-clients.json REDIS_URI: redis://redis OTEL_METRICS_EXPORTER: prometheus OTEL_EXPORTER_PROMETHEUS_ENDPOINT: http://localhost:9464/metrics @@ -40,7 +40,7 @@ services: env_file: - .envrc # read environment from local direnv settings environment: - CLIENTS_INFO_PATH: /test-clients.json + CLIENTS_INFO_PATH: /test/test-clients.json REDIS_URI: redis://redis OTEL_METRICS_EXPORTER: prometheus OTEL_EXPORTER_PROMETHEUS_ENDPOINT: http://localhost:9464/metrics diff --git a/project.clj b/project.clj index 8ca3c7c5..eca1b0ae 100644 --- a/project.clj +++ b/project.clj @@ -55,6 +55,7 @@ [nl.jomco/proof-specs "RELEASE"] [ring/ring-mock "RELEASE"]] :plugins [[lein-ancient "RELEASE"]] + ;; Uncomment this to test the opentelemetry agent ;;:jvm-opts ["-javaagent:vendor/opentelemetry-javaagent-2.9.0.jar"] :aliases {"lint" ["run" "-m" "clj-kondo.main" "--lint" "src" "test"] "check-deps" ["ancient" "check" ":no-profiles" ":exclude" "keep-this-version"] diff --git a/src/nl/surf/eduhub_rio_mapper/config.clj b/src/nl/surf/eduhub_rio_mapper/config.clj index 3858f111..da9e9934 100644 --- a/src/nl/surf/eduhub_rio_mapper/config.clj +++ b/src/nl/surf/eduhub_rio_mapper/config.clj @@ -160,19 +160,26 @@ trust-store-pass)) (assoc :clients (clients-info/read-clients-data clients-info-config))))))) -(defn make-config-and-handlers [web-api?] +(defn make-config-and-handlers-web [] + (let [{:keys [clients] :as cfg} (make-config) + handlers (processing/make-handlers cfg) + config (update cfg :worker merge + {:queues (clients-info/institution-schac-homes clients) + :queue-fn :institution-schac-home})] + {:handlers handlers :config config})) + +(defn make-config-and-handlers-worker [] (let [{:keys [clients] :as cfg} (make-config) handlers (processing/make-handlers cfg) schac-home-to-name (reduce (fn [h c] (assoc h (:institution-schac-home c) (:institution-name c))) {} clients) institution-schac-homes (clients-info/institution-schac-homes clients) config (update cfg :worker merge - {:queues (clients-info/institution-schac-homes clients) - :queue-fn :institution-schac-home - :run-job-fn #(job/run! handlers % (= (System/getenv "STORE_HTTP_REQUESTS") "true")) - :set-status-fn (status/make-set-status-fn cfg) - :retryable-fn status/retryable? - :error-fn status/errors? + {:queues (clients-info/institution-schac-homes clients) + :queue-fn :institution-schac-home + :run-job-fn #(job/run! handlers % (= (System/getenv "STORE_HTTP_REQUESTS") "true")) + :set-status-fn (status/make-set-status-fn cfg) + :retryable-fn status/retryable? + :error-fn status/errors? ;; The web-api doesn't need the job-counter - :jobs-counter (if web-api? (constantly nil) - (metrics/make-jobs-counter schac-home-to-name #(worker/queue-counts-by-key % cfg) institution-schac-homes))})] + :jobs-counter-fn (metrics/make-jobs-counter schac-home-to-name #(worker/queue-counts-by-key % cfg) institution-schac-homes)})] {:handlers handlers :config config})) diff --git a/src/nl/surf/eduhub_rio_mapper/main.clj b/src/nl/surf/eduhub_rio_mapper/main.clj index fe0761d2..7a07c383 100644 --- a/src/nl/surf/eduhub_rio_mapper/main.clj +++ b/src/nl/surf/eduhub_rio_mapper/main.clj @@ -41,7 +41,9 @@ (println (config/help)) (System/exit 0)) - (let [result (cli-commands/process-command command args (config/make-config-and-handlers (= command "serve-api")))] + (let [result (cli-commands/process-command command args (if (= command "worker") + (config/make-config-and-handlers-worker) + (config/make-config-and-handlers-web)))] (case command ("serve-api" "worker") nil diff --git a/src/nl/surf/eduhub_rio_mapper/worker.clj b/src/nl/surf/eduhub_rio_mapper/worker.clj index 4f5503e4..38acb93b 100644 --- a/src/nl/surf/eduhub_rio_mapper/worker.clj +++ b/src/nl/surf/eduhub_rio_mapper/worker.clj @@ -188,8 +188,11 @@ - `error-fn` a function which takes one argument, the result of the job, and returns true when the job failed. - - `retryable-fn` a functions which one argument, the result of the + - `retryable-fn` a function which takes one argument, the result of the job, and returns true when job failed but can be retried. + + - `jobs-counter-fn` a function which takes two arguments, a job and a job status, + and updates the metrics related to the job status type. Also updates the metrics for the queues. " [{{:keys [queues lock-ttl-ms @@ -203,7 +206,7 @@ retryable-fn run-job-fn set-status-fn - jobs-counter] + jobs-counter-fn] ;; Set lock expiry to 1 minute; locks in production have unexpectedly expired with shorter intervals :or {lock-ttl-ms 60000 nap-ms 1000}} :worker @@ -211,7 +214,7 @@ stop-atom] {:pre [retry-wait-ms max-retries - jobs-counter + jobs-counter-fn (seq queues) (fn? run-job-fn) (fn? set-status-fn) (ifn? retryable-fn) (ifn? error-fn) (ifn? queue-fn)]} @@ -233,9 +236,9 @@ (str (Instant/now))))] ;; Don't count job as started while retrying it (when (nil? (::retries job)) - (jobs-counter job :started)) + (jobs-counter-fn job :started)) ;; run job asynchronous - (let [set-status-fn (metrics/wrap-increment-count jobs-counter set-status-fn) + (let [set-status-fn (metrics/wrap-increment-count jobs-counter-fn set-status-fn) c (async/thread (.setName (Thread/currentThread) (str "runner-" queue)) (run-job-fn job))] diff --git a/test/nl/surf/eduhub_rio_mapper/worker_test.clj b/test/nl/surf/eduhub_rio_mapper/worker_test.clj index 2c7d4ec3..81cf3ef4 100644 --- a/test/nl/surf/eduhub_rio_mapper/worker_test.clj +++ b/test/nl/surf/eduhub_rio_mapper/worker_test.clj @@ -27,15 +27,15 @@ {:redis-conn {:pool {} :spec {:uri (or (System/getenv "REDIS_URI") "redis://localhost")}} :redis-key-prefix "eduhub-rio-mapper-test" :status-ttl-sec 10 - :worker {:nap-ms 10 - :retry-wait-ms 10 - :max-retries 3 - :queues ["foo" "bar"] - :queue-fn :queue - :retryable-fn (constantly false) - :error-fn (constantly false) - :jobs-counter (constantly nil) - :set-status-fn (fn [_ _ & [_]] (comment "nop"))}}) + :worker {:nap-ms 10 + :retry-wait-ms 10 + :max-retries 3 + :queues ["foo" "bar"] + :queue-fn :queue + :retryable-fn (constantly false) + :error-fn (constantly false) + :jobs-counter-fn (constantly nil) + :set-status-fn (fn [_ _ & [_]] (comment "nop"))}}) (deftest ^:redis worker (let [job-runs (atom {"foo" [], "bar" []}) @@ -80,7 +80,7 @@ max-retries 3 config (-> config (assoc-in [:worker :max-retries] max-retries) - (assoc-in [:worker :jobs-counter] (constantly nil)) + (assoc-in [:worker :jobs-counter-fn] (constantly nil)) (assoc-in [:worker :run-job-fn] (fn [job] (reset! last-seen-job (dissoc job :started-at)) @@ -109,7 +109,7 @@ max-retries 3 config (-> config (assoc-in [:worker :max-retries] max-retries) - (assoc-in [:worker :jobs-counter] (constantly nil)) + (assoc-in [:worker :jobs-counter-fn] (constantly nil)) (assoc-in [:worker :run-job-fn] (fn [job] (reset! last-seen-job (dissoc job :started-at)) @@ -138,7 +138,7 @@ retry-wait-ms 3000 config (-> config (assoc-in [:worker :retry-wait-ms] retry-wait-ms) - (assoc-in [:worker :jobs-counter] (constantly nil)) + (assoc-in [:worker :jobs-counter-fn] (constantly nil)) (assoc-in [:worker :run-job-fn] (fn [job] (reset! last-seen-job (dissoc job :started-at)) @@ -172,7 +172,7 @@ config (-> config (assoc-in [:worker :error-fn] :error?) (assoc-in [:worker :run-job-fn] identity) - (assoc-in [:worker :jobs-counter] (constantly nil)) + (assoc-in [:worker :jobs-counter-fn] (constantly nil)) (assoc-in [:worker :set-status-fn] (fn [job status & [data]] (reset! last-seen-status {:job (dissoc job :started-at)